This commit is contained in:
Antonio Andelic 2023-10-16 08:17:23 +00:00
parent ae2b64b59e
commit 58120b3f47
5 changed files with 63 additions and 77 deletions

View File

@ -54,7 +54,7 @@ public:
constexpr KeeperDispatcher * getDispatcher() const { return dispatcher; }
std::atomic<bool> initial_batch_committed = false;
std::atomic<bool> local_logs_preprocessed = false;
std::atomic<bool> shutdown_called = false;
private:
/// local disk defined using path or disk name

View File

@ -460,8 +460,8 @@ void KeeperDispatcher::shutdown()
our_last_committed_log_idx = std::numeric_limits<uint64_t>::max();
our_last_committed_log_idx.notify_all();
keeper_context->initial_batch_committed = true;
keeper_context->initial_batch_committed.notify_all();
keeper_context->local_logs_preprocessed = true;
keeper_context->local_logs_preprocessed.notify_all();
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();

View File

@ -405,26 +405,7 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
auto log_store = state_manager->load_log_store();
last_log_idx_on_disk = log_store->next_slot() - 1;
if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index())
{
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, last_log_idx_on_disk + 1);
//size_t preprocessed = 0;
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
//auto idx = state_machine->last_commit_index() + 1;
//for (const auto & entry : *log_entries)
//{
// if (entry && entry->get_val_type() == nuraft::log_val_type::app_log)
// state_machine->pre_commit(idx, entry->get_buf());
// ++idx;
// ++preprocessed;
// if (preprocessed % 50000 == 0)
// LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
//}
//LOG_INFO(log, "Preprocessing done");
}
LOG_TRACE(log, "Last local log idx {}", last_log_idx_on_disk);
loadLatestConfig();
@ -625,39 +606,68 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
}
}
if (!keeper_context->initial_batch_committed)
if (!keeper_context->local_logs_preprocessed)
{
const auto preprocess_logs = [&]
{
auto log_store = state_manager->load_log_store();
if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index())
{
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, last_log_idx_on_disk + 1);
size_t preprocessed = 0;
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
auto idx = state_machine->last_commit_index() + 1;
for (const auto & entry : *log_entries)
{
if (entry && entry->get_val_type() == nuraft::log_val_type::app_log)
state_machine->pre_commit(idx, entry->get_buf());
++idx;
++preprocessed;
if (preprocessed % 50000 == 0)
LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
}
LOG_INFO(log, "Preprocessing done");
}
else
{
LOG_INFO(log, "All local log entries preprocessed");
}
keeper_context->local_logs_preprocessed = true;
keeper_context->local_logs_preprocessed.notify_all();
};
switch (type)
{
case nuraft::cb_func::InitialBatchCommited:
{
auto log_store = state_manager->load_log_store();
if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index())
preprocess_logs();
break;
}
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
auto & req = *static_cast<nuraft::req_msg *>(param->ctx);
if (req.get_commit_idx() != 0)
{
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, last_log_idx_on_disk + 1);
size_t preprocessed = 0;
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
auto idx = state_machine->last_commit_index() + 1;
for (const auto & entry : *log_entries)
auto last_committed_index = state_machine->last_commit_index();
if (!req.log_entries().empty())
{
if (entry && entry->get_val_type() == nuraft::log_val_type::app_log)
state_machine->pre_commit(idx, entry->get_buf());
// Actual log number.
auto index_to_commit = std::min({last_log_idx_on_disk, req.get_last_log_idx(), req.get_commit_idx()});
++idx;
++preprocessed;
if (preprocessed % 50000 == 0)
LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
if (index_to_commit > last_committed_index)
{
LOG_TRACE(log, "Trying to commit local log entries, comitting upto {}", index_to_commit);
raft_instance->commitLogs(index_to_commit);
}
else if (index_to_commit == 0) /// we need to rollback all the logs so we preprocess all of them
{
preprocess_logs();
}
}
LOG_INFO(log, "Preprocessing done");
}
else
{
LOG_INFO(log, "Nothing extra to preprocess");
}
keeper_context->initial_batch_committed = true;
keeper_context->initial_batch_committed.notify_all();
break;
}
default:
@ -788,34 +798,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
case nuraft::cb_func::BecomeLeader:
{
/// We become leader and store is empty or we already committed it
if (commited_store || keeper_context->initial_batch_committed)
if (commited_store || initial_batch_committed)
set_initialized();
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::GotAppendEntryReqFromLeader:
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
if (!on_disk_logs_preprocessed)
{
auto & req = *static_cast<nuraft::req_msg *>(param->ctx);
if (req.get_commit_idx() != 0)
{
auto last_committed_index = state_machine->last_commit_index();
ulong index_to_commit = last_log_idx_on_disk;
if (!req.log_entries().empty())
{
// Actual log number.
ulong last_log_idx = req.get_last_log_idx() + 1;
index_to_commit = std::min({index_to_commit, last_log_idx, req.get_commit_idx()});
LOG_INFO(log, "Going to commit {}, last committed index {}", index_to_commit, last_committed_index);
if (index_to_commit > last_committed_index)
raft_instance->commitLogs(index_to_commit);
}
on_disk_logs_preprocessed = true;
}
}
if (param->leaderId != -1)
{
auto leader_index = raft_instance->get_leader_committed_log_idx();
@ -852,10 +840,8 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
case nuraft::cb_func::InitialBatchCommited:
{
if (param->myId == param->leaderId) /// We have committed our log store and we are leader, ready to serve requests.
{
on_disk_logs_preprocessed = true;
set_initialized();
}
initial_batch_committed = true;
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::PreAppendLogFollower:

View File

@ -42,9 +42,9 @@ private:
std::mutex initialized_mutex;
std::atomic<bool> initialized_flag = false;
std::condition_variable initialized_cv;
std::atomic<bool> initial_batch_committed = false;
uint64_t last_log_idx_on_disk = 0;
bool on_disk_logs_preprocessed = false;
nuraft::ptr<nuraft::cluster_config> last_local_config;

View File

@ -162,7 +162,7 @@ void assertDigest(
nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data)
{
keeper_context->initial_batch_committed.wait(false);
keeper_context->local_logs_preprocessed.wait(false);
if (keeper_context->shutdown_called)
return nullptr;
@ -393,7 +393,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
request_for_session->log_idx = log_idx;
if (!keeper_context->initial_batch_committed)
if (!keeper_context->local_logs_preprocessed)
preprocess(*request_for_session);
auto try_push = [this](const KeeperStorage::ResponseForSession& response)
@ -510,7 +510,7 @@ void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nuraf
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
{
keeper_context->initial_batch_committed.wait(false);
keeper_context->local_logs_preprocessed.wait(false);
if (keeper_context->shutdown_called)
return;