#include <Coordination/KeeperDispatcher.h>

#include <libnuraft/async.hxx>

#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>

#include <base/hex.h>

#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/checkStackSize.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>

#include <atomic>
#include <future>
#include <chrono>
#include <filesystem>
#include <iterator>
#include <limits>

#if USE_JEMALLOC
#    include <jemalloc/jemalloc.h>

#    define STRINGIFY_HELPER(x) #x
#    define STRINGIFY(x) STRINGIFY_HELPER(x)
#endif

namespace CurrentMetrics
{
    extern const Metric KeeperAliveConnections;
    extern const Metric KeeperOutstandingRequets;
}

namespace ProfileEvents
{
    extern const Event MemoryAllocatorPurge;
    extern const Event MemoryAllocatorPurgeTimeMicroseconds;
}

using namespace std::chrono_literals;

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int TIMEOUT_EXCEEDED;
    extern const int SYSTEM_ERROR;
}

namespace
{

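/// Heuristic check whether a request will grow Keeper's in-memory state: Create requests always count,
/// and a Multi counts if the bytes added by its Create sub-requests outweigh the bytes freed by Removes.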
bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request)
{
    if (request->getOpNum() == Coordination::OpNum::Create || request->getOpNum() == Coordination::OpNum::CreateIfNotExists)
    {
        return true;
    }
    else if (request->getOpNum() == Coordination::OpNum::Multi)
    {
        Coordination::ZooKeeperMultiRequest & multi_req = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*request);
        Int64 memory_delta = 0;
        for (const auto & sub_req : multi_req.requests)
        {
            auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_req);
            switch (sub_zk_request->getOpNum())
            {
                case Coordination::OpNum::Create:
                case Coordination::OpNum::CreateIfNotExists:
                {
                    Coordination::ZooKeeperCreateRequest & create_req = dynamic_cast<Coordination::ZooKeeperCreateRequest &>(*sub_zk_request);
                    memory_delta += create_req.bytesSize();
                    break;
                }
                case Coordination::OpNum::Remove:
                {
                    Coordination::ZooKeeperRemoveRequest & remove_req = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*sub_zk_request);
                    memory_delta -= remove_req.bytesSize();
                    break;
                }
                default:
                    break;
            }
        }
        return memory_delta > 0;
    }

    return false;
}

}

KeeperDispatcher::KeeperDispatcher()
    : responses_queue(std::numeric_limits<size_t>::max())
    , configuration_and_settings(std::make_shared<KeeperConfigurationAndSettings>())
    , log(&Poco::Logger::get("KeeperDispatcher"))
{
}

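/// Main request-processing loop: pops client requests from the queue, batches write (quorum) requests,
/// replicates them through Raft, and serves read requests from local state.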
void KeeperDispatcher::requestThread()
{
    setThreadName("KeeperReqT");

    /// Result of the requests batch from the previous iteration
    RaftAppendResult prev_result = nullptr;
    /// Requests from the previous iteration. We store them to be able
    /// to send errors to the client.
    KeeperStorage::RequestsForSessions prev_batch;

    auto & shutdown_called = keeper_context->shutdown_called;

    while (!shutdown_called)
    {
        KeeperStorage::RequestForSession request;

        auto coordination_settings = configuration_and_settings->coordination_settings;
        uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
        uint64_t max_batch_bytes_size = coordination_settings->max_requests_batch_bytes_size;

        /// The code below does a very simple thing: it batches all write (quorum) requests into a vector until
        /// the previous write batch is finished or the max batch size is reached. The main complexity comes from
        /// the ability to process read requests without quorum (from local state). So while we are collecting
        /// requests into a batch we must check that the new request is not a read request. Otherwise we have to
        /// process all already accumulated write requests, wait for them synchronously, and only after that process
        /// the read request. So reads are a kind of "separator" for writes.
        /// A reconfig request is such a separator as well.
        try
        {
            if (requests_queue->tryPop(request, max_wait))
            {
                CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
                if (shutdown_called)
                    break;

                Int64 mem_soft_limit = configuration_and_settings->coordination_settings->max_memory_usage_soft_limit;
                if (configuration_and_settings->standalone_keeper && mem_soft_limit > 0 && total_memory_tracker.get() >= mem_soft_limit && checkIfRequestIncreaseMem(request.request))
                {
                    LOG_TRACE(log, "Processing requests refused because of mem_soft_limit {}, the total used memory is {}, request type is {}", mem_soft_limit, total_memory_tracker.get(), request.request->getOpNum());
                    addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
                    continue;
                }

                KeeperStorage::RequestsForSessions current_batch;
                size_t current_batch_bytes_size = 0;

                bool has_read_request = false;
                bool has_reconfig_request = false;

                /// If the new request is not a read or reconfig request, we must process it through quorum.
                /// Otherwise we will process it locally.
                if (request.request->getOpNum() == Coordination::OpNum::Reconfig)
                    has_reconfig_request = true;
                else if (coordination_settings->quorum_reads || !request.request->isReadRequest())
                {
                    current_batch_bytes_size += request.request->bytesSize();
                    current_batch.emplace_back(request);

                    const auto try_get_request = [&]
                    {
                        /// Trying to get batch requests as fast as possible
                        if (requests_queue->tryPop(request))
                        {
                            CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);

                            /// Don't append read requests to the batch, we have to process them separately
                            if (!coordination_settings->quorum_reads && request.request->isReadRequest())
                            {
                                const auto & last_request = current_batch.back();
                                std::lock_guard lock(read_request_queue_mutex);
                                read_request_queue[last_request.session_id][last_request.request->xid].push_back(request);
                            }
                            else if (request.request->getOpNum() == Coordination::OpNum::Reconfig)
                            {
                                has_reconfig_request = true;
                                return false;
                            }
                            else
                            {
                                current_batch_bytes_size += request.request->bytesSize();
                                current_batch.emplace_back(request);
                            }

                            return true;
                        }

                        return false;
                    };

                    size_t max_batch_size = coordination_settings->max_requests_batch_size;
                    while (!shutdown_called && current_batch.size() < max_batch_size && !has_reconfig_request
                           && current_batch_bytes_size < max_batch_bytes_size && try_get_request())
                        ;

                    const auto prev_result_done = [&]
                    {
                        /// has_result == false && get_result_code == OK means that our request is not processed yet.
                        /// Sometimes NuRaft sets the error code without setting the result, so we check both here.
                        return !prev_result || prev_result->has_result() || prev_result->get_result_code() != nuraft::cmd_result_code::OK;
                    };

                    /// Wait until the previous append succeeds, or until the batch is big enough
                    while (!shutdown_called && !has_reconfig_request
                           && !prev_result_done() && current_batch.size() <= max_batch_size
                           && current_batch_bytes_size < max_batch_bytes_size)
                    {
                        try_get_request();
                    }
                }
                else
                    has_read_request = true;

                if (shutdown_called)
                    break;

                nuraft::ptr<nuraft::buffer> result_buf = nullptr;

                /// Forcefully process all previous pending requests
                if (prev_result)
                    result_buf = forceWaitAndProcessResult(prev_result, prev_batch);

                /// Process collected write requests batch
                if (!current_batch.empty())
                {
                    LOG_TRACE(log, "Processing requests batch, size: {}, bytes: {}", current_batch.size(), current_batch_bytes_size);

                    auto result = server->putRequestBatch(current_batch);

                    if (!result)
                    {
                        addErrorResponses(current_batch, Coordination::Error::ZCONNECTIONLOSS);
                        current_batch.clear();
                        current_batch_bytes_size = 0;
                    }

                    prev_batch = std::move(current_batch);
                    prev_result = result;
                }

                /// If we are going to execute a read or reconfig request next, we have to process the result now
                if (has_read_request || has_reconfig_request)
                {
                    if (prev_result)
                        result_buf = forceWaitAndProcessResult(prev_result, current_batch);

                    /// With an older version or with async replication disabled, result_buf is set to the value of the
                    /// `commit` function, which always returns nullptr. In that case we don't have to wait manually,
                    /// because we are already sure that the batch was committed when we got the result back.
                    /// Otherwise, we need to wait manually until the batch is committed.
                    if (result_buf)
                    {
                        nuraft::buffer_serializer bs(result_buf);
                        auto log_idx = bs.get_u64();

                        /// This thread is woken up on each commit, so we loop until the last request of the batch is committed
                        while (true)
                        {
                            if (shutdown_called)
                                return;

                            auto current_last_committed_idx = our_last_committed_log_idx.load(std::memory_order_relaxed);
                            if (current_last_committed_idx >= log_idx)
                                break;

                            our_last_committed_log_idx.wait(current_last_committed_idx);
                        }
                    }
                }

                if (has_reconfig_request)
                    server->getKeeperStateMachine()->reconfigure(request);

                /// The read request always goes after the write batch (it is the last request)
                if (has_read_request)
                {
                    if (server->isLeaderAlive())
                        server->putLocalReadRequest(request);
                    else
                        addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
                }
            }
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}

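/// Delivers finished responses from the responses queue to the callbacks of their sessions.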
void KeeperDispatcher::responseThread()
{
    setThreadName("KeeperRspT");

    auto & shutdown_called = keeper_context->shutdown_called;
    while (!shutdown_called)
    {
        KeeperStorage::ResponseForSession response_for_session;

        uint64_t max_wait = configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds();

        if (responses_queue.tryPop(response_for_session, max_wait))
        {
            if (shutdown_called)
                break;

            try
            {
                setResponse(response_for_session.session_id, response_for_session.response);
            }
            catch (...)
            {
                tryLogCurrentException(__PRETTY_FUNCTION__);
            }
        }
    }
}

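/// Executes queued snapshot-creation tasks; on the leader, the resulting snapshot is also uploaded to S3.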
void KeeperDispatcher::snapshotThread()
{
    setThreadName("KeeperSnpT");

    auto & shutdown_called = keeper_context->shutdown_called;
    while (!shutdown_called)
    {
        CreateSnapshotTask task;
        if (!snapshots_queue.pop(task))
            break;

        if (shutdown_called)
            break;

        try
        {
            auto snapshot_file_info = task.create_snapshot(std::move(task.snapshot));

            if (snapshot_file_info.path.empty())
                continue;

            if (isLeader())
                snapshot_s3.uploadSnapshot(snapshot_file_info);
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}

void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
    std::lock_guard lock(session_to_response_callback_mutex);

    /// Special new session response.
    if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
    {
        const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);

        /// Nobody waits for this session id
        if (session_id_resp.server_id != server->getServerID() || !new_session_id_response_callback.contains(session_id_resp.internal_id))
            return;

        auto callback = new_session_id_response_callback[session_id_resp.internal_id];
        callback(response);
        new_session_id_response_callback.erase(session_id_resp.internal_id);
    }
    else /// Normal response, just write to client
    {
        auto session_response_callback = session_to_response_callback.find(session_id);

        /// Session was disconnected, just skip this response
        if (session_response_callback == session_to_response_callback.end())
            return;

        session_response_callback->second(response);

        /// Session closed, no more writes
        if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
        {
            session_to_response_callback.erase(session_response_callback);
            CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
        }
    }
}

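/// Queue a client request for processing. Returns false (instead of throwing) if the session is unknown
/// or shutdown has already started.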
bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
    {
        /// If the session was already disconnected, we ignore the request
        std::lock_guard lock(session_to_response_callback_mutex);
        if (!session_to_response_callback.contains(session_id))
            return false;
    }

    KeeperStorage::RequestForSession request_info;
    request_info.request = request;
    using namespace std::chrono;
    request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    request_info.session_id = session_id;

    if (keeper_context->shutdown_called)
        return false;

    /// Put close requests without timeouts
    if (request->getOpNum() == Coordination::OpNum::Close)
    {
        if (!requests_queue->push(std::move(request_info)))
            throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push request to queue");
    }
    else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
    {
        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push request to queue within operation timeout");
    }

    CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
    return true;
}

void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async, const MultiVersion<Macros>::Version & macros)
{
    LOG_DEBUG(log, "Initializing storage dispatcher");

    keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
    configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);

    keeper_context->initialize(config, this);

    requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_request_queue_size);
    request_thread = ThreadFromGlobalPool([this] { requestThread(); });
    responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
    snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });

    snapshot_s3.startup(config, macros);

    server = std::make_unique<KeeperServer>(
        configuration_and_settings,
        config,
        responses_queue,
        snapshots_queue,
        keeper_context,
        snapshot_s3,
        [this](uint64_t log_idx, const KeeperStorage::RequestForSession & request_for_session)
        {
            {
                /// Check if we have a queue of read requests waiting for this request to be committed
                std::lock_guard lock(read_request_queue_mutex);
                if (auto it = read_request_queue.find(request_for_session.session_id); it != read_request_queue.end())
                {
                    auto & xid_to_request_queue = it->second;

                    if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid);
                        request_queue_it != xid_to_request_queue.end())
                    {
                        for (const auto & read_request : request_queue_it->second)
                        {
                            if (server->isLeaderAlive())
                                server->putLocalReadRequest(read_request);
                            else
                                addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
                        }

                        xid_to_request_queue.erase(request_queue_it);
                    }
                }
            }

            our_last_committed_log_idx.store(log_idx, std::memory_order_relaxed);
            our_last_committed_log_idx.notify_all();
        });

    try
    {
        LOG_DEBUG(log, "Waiting server to initialize");
        server->startup(config, configuration_and_settings->enable_ipv6);
        LOG_DEBUG(log, "Server initialized, waiting for quorum");

        if (!start_async)
        {
            server->waitInit();
            LOG_DEBUG(log, "Quorum initialized");
        }
        else
        {
            LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it will be ready");
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
        throw;
    }

    /// Start these only after the Keeper server has started
    session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
    update_configuration_thread = reconfigEnabled()
        ? ThreadFromGlobalPool([this] { clusterUpdateThread(); })
        : ThreadFromGlobalPool([this] { clusterUpdateWithReconfigDisabledThread(); });

    LOG_DEBUG(log, "Dispatcher initialized");
}

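/// Stop all worker threads, expire every pending request, try to gracefully close active sessions
/// and finally shut down the underlying server.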
void KeeperDispatcher::shutdown()
{
    try
    {
        {
            if (!keeper_context || keeper_context->shutdown_called.exchange(true))
                return;

            LOG_DEBUG(log, "Shutting down storage dispatcher");

            our_last_committed_log_idx = std::numeric_limits<uint64_t>::max();
            our_last_committed_log_idx.notify_all();

            keeper_context->local_logs_preprocessed = true;

            if (session_cleaner_thread.joinable())
                session_cleaner_thread.join();

            if (requests_queue)
            {
                requests_queue->finish();

                if (request_thread.joinable())
                    request_thread.join();
            }

            responses_queue.finish();
            if (responses_thread.joinable())
                responses_thread.join();

            snapshots_queue.finish();
            if (snapshot_thread.joinable())
                snapshot_thread.join();

            cluster_update_queue.finish();
            if (update_configuration_thread.joinable())
                update_configuration_thread.join();
        }

        KeeperStorage::RequestForSession request_for_session;

        /// Set session expired for all pending requests
        while (requests_queue && requests_queue->tryPop(request_for_session))
        {
            CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
            auto response = request_for_session.request->makeResponse();
            response->error = Coordination::Error::ZSESSIONEXPIRED;
            setResponse(request_for_session.session_id, response);
        }

        KeeperStorage::RequestsForSessions close_requests;
        {
            /// Clear all registered sessions
            std::lock_guard lock(session_to_response_callback_mutex);

            if (server && hasLeader())
            {
                close_requests.reserve(session_to_response_callback.size());
                // send to leader CLOSE requests for active sessions
                for (const auto & [session, response] : session_to_response_callback)
                {
                    auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
                    request->xid = Coordination::CLOSE_XID;
                    using namespace std::chrono;
                    KeeperStorage::RequestForSession request_info
                    {
                        .session_id = session,
                        .time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
                        .request = std::move(request),
                        .digest = std::nullopt
                    };

                    close_requests.push_back(std::move(request_info));
                }
            }

            session_to_response_callback.clear();
        }

        if (server && !close_requests.empty())
        {
            // if there is no leader, there is no reason to do CLOSE because it's a write request
            if (hasLeader())
            {
                LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
                const auto raft_result = server->putRequestBatch(close_requests);
                auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
                auto sessions_closing_done = sessions_closing_done_promise->get_future();
                raft_result->when_ready([my_sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
                                            nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
                                            nuraft::ptr<std::exception> & /*exception*/) { my_sessions_closing_done_promise->set_value(); });

                auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
                if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
                    LOG_WARNING(
                        log,
                        "Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
                        session_shutdown_timeout);
            }
            else
            {
                LOG_INFO(log, "Sessions cannot be closed during shutdown because there is no active leader");
            }
        }

        if (server)
            server->shutdown();

        snapshot_s3.shutdown();

        CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    LOG_DEBUG(log, "Dispatcher shut down");
}

void KeeperDispatcher::forceRecovery()
{
    server->forceRecovery();
}

KeeperDispatcher::~KeeperDispatcher()
{
    shutdown();
}

void KeeperDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
{
    std::lock_guard lock(session_to_response_callback_mutex);
    if (!session_to_response_callback.try_emplace(session_id, callback).second)
        throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);

    CurrentMetrics::add(CurrentMetrics::KeeperAliveConnections);
}

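/// Periodic task: on the leader, find dead sessions and push CLOSE requests for them into the queue.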
void KeeperDispatcher::sessionCleanerTask()
{
    auto & shutdown_called = keeper_context->shutdown_called;
    while (true)
    {
        if (shutdown_called)
            return;

        try
        {
            /// Only the leader node must check dead sessions
            if (server->checkInit() && isLeader())
            {
                auto dead_sessions = server->getDeadSessions();

                for (int64_t dead_session : dead_sessions)
                {
                    LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);

                    /// Closing a session == sending a close request to the Raft server
                    auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
                    request->xid = Coordination::CLOSE_XID;
                    using namespace std::chrono;
                    KeeperStorage::RequestForSession request_info
                    {
                        .session_id = dead_session,
                        .time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
                        .request = std::move(request),
                        .digest = std::nullopt
                    };

                    if (!requests_queue->push(std::move(request_info)))
                        LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
                    CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);

                    /// Remove the session from registered sessions
                    finishSession(dead_session);
                    LOG_INFO(log, "Dead session close request pushed");
                }
            }
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }

        auto time_to_sleep = configuration_and_settings->coordination_settings->dead_session_check_period_ms.totalMilliseconds();
        std::this_thread::sleep_for(std::chrono::milliseconds(time_to_sleep));
    }
}

void KeeperDispatcher::finishSession(int64_t session_id)
{
    /// The shutdown() method will clean up sessions if needed
    if (keeper_context->shutdown_called)
        return;

    {
        std::lock_guard lock(session_to_response_callback_mutex);
        auto session_it = session_to_response_callback.find(session_id);
        if (session_it != session_to_response_callback.end())
        {
            session_to_response_callback.erase(session_it);
            CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
        }
    }

    {
        std::lock_guard lock(read_request_queue_mutex);
        read_request_queue.erase(session_id);
    }
}

void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{
    for (const auto & request_for_session : requests_for_sessions)
    {
        auto response = request_for_session.request->makeResponse();
        response->xid = request_for_session.request->xid;
        response->zxid = 0;
        response->error = error;
        if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{request_for_session.session_id, response}))
            throw Exception(ErrorCodes::SYSTEM_ERROR,
                "Could not push error response xid {} zxid {} error message {} to responses queue",
                response->xid,
                response->zxid,
                error);
    }
}

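/// Block until the pending Raft append finishes. On rejection or timeout, answer the whole batch with
/// an error; the batch is cleared in any case. Returns the raw result buffer (may be nullptr).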
nuraft::ptr<nuraft::buffer> KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
{
    if (!result->has_result())
        result->get();

    /// If we get some errors, then send them to the clients
    if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
        addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
    else if (result->get_result_code() != nuraft::cmd_result_code::OK)
        addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);

    auto result_buf = result->get();
    result = nullptr;
    requests_for_sessions.clear();

    return result_buf;
}

int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
{
    /// New session id allocation is a special request, because we cannot process it in the normal
    /// way: get request -> put to raft -> set response for the registered callback.
    KeeperStorage::RequestForSession request_info;
    std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
    /// Internal session id. It's a temporary number which is unique for each client on this server
    /// but can be the same on different servers.
    request->internal_id = internal_session_id_counter.fetch_add(1);
    request->session_timeout_ms = session_timeout_ms;
    request->server_id = server->getServerID();

    request_info.request = request;
    using namespace std::chrono;
    request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    request_info.session_id = -1;

    auto promise = std::make_shared<std::promise<int64_t>>();
    auto future = promise->get_future();

    {
        std::lock_guard lock(session_to_response_callback_mutex);
        new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id](const Coordination::ZooKeeperResponsePtr & response)
        {
            if (response->getOpNum() != Coordination::OpNum::SessionID)
                promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
                    "Incorrect response of type {} instead of SessionID response", response->getOpNum())));

            auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
            if (session_id_response.internal_id != internal_id)
            {
                promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
                    "Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id)));
            }

            if (response->error != Coordination::Error::ZOK)
                promise->set_exception(std::make_exception_ptr(zkutil::KeeperException::fromMessage(response->error, "SessionID request failed with error")));

            promise->set_value(session_id_response.session_id);
        };
    }

    /// Push the new session request to the queue
    if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
    CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);

    if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot receive session id within session timeout");

    /// Forcefully wait for request execution because we cannot process any other
    /// requests for this client until it gets a new session id.
    return future.get();
}

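/// Legacy configuration-update loop, used while "reconfig" support is disabled: the leader applies
/// config diffs itself, followers wait for them to arrive from the leader.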
void KeeperDispatcher::clusterUpdateWithReconfigDisabledThread()
{
    auto & shutdown_called = keeper_context->shutdown_called;
    while (!shutdown_called)
    {
        try
        {
            if (!server->checkInit())
            {
                LOG_INFO(log, "Server still not initialized, will not apply configuration until initialization finished");
                std::this_thread::sleep_for(5000ms);
                continue;
            }

            if (server->isRecovering())
            {
                LOG_INFO(log, "Server is recovering, will not apply configuration until recovery is finished");
                std::this_thread::sleep_for(5000ms);
                continue;
            }

            ClusterUpdateAction action;
            if (!cluster_update_queue.pop(action))
                break;

            /// We must wait for this update from the leader or apply it ourselves (if we are the leader)
            bool done = false;
            while (!done)
            {
                if (server->isRecovering())
                    break;

                if (shutdown_called)
                    return;

                if (isLeader())
                {
                    server->applyConfigUpdateWithReconfigDisabled(action);
                    done = true;
                }
                else if (done = server->waitForConfigUpdateWithReconfigDisabled(action); !done)
                    LOG_INFO(log,
                        "Cannot wait for configuration update, maybe we became leader "
                        "or maybe update is invalid, will try to wait one more time");
            }
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}

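/// Configuration-update loop used when "reconfig" is enabled: declined actions are pushed back
/// to the front of the queue and retried after a backoff.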
void KeeperDispatcher::clusterUpdateThread()
{
    using enum KeeperServer::ConfigUpdateState;
    bool last_command_was_leader_change = false;
    auto & shutdown_called = keeper_context->shutdown_called;
    while (!shutdown_called)
    {
        ClusterUpdateAction action;
        if (!cluster_update_queue.pop(action))
            return;

        if (const auto res = server->applyConfigUpdate(action, last_command_was_leader_change); res == Accepted)
            LOG_DEBUG(log, "Processing config update {}: accepted", action);
        else
        {
            last_command_was_leader_change = res == WaitBeforeChangingLeader;

            (void)cluster_update_queue.pushFront(action);
            LOG_DEBUG(log, "Processing config update {}: declined, backoff", action);

            std::this_thread::sleep_for(last_command_was_leader_change
                ? configuration_and_settings->coordination_settings->sleep_before_leader_change_ms
                : 50ms);
        }
    }
}

void KeeperDispatcher::pushClusterUpdates(ClusterUpdateActions && actions)
{
    if (keeper_context->shutdown_called)
        return;

    for (auto && action : actions)
    {
        if (!cluster_update_queue.push(std::move(action)))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot push configuration update");
        LOG_DEBUG(log, "Processing config update {}: pushed", action);
    }
}

bool KeeperDispatcher::reconfigEnabled() const
{
    return server->reconfigEnabled();
}

bool KeeperDispatcher::isServerActive() const
{
    return checkInit() && hasLeader() && !server->isRecovering();
}

void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros)
{
    auto diff = server->getRaftConfigurationDiff(config);

    if (diff.empty())
        LOG_TRACE(log, "Configuration update triggered, but nothing changed for Raft");
    else if (reconfigEnabled())
        LOG_WARNING(log,
            "Raft configuration changed, but keeper_server.enable_reconfiguration is on. "
            "This update will be ignored. Use \"reconfig\" instead");
    else if (diff.size() > 1)
        LOG_WARNING(log,
            "Configuration changed for more than one server ({}) from cluster, "
            "it's strictly not recommended", diff.size());
    else
        LOG_DEBUG(log, "Configuration change size ({})", diff.size());

    if (!reconfigEnabled())
        for (auto & change : diff)
            if (!cluster_update_queue.push(change))
                throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");

    snapshot_s3.updateS3Configuration(config, macros);
}

void KeeperDispatcher::updateKeeperStatLatency(uint64_t process_time_ms)
{
    keeper_stats.updateLatency(process_time_ms);
}

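/// Recursively sum the sizes of all files under the given path on the disk.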
static uint64_t getTotalSize(const DiskPtr & disk, const std::string & path = "")
{
    checkStackSize();

    uint64_t size = 0;
    for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
    {
        if (disk->isFile(it->path()))
            size += disk->getFileSize(it->path());
        else
            size += getTotalSize(disk, it->path());
    }

    return size;
}

uint64_t KeeperDispatcher::getLogDirSize() const
{
    auto log_disk = keeper_context->getLogDisk();
    auto size = getTotalSize(log_disk);

    auto latest_log_disk = keeper_context->getLatestLogDisk();
    if (log_disk != latest_log_disk)
        size += getTotalSize(latest_log_disk);

    return size;
}

uint64_t KeeperDispatcher::getSnapDirSize() const
{
    return getTotalSize(keeper_context->getSnapshotDisk());
}

Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
{
    Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
    result.outstanding_requests_count = requests_queue->size();
    {
        std::lock_guard lock(session_to_response_callback_mutex);
        result.alive_connections_count = session_to_response_callback.size();
    }
    return result;
}

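/// Ask the allocator to return unused memory to the OS (effective only in jemalloc builds).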
void KeeperDispatcher::cleanResources()
{
#if USE_JEMALLOC
    LOG_TRACE(&Poco::Logger::get("KeeperDispatcher"), "Purging unused memory");
    Stopwatch watch;
    mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
    ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
    ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
#endif
}

}