ClickHouse/src/Coordination/KeeperServer.cpp

#include <Coordination/Defines.h>
#include <Coordination/KeeperServer.h>

#include "config.h"

#include <chrono>
#include <filesystem>
#include <string>

#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
#include <libnuraft/cluster_config.hxx>
#include <libnuraft/log_val_type.hxx>
#include <libnuraft/ptr.hxx>
#include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
#include <Common/getMultipleKeysFromConfig.h>

namespace DB
{
namespace ErrorCodes
{
extern const int RAFT_ERROR;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
}
namespace
{
#if USE_SSL
void setSSLParams(nuraft::asio_service::options & asio_opts)
{
const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
String certificate_file_property = "openSSL.server.certificateFile";
String private_key_file_property = "openSSL.server.privateKeyFile";
String root_ca_file_property = "openSSL.server.caConfig";
if (!config.has(certificate_file_property))
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Server certificate file is not set.");
if (!config.has(private_key_file_property))
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Server private key file is not set.");
asio_opts.enable_ssl_ = true;
asio_opts.server_cert_file_ = config.getString(certificate_file_property);
asio_opts.server_key_file_ = config.getString(private_key_file_property);
if (config.has(root_ca_file_property))
asio_opts.root_cert_file_ = config.getString(root_ca_file_property);
if (config.getBool("openSSL.server.loadDefaultCAFile", false))
asio_opts.load_default_ca_file_ = true;
if (config.getString("openSSL.server.verificationMode", "none") == "none")
asio_opts.skip_verification_ = true;
}
#endif
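
// A sketch of the server config section consumed by setSSLParams above. The keys
// are exactly the ones read by the code; the file paths are hypothetical,
// for illustration only:
//
//   <openSSL>
//       <server>
//           <certificateFile>/etc/clickhouse-keeper/server.crt</certificateFile>
//           <privateKeyFile>/etc/clickhouse-keeper/server.key</privateKeyFile>
//           <caConfig>/etc/clickhouse-keeper/ca.crt</caConfig>
//           <loadDefaultCAFile>false</loadDefaultCAFile>
//           <verificationMode>none</verificationMode>
//       </server>
//   </openSSL>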
std::string checkAndGetSuperdigest(const String & user_and_digest)
{
if (user_and_digest.empty())
return "";

std::vector<std::string> scheme_and_id;
boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");

return user_and_digest;
}
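
// For example, a value of the form "super:<base64 digest>" (e.g. the hypothetical
// "super:xQJmxLMiHGwaqBvst5y6rkB6HQs=") is returned verbatim, while "admin:..."
// or a value without a ':' separator is rejected with INVALID_CONFIG_PARAMETER.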
int32_t getValueOrMaxInt32AndLogWarning(uint64_t value, const std::string & name, Poco::Logger * log)
{
if (value > std::numeric_limits<int32_t>::max())
{
LOG_WARNING(
log,
"Got {} value for setting '{}' which is bigger than int32_t max value, lowering value to {}.",
value,
name,
std::numeric_limits<int32_t>::max());
return std::numeric_limits<int32_t>::max();
}
return static_cast<int32_t>(value);
}
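
// For example, a (hypothetical) configured value of 10000000000 for "snapshot_distance"
// would be clamped here to 2147483647 (int32_t max) with a warning in the log.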
}
KeeperServer::KeeperServer(
const KeeperConfigurationAndSettingsPtr & configuration_and_settings_,
const Poco::Util::AbstractConfiguration & config,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
KeeperSnapshotManagerS3 & snapshot_manager_s3)
: server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings)
, log(&Poco::Logger::get("KeeperServer"))
, is_recovering(config.getBool("keeper_server.force_recovery", false))
, keeper_context{std::make_shared<KeeperContext>()}
, create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true))
{
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");

keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false);
keeper_context->ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false);

state_machine = nuraft::cs_new<KeeperStateMachine>(
responses_queue_,
snapshots_queue_,
configuration_and_settings_->snapshot_storage_path,
coordination_settings,
keeper_context,
config.getBool("keeper_server.upload_snapshot_on_exit", true) ? &snapshot_manager_s3 : nullptr,
checkAndGetSuperdigest(configuration_and_settings_->super_digest));

state_manager = nuraft::cs_new<KeeperStateManager>(
server_id,
"keeper_server",
configuration_and_settings_->log_storage_path,
configuration_and_settings_->state_file_path,
config,
coordination_settings);
}
/**
* Tiny wrapper around nuraft::raft_server which adds some functions
* necessary for recovery, mostly connected to config manipulation.
*/
struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
{
bool isClusterHealthy()
{
if (timer_from_init)
{
size_t expiry = get_current_params().heart_beat_interval_ * raft_server::raft_limits_.response_limit_;
if (timer_from_init->elapsedMilliseconds() < expiry)
return false;
timer_from_init.reset();
}
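// A strict majority of the voting members must respond: with N voters the
// quorum size is N / 2 + 1, so at most N - (N / 2 + 1) peers may be
// unresponsive. For example, with 5 voters the quorum is 3 and the cluster
// is still considered healthy with up to 2 unresponsive peers.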
const size_t voting_members = get_num_voting_members();
const auto not_responding_peers = get_not_responding_peers();
const auto quorum_size = voting_members / 2 + 1;
const auto max_not_responding_peers = voting_members - quorum_size;
return not_responding_peers <= max_not_responding_peers;
}
// Manually set the internal config of the raft server
// This should be used only for recovery
void setConfig(const nuraft::ptr<nuraft::cluster_config> & new_config)
{
set_config(new_config);
}
// Manually reconfigure the cluster
// This should be used only for recovery
void forceReconfigure(const nuraft::ptr<nuraft::cluster_config> & new_config)
{
reconfigure(new_config);
}
void commit_in_bg() override
{
// For NuRaft, if any commit fails (uncaught exception), the whole server aborts as a safety measure.
// This includes a failed allocation, which can leave the storage in an unknown state,
// making it impossible to handle correctly.
// We block the memory tracker for all the commit operations (including KeeperStateMachine::commit)
// assuming that the allocations are small
LockMemoryExceptionInThread blocker{VariableContext::Global};
nuraft::raft_server::commit_in_bg();
}
using nuraft::raft_server::raft_server;
// Peers are initially marked as responding because at least one cycle
// of heartbeat * response_limit (20) needs to pass before a peer can be
// marked as not responding.
// Until that time passes, we can't say whether the cluster is healthy.
std::optional<Stopwatch> timer_from_init = std::make_optional<Stopwatch>();
};
void KeeperServer::loadLatestConfig()
{
auto latest_snapshot_config = state_machine->getClusterConfig();
auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();

if (latest_snapshot_config && latest_log_store_config)
{
if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
{
LOG_INFO(log, "Will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
state_manager->save_config(*latest_snapshot_config);
}
else
{
LOG_INFO(log, "Will use config from log store with log index {}", latest_log_store_config->get_log_idx());
state_manager->save_config(*latest_log_store_config);
}
}
else if (latest_snapshot_config)
{
LOG_INFO(log, "No config in log store, will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
state_manager->save_config(*latest_snapshot_config);
}
else if (latest_log_store_config)
{
LOG_INFO(log, "No config in snapshot, will use config from log store with log index {}", latest_log_store_config->get_log_idx());
state_manager->save_config(*latest_log_store_config);
}
else
{
LOG_INFO(log, "No config in log store and snapshot, probably it's the initial run. Will use config from .xml on disk");
}
}
void KeeperServer::enterRecoveryMode(nuraft::raft_params & params)
{
LOG_WARNING(
log,
"This instance is in recovery mode. Until the quorum is restored, no requests should be sent to any "
"of the cluster instances. This instance will start accepting requests only when the recovery is finished.");
auto latest_config = state_manager->load_config();
nuraft::ptr<nuraft::cluster_config> new_config = std::make_shared<nuraft::cluster_config>(0, latest_config ? latest_config->get_log_idx() : 0);
new_config->set_log_idx(state_manager->load_log_store()->next_slot());
new_config->get_servers() = last_local_config->get_servers();
state_manager->save_config(*new_config);
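// With both quorum sizes forced to 1, this node alone can win an election and
// commit entries, which is what allows a single surviving node to rewrite the
// cluster configuration during recovery.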
params.with_custom_commit_quorum_size(1);
params.with_custom_election_quorum_size(1);
}
void KeeperServer::forceRecovery()
{
// Notify threads holding the lock that we want to enter recovery mode
is_recovering = true;
std::lock_guard lock{server_write_mutex};
auto params = raft_instance->get_current_params();
enterRecoveryMode(params);
raft_instance->setConfig(state_manager->load_config());
raft_instance->update_params(params);
}
void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
nuraft::raft_params params;
params.parallel_log_appending_ = true;
params.heart_beat_interval_
= getValueOrMaxInt32AndLogWarning(coordination_settings->heart_beat_interval_ms.totalMilliseconds(), "heart_beat_interval_ms", log);
params.election_timeout_lower_bound_ = getValueOrMaxInt32AndLogWarning(
coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds(), "election_timeout_lower_bound_ms", log);
params.election_timeout_upper_bound_ = getValueOrMaxInt32AndLogWarning(
coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(), "election_timeout_upper_bound_ms", log);
if (params.election_timeout_lower_bound_ || params.election_timeout_upper_bound_)
{
if (params.election_timeout_lower_bound_ >= params.election_timeout_upper_bound_)
{
LOG_FATAL(
log,
"election_timeout_lower_bound_ms is greater than or equal to election_timeout_upper_bound_ms, this would disable leader election "
"completely.");
std::terminate();
}
}
params.reserved_log_items_ = getValueOrMaxInt32AndLogWarning(coordination_settings->reserved_log_items, "reserved_log_items", log);
params.snapshot_distance_ = getValueOrMaxInt32AndLogWarning(coordination_settings->snapshot_distance, "snapshot_distance", log);
if (params.snapshot_distance_ < 10000)
LOG_WARNING(log, "Very small snapshot_distance {} specified in coordination settings. "
"It doesn't make sense to specify such a small value, because it can lead to degraded performance and other issues.", params.snapshot_distance_);
params.stale_log_gap_ = getValueOrMaxInt32AndLogWarning(coordination_settings->stale_log_gap, "stale_log_gap", log);
params.fresh_log_gap_ = getValueOrMaxInt32AndLogWarning(coordination_settings->fresh_log_gap, "fresh_log_gap", log);
params.client_req_timeout_
= getValueOrMaxInt32AndLogWarning(coordination_settings->operation_timeout_ms.totalMilliseconds(), "operation_timeout_ms", log);
params.auto_forwarding_ = coordination_settings->auto_forwarding;
params.auto_forwarding_req_timeout_
= getValueOrMaxInt32AndLogWarning(coordination_settings->operation_timeout_ms.totalMilliseconds() * 2, "operation_timeout_ms", log);
params.max_append_size_
= getValueOrMaxInt32AndLogWarning(coordination_settings->max_requests_batch_size, "max_requests_batch_size", log);
params.return_method_ = nuraft::raft_params::async_handler;
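// With async_handler, append_entries() does not block until replication:
// it returns a handle that is resolved once the entries are committed or fail.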
nuraft::asio_service::options asio_opts{};
if (state_manager->isSecure())
{
#if USE_SSL
setSSLParams(asio_opts);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for NuRaft is disabled because ClickHouse was built without SSL support.");
#endif
}
if (is_recovering)
enterRecoveryMode(params);
nuraft::raft_server::init_options init_options;
init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower();
init_options.start_server_in_constructor_ = false;
init_options.raft_callback_ = [this](nuraft::cb_func::Type type, nuraft::cb_func::Param * param) { return callbackFunc(type, param); };
nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level);
asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, logger);
// We use the same "interserver_listen_host" config as for the ClickHouse replicas because it is used for internal communication between Keeper instances
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config, "", "interserver_listen_host");
if (listen_hosts.empty())
{
auto asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger, enable_ipv6);
if (!asio_listener)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot create interserver listener on port {}", state_manager->getPort());
asio_listeners.emplace_back(std::move(asio_listener));
}
else
{
for (const auto & listen_host : listen_hosts)
{
auto asio_listener = asio_service->create_rpc_listener(listen_host, state_manager->getPort(), logger);
if (asio_listener)
asio_listeners.emplace_back(std::move(asio_listener));
}
}
nuraft::ptr<nuraft::delayed_task_scheduler> scheduler = asio_service;
nuraft::ptr<nuraft::rpc_client_factory> rpc_cli_factory = asio_service;
nuraft::ptr<nuraft::state_mgr> casted_state_manager = state_manager;
nuraft::ptr<nuraft::state_machine> casted_state_machine = state_machine;
/// raft_server creates unique_ptr from it
nuraft::context * ctx
= new nuraft::context(casted_state_manager, casted_state_machine, asio_listeners, logger, rpc_cli_factory, scheduler, params);
raft_instance = nuraft::cs_new<KeeperRaftServer>(ctx, init_options);
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
state_manager->getLogStore()->setRaftServer(raft_instance);
raft_instance->start_server(init_options.skip_initial_election_timeout_);
nuraft::ptr<nuraft::raft_server> casted_raft_server = raft_instance;
for (const auto & asio_listener : asio_listeners)
{
asio_listener->listen(casted_raft_server);
}
}
void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
state_machine->init();
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
auto log_store = state_manager->load_log_store();
auto next_log_idx = log_store->next_slot();
if (next_log_idx > 0 && next_log_idx > state_machine->last_commit_index())
{
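// Entries between the last committed index and the end of the log were already
// preprocessed before the restart, so they are preprocessed again here to
// rebuild the in-memory uncommitted state (e.g. ZXIDs and digests) before
// NuRaft starts committing them.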
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, next_log_idx);
size_t preprocessed = 0;
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
auto idx = state_machine->last_commit_index() + 1;
for (const auto & entry : *log_entries)
{
if (entry && entry->get_val_type() == nuraft::log_val_type::app_log)
state_machine->pre_commit(idx, entry->get_buf());
++idx;
++preprocessed;
if (preprocessed % 50000 == 0)
LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
}
LOG_INFO(log, "Preprocessing done");
}
loadLatestConfig();
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
launchRaftServer(config, enable_ipv6);
keeper_context->server_state = KeeperContext::Phase::RUNNING;
}
void KeeperServer::shutdownRaftServer()
{
size_t timeout = coordination_settings->shutdown_timeout.totalSeconds();
if (!raft_instance)
{
LOG_INFO(log, "RAFT doesn't start, shutdown not required");
return;
}
raft_instance->shutdown();
keeper_context->server_state = KeeperContext::Phase::SHUTDOWN;
if (create_snapshot_on_exit)
raft_instance->create_snapshot();
raft_instance.reset();
for (const auto & asio_listener : asio_listeners)
{
if (asio_listener)
{
asio_listener->stop();
asio_listener->shutdown();
}
}
if (asio_service)
{
asio_service->stop();
size_t count = 0;
while (asio_service->get_active_workers() != 0 && count < timeout * 100)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
count++;
}
}
if (asio_service && asio_service->get_active_workers() != 0)
LOG_WARNING(log, "Failed to shutdown RAFT server in {} seconds", timeout);
}
void KeeperServer::shutdown()
2021-01-22 16:04:57 +00:00
{
shutdownRaftServer();
state_manager->flushAndShutDownLogStore();
state_machine->shutdownStorage();
}
namespace
{
// Serialize the request with all the necessary information for the leader.
// We don't know the ZXID and digest yet, so we don't serialize them.
nuraft::ptr<nuraft::buffer> getZooKeeperRequestMessage(const KeeperStorage::RequestForSession & request_for_session)
{
DB::WriteBufferFromNuraftBuffer write_buf;
DB::writeIntBinary(request_for_session.session_id, write_buf);
request_for_session.request->write(write_buf);
DB::writeIntBinary(request_for_session.time, write_buf);
return write_buf.getBuffer();
}
// Serialize the request for the log entry
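// Layout (in order): session_id, the serialized ZooKeeper request, time, zxid,
// digest version and, unless the version is NO_DIGEST, the digest value.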
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestForSession & request_for_session)
{
DB::WriteBufferFromNuraftBuffer write_buf;
DB::writeIntBinary(request_for_session.session_id, write_buf);
request_for_session.request->write(write_buf);
DB::writeIntBinary(request_for_session.time, write_buf);
DB::writeIntBinary(request_for_session.zxid, write_buf);
assert(request_for_session.digest);
DB::writeIntBinary(request_for_session.digest->version, write_buf);
if (request_for_session.digest->version != KeeperStorage::DigestVersion::NO_DIGEST)
DB::writeIntBinary(request_for_session.digest->value, write_buf);
return write_buf.getBuffer();
}
}
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
if (!request_for_session.request->isReadRequest())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally");
state_machine->processReadRequest(request_for_session);
}
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
std::vector<nuraft::ptr<nuraft::buffer>> entries;
for (const auto & request_for_session : requests_for_sessions)
{
entries.push_back(getZooKeeperRequestMessage(request_for_session));
}
std::lock_guard lock{server_write_mutex};
if (is_recovering)
return nullptr;
return raft_instance->append_entries(entries);
}
bool KeeperServer::isLeader() const
{
return raft_instance->is_leader();
}
bool KeeperServer::isObserver() const
{
auto srv_config = state_manager->get_srv_config();
return srv_config->is_learner();
}
bool KeeperServer::isFollower() const
{
return !isLeader() && !isObserver();
}
bool KeeperServer::isLeaderAlive() const
{
return raft_instance && raft_instance->is_leader_alive();
}
/// TODO: test whether failed peers are taken into account
uint64_t KeeperServer::getFollowerCount() const
{
return raft_instance->get_peer_info_all().size();
}
uint64_t KeeperServer::getSyncedFollowerCount() const
{
uint64_t last_log_idx = raft_instance->get_last_log_idx();
const auto followers = raft_instance->get_peer_info_all();
uint64_t stale_followers = 0;
const uint64_t stale_follower_gap = raft_instance->get_current_params().stale_log_gap_;
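// A follower is counted as synced unless it is more than stale_log_gap
// entries behind the leader's last log index.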
for (const auto & fl : followers)
{
if (last_log_idx > fl.last_log_idx_ + stale_follower_gap)
stale_followers++;
}
return followers.size() - stale_followers;
}
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
if (is_recovering)
{
const auto finish_recovering = [&]
{
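// A custom quorum size of 0 tells NuRaft to fall back to the default
// majority quorum, undoing the size-1 quorums set when recovery mode was entered.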
auto new_params = raft_instance->get_current_params();
new_params.custom_commit_quorum_size_ = 0;
new_params.custom_election_quorum_size_ = 0;
raft_instance->update_params(new_params);
LOG_INFO(log, "Recovery is done. You can continue using cluster normally.");
is_recovering = false;
};
switch (type)
{
case nuraft::cb_func::HeartBeat:
{
if (raft_instance->isClusterHealthy())
finish_recovering();
break;
}
case nuraft::cb_func::NewConfig:
{
// Apply the manually set config when in recovery mode.
// NuRaft will commit the config but skip the reconfigure step if the
// current config is the same as the committed one.
// Because we manually set the config that is being committed,
// we need to call reconfigure ourselves.
uint64_t log_idx = *static_cast<uint64_t *>(param->ctx);
auto config = state_manager->load_config();
if (log_idx == config->get_log_idx())
{
raft_instance->forceReconfigure(config);
// Single node cluster doesn't need to wait for any other nodes
// so we can finish recovering immediately after applying
// new configuration
if (config->get_servers().size() == 1)
finish_recovering();
}
break;
}
case nuraft::cb_func::ProcessReq:
// we don't accept requests from our peers or clients
// while in recovery mode
return nuraft::cb_func::ReturnCode::ReturnNull;
default:
break;
2022-04-19 08:08:13 +00:00
}
}
if (initialized_flag)
{
switch (type)
{
// This event is called on the leader node before a single log entry is appended to the log store
case nuraft::cb_func::PreAppendLog:
{
// We are relying on the fact that requests are processed under a mutex
// and not a RW lock.
auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
assert(entry->get_val_type() == nuraft::app_log);
auto next_zxid = state_machine->getNextZxid();
auto & entry_buf = entry->get_buf();
auto request_for_session = state_machine->parseRequest(entry_buf);
request_for_session.zxid = next_zxid;
if (!state_machine->preprocess(request_for_session))
return nuraft::cb_func::ReturnCode::ReturnNull;
request_for_session.digest = state_machine->getNodesDigest();
entry = nuraft::cs_new<nuraft::log_entry>(entry->get_term(), getZooKeeperLogEntry(request_for_session), entry->get_val_type());
break;
}
case nuraft::cb_func::AppendLogFailed:
{
// We are relying on the fact that requests are processed under a mutex
// and not a RW lock.
auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
assert(entry->get_val_type() == nuraft::app_log);
auto & entry_buf = entry->get_buf();
auto request_for_session = state_machine->parseRequest(entry_buf);
state_machine->rollbackRequest(request_for_session, true);
break;
}
default:
break;
}
return nuraft::cb_func::ReturnCode::Ok;
}
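// The store is considered committed when the next log slot is within one
// entry of the last committed index, i.e. there is nothing left to replay.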
size_t last_committed = state_machine->last_commit_index();
size_t next_index = state_manager->getLogStore()->next_slot();
bool committed_store = false;
if (next_index < last_committed || next_index - last_committed <= 1)
committed_store = true;
auto set_initialized = [this]()
{
std::lock_guard lock(initialized_mutex);
initialized_flag = true;
initialized_cv.notify_all();
};
switch (type)
{
case nuraft::cb_func::BecomeLeader:
{
/// We become leader and store is empty or we already committed it
if (committed_store || initial_batch_committed)
set_initialized();
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::BecomeFollower:
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
if (param->leaderId != -1)
{
auto leader_index = raft_instance->get_leader_committed_log_idx();
auto our_index = raft_instance->get_committed_log_idx();
/// This may happen when we start RAFT cluster from scratch.
/// Node first became leader, and after that some other node became leader.
/// BecomeFresh for this node will not be called because it was already fresh
/// when it was leader.
if (leader_index < our_index + coordination_settings->fresh_log_gap)
set_initialized();
}
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::BecomeFresh:
{
set_initialized(); /// We are fresh follower, ready to serve requests.
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::InitialBatchCommited:
{
if (param->myId == param->leaderId) /// We have committed our log store and we are leader, ready to serve requests.
set_initialized();
initial_batch_committed = true;
return nuraft::cb_func::ReturnCode::Ok;
}
default: /// ignore other events
return nuraft::cb_func::ReturnCode::Ok;
}
}
void KeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
LOG_WARNING(log, "Failed to wait for RAFT initialization in {}ms, will continue in background", timeout);
}
std::vector<int64_t> KeeperServer::getDeadSessions()
{
return state_machine->getDeadSessions();
}
ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
auto diff = state_manager->getConfigurationDiff(config);
if (!diff.empty())
{
std::lock_guard lock{server_write_mutex};
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
}
return diff;
}
void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
{
std::lock_guard lock{server_write_mutex};
if (is_recovering)
return;
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
{
LOG_INFO(log, "Will try to add server with id {}", task.server->get_id());
bool added = false;
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
LOG_INFO(log, "Server with id {} was successfully added", task.server->get_id());
added = true;
break;
}
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to add server {}", task.server->get_id());
break;
}
auto result = raft_instance->add_srv(*task.server);
if (!result->get_accepted())
LOG_INFO(
log,
"Command to add server {} was not accepted for the {} time, will sleep for {} ms and retry",
task.server->get_id(),
i + 1,
sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
if (!added)
throw Exception(
ErrorCodes::RAFT_ERROR,
"Configuration change to add server (id {}) was not accepted by RAFT after all {} retries",
task.server->get_id(),
coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
{
LOG_INFO(log, "Will try to remove server with id {}", task.server->get_id());
bool removed = false;
if (task.server->get_id() == state_manager->server_id())
{
LOG_INFO(
log,
"Trying to remove leader node (ourself), so will yield leadership and some other node (new leader) will try remove us. "
"Probably you will have to run SYSTEM RELOAD CONFIG on the new leader node");
raft_instance->yield_leadership();
return;
}
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
LOG_INFO(log, "Server with id {} was successfully removed", task.server->get_id());
removed = true;
break;
}
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to remove server {}", task.server->get_id());
break;
}
auto result = raft_instance->remove_srv(task.server->get_id());
if (!result->get_accepted())
LOG_INFO(
log,
"Command to remove server {} was not accepted for the {} time, will sleep for {} ms and retry",
task.server->get_id(),
i + 1,
sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
if (!removed)
throw Exception(
ErrorCodes::RAFT_ERROR,
"Configuration change to remove server (id {}) was not accepted by RAFT after all {} retries",
task.server->get_id(),
coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
raft_instance->set_priority(task.server->get_id(), task.server->get_priority());
else
LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
}
bool KeeperServer::waitConfigurationUpdate(const ConfigUpdateAction & task)
{
if (is_recovering)
return false;
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
{
LOG_INFO(log, "Will try to wait server with id {} to be added", task.server->get_id());
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
LOG_INFO(log, "Server with id {} was successfully added by leader", task.server->get_id());
return true;
}
if (isLeader())
{
LOG_INFO(log, "We are leader now, probably we will have to add server {}", task.server->get_id());
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
return false;
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
{
LOG_INFO(log, "Will try to wait remove of server with id {}", task.server->get_id());
2022-04-19 08:08:13 +00:00
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
2021-10-19 13:11:29 +00:00
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
LOG_INFO(log, "Server with id {} was successfully removed by leader", task.server->get_id());
return true;
}
if (isLeader())
{
LOG_INFO(log, "We are leader now, probably we will have to remove server {}", task.server->get_id());
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
return false;
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
return true;
else
LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
return true;
}
Keeper4LWInfo KeeperServer::getPartiallyFilled4LWInfo() const
{
Keeper4LWInfo result;
result.is_leader = raft_instance->is_leader();
auto srv_config = state_manager->get_srv_config();
result.is_observer = srv_config->is_learner();
result.is_follower = !result.is_leader && !result.is_observer;
result.has_leader = result.is_leader || isLeaderAlive();
result.is_standalone = !result.is_follower && getFollowerCount() == 0;
if (result.is_leader)
{
result.follower_count = getFollowerCount();
result.synced_follower_count = getSyncedFollowerCount();
}
result.total_nodes_count = getKeeperStateMachine()->getNodesCount();
result.last_zxid = getKeeperStateMachine()->getLastProcessedZxid();
return result;
}
uint64_t KeeperServer::createSnapshot()
{
uint64_t log_idx = raft_instance->create_snapshot();
if (log_idx != 0)
LOG_INFO(log, "Snapshot creation scheduled with last committed log index {}.", log_idx);
else
LOG_WARNING(log, "Failed to schedule snapshot creation task.");
return log_idx;
}
KeeperLogInfo KeeperServer::getKeeperLogInfo()
{
KeeperLogInfo log_info;
auto log_store = state_manager->load_log_store();
if (log_store)
{
log_info.first_log_idx = log_store->start_index();
log_info.first_log_term = log_store->term_at(log_info.first_log_idx);
}
if (raft_instance)
{
log_info.last_log_idx = raft_instance->get_last_log_idx();
log_info.last_log_term = raft_instance->get_last_log_term();
log_info.last_committed_log_idx = raft_instance->get_committed_log_idx();
log_info.leader_committed_log_idx = raft_instance->get_leader_committed_log_idx();
log_info.target_committed_log_idx = raft_instance->get_target_committed_log_idx();
log_info.last_snapshot_idx = raft_instance->get_last_snapshot_idx();
}
return log_info;
}
bool KeeperServer::requestLeader()
{
return isLeader() || raft_instance->request_leadership();
}
void KeeperServer::recalculateStorageStats()
{
state_machine->recalculateStorageStats();
}
}