This commit is contained in:
Antonio Andelic 2024-02-29 15:47:57 +01:00
parent 4dc0ce03b0
commit c944e2e817
2 changed files with 8 additions and 4 deletions

View File

@ -1,4 +1,6 @@
#include <atomic>
#include <chrono>
#include <Coordination/KeeperContext.h>
#include <Coordination/Defines.h>
@ -442,7 +444,7 @@ bool KeeperContext::waitCommittedUpto(uint64_t log_idx, uint64_t wait_timeout_ms
bool success = last_committed_log_idx_cv.wait_for(
lock,
std::chrono::milliseconds(wait_timeout_ms),
[&] { return shutdown_called || last_committed_log_idx >= wait_commit_upto_idx; });
[&] { return shutdown_called || lastCommittedIndex() >= wait_commit_upto_idx; });
wait_commit_upto_idx.reset();
return success;

View File

@ -218,7 +218,7 @@ void KeeperDispatcher::requestThread()
/// Forcefully process all previous pending requests
if (prev_result)
result_buf
= forceWaitAndProcessResult(prev_result, prev_batch, /*clear_requests_on_success=*/true);
= forceWaitAndProcessResult(prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write);
/// Process collected write requests batch
if (!current_batch.empty())
@ -243,7 +243,7 @@ void KeeperDispatcher::requestThread()
{
if (prev_result)
result_buf = forceWaitAndProcessResult(
prev_result, current_batch, /*clear_requests_on_success=*/!execute_requests_after_write);
prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write);
/// In case of older version or disabled async replication, result buf will be set to value of `commit` function
/// which always returns nullptr
@ -257,11 +257,13 @@ void KeeperDispatcher::requestThread()
/// if timeout happened set error responses for the requests
if (!keeper_context->waitCommittedUpto(log_idx, coordination_settings->operation_timeout_ms.totalMilliseconds()))
addErrorResponses(current_batch, Coordination::Error::ZOPERATIONTIMEOUT);
addErrorResponses(prev_batch, Coordination::Error::ZOPERATIONTIMEOUT);
if (shutdown_called)
return;
}
prev_batch.clear();
}
if (has_reconfig_request)