Add setting for quick batch size

This commit is contained in:
Antonio Andelic 2022-11-26 17:33:40 +00:00
parent 7437c719a7
commit cd471e9c61
4 changed files with 28 additions and 7 deletions

View File

@ -138,11 +138,27 @@ public:
}
/// Returns false if queue is (finished and empty) or (object was not popped during timeout)
[[nodiscard]] bool tryPop(T & x, UInt64 milliseconds = 0)
[[nodiscard]] bool tryPop(T & x, UInt64 milliseconds)
{
return popImpl(x, milliseconds);
}
[[nodiscard]] bool tryPop(T & x)
{
{
std::unique_lock<std::mutex> queue_lock(queue_mutex);
if (queue.empty())
return false;
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
push_condition.notify_one();
return true;
}
/// Returns size of queue
size_t size() const
{

View File

@ -530,7 +530,6 @@ void ZooKeeperMultiRequest::writeImpl(WriteBuffer & out) const
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
{
while (true)
{
OpNum op_num;

View File

@ -37,7 +37,9 @@ struct Settings;
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
M(UInt64, max_requests_batch_size, 100000, "Max size of batch in requests count before it will be sent to RAFT", 0) \
M(UInt64, max_request_queue_size, 100000, "Maximum number of request that can be in queue for processing", 0) \
M(UInt64, max_requests_batch_size, 100, "Max size of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_requests_quick_batch_size, 10, "Maz size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \

View File

@ -84,13 +84,18 @@ void KeeperDispatcher::requestThread()
current_batch.emplace_back(request);
bool quick_batch_done = false;
size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size;
/// Waiting until previous append will be successful, or batch is big enough
/// has_result == false && get_result_code == OK means that our request still not processed.
/// Sometimes NuRaft set errorcode without setting result, so we check both here.
while ((!quick_batch_done && current_batch.size() < 10) || (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size))
/// Regardless of the status of previous request, if we have enough requests in queue, we will try
/// to batch at least max_quick_batch_size of them.
while ((!quick_batch_done && current_batch.size() < max_quick_batch_size)
|| (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK)
&& current_batch.size() <= max_batch_size))
{
/// Trying to get batch requests as fast as possible
if (requests_queue->tryPop(request, 1))
if (requests_queue->tryPop(request))
{
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
/// Don't append read request into batch, we have to process them separately
@ -101,7 +106,6 @@ void KeeperDispatcher::requestThread()
}
else
{
current_batch.emplace_back(request);
}
}
@ -295,7 +299,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
LOG_DEBUG(log, "Initializing storage dispatcher");
configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_requests_batch_size);
requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_request_queue_size);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });