From 58120b3f47ea38df69f667603e8a691e25bc4fab Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 16 Oct 2023 08:17:23 +0000 Subject: [PATCH] Polish --- src/Coordination/KeeperContext.h | 2 +- src/Coordination/KeeperDispatcher.cpp | 4 +- src/Coordination/KeeperServer.cpp | 126 +++++++++++------------- src/Coordination/KeeperServer.h | 2 +- src/Coordination/KeeperStateMachine.cpp | 6 +- 5 files changed, 63 insertions(+), 77 deletions(-) diff --git a/src/Coordination/KeeperContext.h b/src/Coordination/KeeperContext.h index 648af2c534b..9353a8a5872 100644 --- a/src/Coordination/KeeperContext.h +++ b/src/Coordination/KeeperContext.h @@ -54,7 +54,7 @@ public: constexpr KeeperDispatcher * getDispatcher() const { return dispatcher; } - std::atomic initial_batch_committed = false; + std::atomic local_logs_preprocessed = false; std::atomic shutdown_called = false; private: /// local disk defined using path or disk name diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index d16c7a3962b..d2ae699c6d5 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -460,8 +460,8 @@ void KeeperDispatcher::shutdown() our_last_committed_log_idx = std::numeric_limits::max(); our_last_committed_log_idx.notify_all(); - keeper_context->initial_batch_committed = true; - keeper_context->initial_batch_committed.notify_all(); + keeper_context->local_logs_preprocessed = true; + keeper_context->local_logs_preprocessed.notify_all(); if (session_cleaner_thread.joinable()) session_cleaner_thread.join(); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 947e208dddb..11026ca11b8 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -405,26 +405,7 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo auto log_store = state_manager->load_log_store(); last_log_idx_on_disk = log_store->next_slot() - 1; - if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index()) - { - auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, last_log_idx_on_disk + 1); - - //size_t preprocessed = 0; - LOG_INFO(log, "Preprocessing {} log entries", log_entries->size()); - //auto idx = state_machine->last_commit_index() + 1; - //for (const auto & entry : *log_entries) - //{ - // if (entry && entry->get_val_type() == nuraft::log_val_type::app_log) - // state_machine->pre_commit(idx, entry->get_buf()); - - // ++idx; - // ++preprocessed; - - // if (preprocessed % 50000 == 0) - // LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size()); - //} - //LOG_INFO(log, "Preprocessing done"); - } + LOG_TRACE(log, "Last local log idx {}", last_log_idx_on_disk); loadLatestConfig(); @@ -625,39 +606,68 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ } } - if (!keeper_context->initial_batch_committed) + if (!keeper_context->local_logs_preprocessed) { + const auto preprocess_logs = [&] + { + auto log_store = state_manager->load_log_store(); + if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index()) + { + auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, last_log_idx_on_disk + 1); + + size_t preprocessed = 0; + LOG_INFO(log, "Preprocessing {} log entries", log_entries->size()); + auto idx = state_machine->last_commit_index() + 1; + for (const auto & entry : *log_entries) + { + if (entry && entry->get_val_type() == nuraft::log_val_type::app_log) + state_machine->pre_commit(idx, entry->get_buf()); + + ++idx; + ++preprocessed; + + if (preprocessed % 50000 == 0) + LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size()); + } + LOG_INFO(log, "Preprocessing done"); + } + else + { + LOG_INFO(log, "All local log entries preprocessed"); + } + keeper_context->local_logs_preprocessed = true; + keeper_context->local_logs_preprocessed.notify_all(); + }; + switch (type) { case nuraft::cb_func::InitialBatchCommited: { - auto log_store = state_manager->load_log_store(); - if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index()) + preprocess_logs(); + break; + } + case nuraft::cb_func::GotAppendEntryReqFromLeader: + { + auto & req = *static_cast(param->ctx); + if (req.get_commit_idx() != 0) { - auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, last_log_idx_on_disk + 1); - - size_t preprocessed = 0; - LOG_INFO(log, "Preprocessing {} log entries", log_entries->size()); - auto idx = state_machine->last_commit_index() + 1; - for (const auto & entry : *log_entries) + auto last_committed_index = state_machine->last_commit_index(); + if (!req.log_entries().empty()) { - if (entry && entry->get_val_type() == nuraft::log_val_type::app_log) - state_machine->pre_commit(idx, entry->get_buf()); + // Actual log number. + auto index_to_commit = std::min({last_log_idx_on_disk, req.get_last_log_idx(), req.get_commit_idx()}); - ++idx; - ++preprocessed; - - if (preprocessed % 50000 == 0) - LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size()); + if (index_to_commit > last_committed_index) + { + LOG_TRACE(log, "Trying to commit local log entries, comitting upto {}", index_to_commit); + raft_instance->commitLogs(index_to_commit); + } + else if (index_to_commit == 0) /// we need to rollback all the logs so we preprocess all of them + { + preprocess_logs(); + } } - LOG_INFO(log, "Preprocessing done"); } - else - { - LOG_INFO(log, "Nothing extra to preprocess"); - } - keeper_context->initial_batch_committed = true; - keeper_context->initial_batch_committed.notify_all(); break; } default: @@ -788,34 +798,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ case nuraft::cb_func::BecomeLeader: { /// We become leader and store is empty or we already committed it - if (commited_store || keeper_context->initial_batch_committed) + if (commited_store || initial_batch_committed) set_initialized(); return nuraft::cb_func::ReturnCode::Ok; } - case nuraft::cb_func::GotAppendEntryReqFromLeader: + case nuraft::cb_func::GotAppendEntryReqFromLeader: { - if (!on_disk_logs_preprocessed) - { - auto & req = *static_cast(param->ctx); - if (req.get_commit_idx() != 0) - { - auto last_committed_index = state_machine->last_commit_index(); - ulong index_to_commit = last_log_idx_on_disk; - if (!req.log_entries().empty()) - { - // Actual log number. - ulong last_log_idx = req.get_last_log_idx() + 1; - index_to_commit = std::min({index_to_commit, last_log_idx, req.get_commit_idx()}); - LOG_INFO(log, "Going to commit {}, last committed index {}", index_to_commit, last_committed_index); - - if (index_to_commit > last_committed_index) - raft_instance->commitLogs(index_to_commit); - } - - on_disk_logs_preprocessed = true; - } - } - if (param->leaderId != -1) { auto leader_index = raft_instance->get_leader_committed_log_idx(); @@ -852,10 +840,8 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ case nuraft::cb_func::InitialBatchCommited: { if (param->myId == param->leaderId) /// We have committed our log store and we are leader, ready to serve requests. - { - on_disk_logs_preprocessed = true; set_initialized(); - } + initial_batch_committed = true; return nuraft::cb_func::ReturnCode::Ok; } case nuraft::cb_func::PreAppendLogFollower: diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 9e32a508e71..ed58418fe5f 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -42,9 +42,9 @@ private: std::mutex initialized_mutex; std::atomic initialized_flag = false; std::condition_variable initialized_cv; + std::atomic initial_batch_committed = false; uint64_t last_log_idx_on_disk = 0; - bool on_disk_logs_preprocessed = false; nuraft::ptr last_local_config; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 43af721da2e..94c0f41ada2 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -162,7 +162,7 @@ void assertDigest( nuraft::ptr KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data) { - keeper_context->initial_batch_committed.wait(false); + keeper_context->local_logs_preprocessed.wait(false); if (keeper_context->shutdown_called) return nullptr; @@ -393,7 +393,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n request_for_session->log_idx = log_idx; - if (!keeper_context->initial_batch_committed) + if (!keeper_context->local_logs_preprocessed) preprocess(*request_for_session); auto try_push = [this](const KeeperStorage::ResponseForSession& response) @@ -510,7 +510,7 @@ void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptrinitial_batch_committed.wait(false); + keeper_context->local_logs_preprocessed.wait(false); if (keeper_context->shutdown_called) return;