Limit keeper request batching by size in bytes

This commit is contained in:
Alexander Gololobov 2023-03-23 00:32:58 +01:00
parent a31c0d8eee
commit afe3af230f
4 changed files with 21 additions and 5 deletions

View File

@ -140,6 +140,8 @@ void KeeperConfigurationAndSettings::dump(WriteBufferFromOwnString & buf) const
writeText("max_requests_batch_size=", buf);
write_int(coordination_settings->max_requests_batch_size);
writeText("max_requests_batch_bytes_size=", buf);
write_int(coordination_settings->max_requests_batch_bytes_size);
writeText("max_request_queue_size=", buf);
write_int(coordination_settings->max_request_queue_size);
writeText("max_requests_quick_batch_size=", buf);

View File

@ -39,7 +39,8 @@ struct Settings;
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
M(UInt64, max_request_queue_size, 100000, "Maximum number of request that can be in queue for processing", 0) \
M(UInt64, max_requests_batch_size, 100, "Max size of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_requests_quick_batch_size, 10, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
M(UInt64, max_requests_batch_bytes_size, 100*1024, "Max size in bytes of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \

View File

@ -73,6 +73,7 @@ void KeeperDispatcher::requestThread()
auto coordination_settings = configuration_and_settings->coordination_settings;
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
uint64_t max_batch_bytes_size = coordination_settings->max_requests_batch_bytes_size;
/// The code below does a very simple thing: it batches all write (quorum) requests into a vector until
/// the previous write batch is finished or the max_batch size is reached. The main complexity comes from
@ -89,6 +90,7 @@ void KeeperDispatcher::requestThread()
break;
KeeperStorage::RequestsForSessions current_batch;
size_t current_batch_bytes_size = 0;
bool has_read_request = false;
@ -96,6 +98,7 @@ void KeeperDispatcher::requestThread()
/// Otherwise we will process it locally.
if (coordination_settings->quorum_reads || !request.request->isReadRequest())
{
current_batch_bytes_size += request.request->bytesSize();
current_batch.emplace_back(request);
const auto try_get_request = [&]
@ -108,7 +111,10 @@ void KeeperDispatcher::requestThread()
if (!coordination_settings->quorum_reads && request.request->isReadRequest())
has_read_request = true;
else
{
current_batch_bytes_size += request.request->bytesSize();
current_batch.emplace_back(request);
}
return true;
}
@ -116,9 +122,11 @@ void KeeperDispatcher::requestThread()
return false;
};
/// If we have enough requests in queue, we will try to batch at least max_quick_batch_size of them.
/// TODO: Deprecate max_requests_quick_batch_size and use only max_requests_batch_size and max_requests_batch_bytes_size
size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size;
while (!shutdown_called && !has_read_request && current_batch.size() < max_quick_batch_size && try_get_request())
while (!shutdown_called && !has_read_request &&
current_batch.size() < max_quick_batch_size && current_batch_bytes_size < max_batch_bytes_size &&
try_get_request())
;
const auto prev_result_done = [&]
@ -129,7 +137,8 @@ void KeeperDispatcher::requestThread()
};
/// Wait until the previous append completes, or the batch is big enough
while (!shutdown_called && !has_read_request && !prev_result_done() && current_batch.size() <= max_batch_size)
while (!shutdown_called && !has_read_request && !prev_result_done() &&
current_batch.size() <= max_batch_size && current_batch_bytes_size < max_batch_bytes_size)
{
try_get_request();
}
@ -147,6 +156,8 @@ void KeeperDispatcher::requestThread()
/// Process collected write requests batch
if (!current_batch.empty())
{
LOG_TRACE(log, "Processing requests batch, size: {}, bytes: {}", current_batch.size(), current_batch_bytes_size);
auto result = server->putRequestBatch(current_batch);
if (result)
@ -158,6 +169,7 @@ void KeeperDispatcher::requestThread()
{
addErrorResponses(current_batch, Coordination::Error::ZCONNECTIONLOSS);
current_batch.clear();
current_batch_bytes_size = 0;
}
prev_batch = std::move(current_batch);

View File

@ -285,8 +285,9 @@ def test_cmd_conf(started_cluster):
assert result["fresh_log_gap"] == "200"
assert result["max_requests_batch_size"] == "100"
assert result["max_requests_batch_bytes_size"] == "102400"
assert result["max_request_queue_size"] == "100000"
assert result["max_requests_quick_batch_size"] == "10"
assert result["max_requests_quick_batch_size"] == "100"
assert result["quorum_reads"] == "false"
assert result["force_sync"] == "true"