Merge pull request #58455 from ClickHouse/keeper-even-better-startup

Avoid huge memory consumption during Keeper startup for more cases
This commit is contained in:
Antonio Andelic 2024-01-08 10:46:17 +01:00 committed by GitHub
commit c7b9f4aaed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,6 +4,7 @@
#include "config.h"
#include <chrono>
#include <mutex>
#include <string>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
@ -14,6 +15,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
#include <libnuraft/callback.hxx>
#include <libnuraft/cluster_config.hxx>
#include <libnuraft/log_val_type.hxx>
#include <libnuraft/msg_type.hxx>
@ -196,13 +198,9 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
nuraft::raft_server::commit_in_bg();
}
void commitLogs(uint64_t index_to_commit, bool initial_commit_exec)
std::unique_lock<std::recursive_mutex> lockRaft()
{
leader_commit_index_.store(index_to_commit);
quick_commit_index_ = index_to_commit;
lagging_sm_target_index_ = index_to_commit;
commit_in_bg_exec(0, initial_commit_exec);
return std::unique_lock(lock_);
}
using nuraft::raft_server::raft_server;
@ -518,6 +516,7 @@ void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession &
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
std::vector<nuraft::ptr<nuraft::buffer>> entries;
entries.reserve(requests_for_sessions.size());
for (const auto & request_for_session : requests_for_sessions)
entries.push_back(getZooKeeperLogEntry(request_for_session));
@ -630,32 +629,32 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
{
const auto preprocess_logs = [&]
{
auto lock = raft_instance->lockRaft();
keeper_context->local_logs_preprocessed = true;
auto log_store = state_manager->load_log_store();
if (last_log_idx_on_disk > 0 && last_log_idx_on_disk > state_machine->last_commit_index())
{
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, last_log_idx_on_disk + 1);
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, log_store->next_slot());
size_t preprocessed = 0;
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
auto idx = state_machine->last_commit_index() + 1;
for (const auto & entry : *log_entries)
{
if (entry && entry->get_val_type() == nuraft::log_val_type::app_log)
state_machine->pre_commit(idx, entry->get_buf());
++idx;
++preprocessed;
if (preprocessed % 50000 == 0)
LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
}
LOG_INFO(log, "Preprocessing done");
}
else
if (log_entries->empty())
{
LOG_INFO(log, "All local log entries preprocessed");
return;
}
size_t preprocessed = 0;
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
auto idx = state_machine->last_commit_index() + 1;
for (const auto & entry : *log_entries)
{
if (entry && entry->get_val_type() == nuraft::log_val_type::app_log)
state_machine->pre_commit(idx, entry->get_buf());
++idx;
++preprocessed;
if (preprocessed % 50000 == 0)
LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
}
LOG_INFO(log, "Preprocessing done");
};
switch (type)
@ -666,43 +665,34 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
/// until we preprocess all stored logs
return nuraft::cb_func::ReturnCode::ReturnNull;
}
case nuraft::cb_func::InitialBatchCommited:
{
preprocess_logs();
break;
}
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
auto & req = *static_cast<nuraft::req_msg *>(param->ctx);
if (req.get_commit_idx() == 0 || req.log_entries().empty())
break;
auto last_committed_index = state_machine->last_commit_index();
// Actual log number.
auto index_to_commit = std::min({last_log_idx_on_disk, req.get_last_log_idx(), req.get_commit_idx()});
if (index_to_commit > last_committed_index)
{
LOG_TRACE(log, "Trying to commit local log entries, committing upto {}", index_to_commit);
raft_instance->commitLogs(index_to_commit, true);
/// after we manually committed all the local logs we can, we assert that all of the local logs are either
/// committed or preprocessed
if (!keeper_context->local_logs_preprocessed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Local logs are not preprocessed");
}
else if (last_log_idx_on_disk <= last_committed_index)
{
keeper_context->local_logs_preprocessed = true;
}
else if
(
index_to_commit == 0 ||
(index_to_commit == last_committed_index && last_log_idx_on_disk > index_to_commit) /// we need to rollback all the logs so we preprocess all of them
)
/// maybe we got a snapshot installed
if (state_machine->last_commit_index() >= last_log_idx_on_disk)
{
preprocess_logs();
break;
}
auto & req = *static_cast<nuraft::req_msg *>(param->ctx);
if (req.log_entries().empty())
break;
if (req.get_last_log_idx() < last_log_idx_on_disk)
last_log_idx_on_disk = req.get_last_log_idx();
/// We don't want to accept too many new logs before we preprocess all the local logs.
/// Because the next log index is decreased on each failure, we also need to accept requests when it's near last_log_idx_on_disk,
/// so that the counter is reset on the leader side.
else if (raft_instance->get_target_committed_log_idx() >= last_log_idx_on_disk && req.get_last_log_idx() > last_log_idx_on_disk)
return nuraft::cb_func::ReturnNull;
break;
}
case nuraft::cb_func::StateMachineExecution:
{
if (state_machine->last_commit_index() >= last_log_idx_on_disk)
preprocess_logs();
break;
}
default: