From c944e2e8170b6cdd6519dc3f87da0a2dd60f3ac7 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 29 Feb 2024 15:47:57 +0100 Subject: [PATCH] Fix --- src/Coordination/KeeperContext.cpp | 4 +++- src/Coordination/KeeperDispatcher.cpp | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/Coordination/KeeperContext.cpp b/src/Coordination/KeeperContext.cpp index 7c1ff55245e..a36a074ce89 100644 --- a/src/Coordination/KeeperContext.cpp +++ b/src/Coordination/KeeperContext.cpp @@ -1,4 +1,6 @@ +#include #include + #include #include @@ -442,7 +444,7 @@ bool KeeperContext::waitCommittedUpto(uint64_t log_idx, uint64_t wait_timeout_ms bool success = last_committed_log_idx_cv.wait_for( lock, std::chrono::milliseconds(wait_timeout_ms), - [&] { return shutdown_called || last_committed_log_idx >= wait_commit_upto_idx; }); + [&] { return shutdown_called || lastCommittedIndex() >= wait_commit_upto_idx; }); wait_commit_upto_idx.reset(); return success; diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 7af9c65e9d3..f6598cfaa17 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -218,7 +218,7 @@ void KeeperDispatcher::requestThread() /// Forcefully process all previous pending requests if (prev_result) result_buf - = forceWaitAndProcessResult(prev_result, prev_batch, /*clear_requests_on_success=*/true); + = forceWaitAndProcessResult(prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write); /// Process collected write requests batch if (!current_batch.empty()) @@ -243,7 +243,7 @@ void KeeperDispatcher::requestThread() { if (prev_result) result_buf = forceWaitAndProcessResult( - prev_result, current_batch, /*clear_requests_on_success=*/!execute_requests_after_write); + prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write); /// In case of older version or disabled async replication, result buf will be set to value of `commit` function /// which always returns nullptr @@ -257,11 +257,13 @@ void KeeperDispatcher::requestThread() /// if timeout happened set error responses for the requests if (!keeper_context->waitCommittedUpto(log_idx, coordination_settings->operation_timeout_ms.totalMilliseconds())) - addErrorResponses(current_batch, Coordination::Error::ZOPERATIONTIMEOUT); + addErrorResponses(prev_batch, Coordination::Error::ZOPERATIONTIMEOUT); if (shutdown_called) return; } + + prev_batch.clear(); } if (has_reconfig_request)