proper reconfig batch handling

This commit is contained in:
Mike Kot 2023-07-06 17:12:24 +00:00
parent bafcc3afdc
commit 5302b478a4
2 changed files with 20 additions and 13 deletions

View File

@ -110,7 +110,6 @@ const char * errorMessage(Error code)
case Error::ZCLOSING: return "ZooKeeper is closing"; case Error::ZCLOSING: return "ZooKeeper is closing";
case Error::ZNOTHING: return "(not error) no server responses to process"; case Error::ZNOTHING: return "(not error) no server responses to process";
case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored"; case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
case Error::ZRECONFIGINPROGRESS: return "Another reconfiguration is progress";
} }
UNREACHABLE(); UNREACHABLE();

View File

@ -82,6 +82,7 @@ void KeeperDispatcher::requestThread()
/// requests into a batch we must check that the new request is not read request. Otherwise we have to /// requests into a batch we must check that the new request is not read request. Otherwise we have to
/// process all already accumulated write requests, wait them synchronously and only after that process /// process all already accumulated write requests, wait them synchronously and only after that process
/// read request. So reads are some kind of "separator" for writes. /// read request. So reads are some kind of "separator" for writes.
/// Also there is a special reconfig request also being a separator.
try try
{ {
if (requests_queue->tryPop(request, max_wait)) if (requests_queue->tryPop(request, max_wait))
@ -90,20 +91,17 @@ void KeeperDispatcher::requestThread()
if (shutdown_called) if (shutdown_called)
break; break;
if (request.request->getOpNum() == Coordination::OpNum::Reconfig)
{
server->getKeeperStateMachine()->reconfigure(request);
continue;
}
KeeperStorage::RequestsForSessions current_batch; KeeperStorage::RequestsForSessions current_batch;
size_t current_batch_bytes_size = 0; size_t current_batch_bytes_size = 0;
bool has_read_request = false; bool has_read_request = false;
bool has_reconfig_request = false;
/// If new request is not read request or we must to process it through quorum. /// If new request is not read request or reconfig request we must process it through quorum.
/// Otherwise we will process it locally. /// Otherwise we will process it locally.
if (coordination_settings->quorum_reads || !request.request->isReadRequest()) if (request.request->getOpNum() == Coordination::OpNum::Reconfig)
has_reconfig_request = true;
else if (coordination_settings->quorum_reads || !request.request->isReadRequest())
{ {
current_batch_bytes_size += request.request->bytesSize(); current_batch_bytes_size += request.request->bytesSize();
current_batch.emplace_back(request); current_batch.emplace_back(request);
@ -122,7 +120,10 @@ void KeeperDispatcher::requestThread()
read_request_queue[last_request.session_id][last_request.request->xid].push_back(request); read_request_queue[last_request.session_id][last_request.request->xid].push_back(request);
} }
else if (request.request->getOpNum() == Coordination::OpNum::Reconfig) else if (request.request->getOpNum() == Coordination::OpNum::Reconfig)
server->getKeeperStateMachine()->reconfigure(request); {
has_reconfig_request = true;
return false;
}
else else
{ {
current_batch_bytes_size += request.request->bytesSize(); current_batch_bytes_size += request.request->bytesSize();
@ -138,6 +139,7 @@ void KeeperDispatcher::requestThread()
/// TODO: Deprecate max_requests_quick_batch_size and use only max_requests_batch_size and max_requests_batch_bytes_size /// TODO: Deprecate max_requests_quick_batch_size and use only max_requests_batch_size and max_requests_batch_bytes_size
size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size; size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size;
while (!shutdown_called && !has_read_request && while (!shutdown_called && !has_read_request &&
!has_reconfig_request &&
current_batch.size() < max_quick_batch_size && current_batch_bytes_size < max_batch_bytes_size && current_batch.size() < max_quick_batch_size && current_batch_bytes_size < max_batch_bytes_size &&
try_get_request()) try_get_request())
; ;
@ -150,8 +152,10 @@ void KeeperDispatcher::requestThread()
}; };
/// Waiting until previous append will be successful, or batch is big enough /// Waiting until previous append will be successful, or batch is big enough
while (!shutdown_called && !has_read_request && !prev_result_done() && while (!shutdown_called && !has_read_request &&
current_batch.size() <= max_batch_size && current_batch_bytes_size < max_batch_bytes_size) !has_reconfig_request && !prev_result_done() &&
current_batch.size() <= max_batch_size
&& current_batch_bytes_size < max_batch_bytes_size)
{ {
try_get_request(); try_get_request();
} }
@ -175,7 +179,8 @@ void KeeperDispatcher::requestThread()
if (result) if (result)
{ {
if (has_read_request) /// If we will execute read request next, than we have to process result now /// If we will execute read or reconfig next, we have to process result now
if (has_read_request || has_reconfig_request)
forceWaitAndProcessResult(result, current_batch); forceWaitAndProcessResult(result, current_batch);
} }
else else
@ -189,6 +194,9 @@ void KeeperDispatcher::requestThread()
prev_result = result; prev_result = result;
} }
if (has_reconfig_request)
server->getKeeperStateMachine()->reconfigure(request);
/// Read request always goes after write batch (last request) /// Read request always goes after write batch (last request)
if (has_read_request) if (has_read_request)
{ {