#include <Coordination/Defines.h>
#include <Coordination/KeeperServer.h>

#include "config_core.h"

#include <chrono>
#include <filesystem>
#include <string>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
#include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int RAFT_ERROR;
    extern const int NO_ELEMENTS_IN_CONFIG;
    extern const int SUPPORT_IS_DISABLED;
    extern const int LOGICAL_ERROR;
    extern const int INVALID_CONFIG_PARAMETER;
}

namespace
{

#if USE_SSL

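/// Fill NuRaft's asio_service SSL options from the server's openSSL.server.* configuration.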
void setSSLParams(nuraft::asio_service::options & asio_opts)
{
    const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
    String certificate_file_property = "openSSL.server.certificateFile";
    String private_key_file_property = "openSSL.server.privateKeyFile";
    String root_ca_file_property = "openSSL.server.caConfig";

    if (!config.has(certificate_file_property))
        throw Exception("Server certificate file is not set.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);

    if (!config.has(private_key_file_property))
        throw Exception("Server private key file is not set.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);

    asio_opts.enable_ssl_ = true;
    asio_opts.server_cert_file_ = config.getString(certificate_file_property);
    asio_opts.server_key_file_ = config.getString(private_key_file_property);

    if (config.has(root_ca_file_property))
        asio_opts.root_cert_file_ = config.getString(root_ca_file_property);

    if (config.getBool("openSSL.server.loadDefaultCAFile", false))
        asio_opts.load_default_ca_file_ = true;

    if (config.getString("openSSL.server.verificationMode", "none") == "none")
        asio_opts.skip_verification_ = true;
}
#endif

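/// Validate the keeper_server superdigest value: it must look like 'super:base64string'.
/// Returns the digest unchanged, or an empty string if it is not configured.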
std::string checkAndGetSuperdigest(const String & user_and_digest)
{
    if (user_and_digest.empty())
        return "";

    std::vector<std::string> scheme_and_id;
    boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
    if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
        throw Exception(
            ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");

    return user_and_digest;
}

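/// Clamp a 64-bit setting value to the int32_t range expected by NuRaft and log a warning if it had to be lowered.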
int32_t getValueOrMaxInt32AndLogWarning(uint64_t value, const std::string & name, Poco::Logger * log)
{
    if (value > std::numeric_limits<int32_t>::max())
    {
        LOG_WARNING(
            log,
            "Got {} value for setting '{}' which is bigger than int32_t max value, lowering value to {}.",
            value,
            name,
            std::numeric_limits<int32_t>::max());
        return std::numeric_limits<int32_t>::max();
    }

    return static_cast<int32_t>(value);
}

}

KeeperServer::KeeperServer(
    const KeeperConfigurationAndSettingsPtr & configuration_and_settings_,
    const Poco::Util::AbstractConfiguration & config,
    ResponsesQueue & responses_queue_,
    SnapshotsQueue & snapshots_queue_)
    : server_id(configuration_and_settings_->server_id)
    , coordination_settings(configuration_and_settings_->coordination_settings)
    , state_machine(nuraft::cs_new<KeeperStateMachine>(
          responses_queue_,
          snapshots_queue_,
          configuration_and_settings_->snapshot_storage_path,
          coordination_settings,
          checkAndGetSuperdigest(configuration_and_settings_->super_digest)))
    , state_manager(nuraft::cs_new<KeeperStateManager>(
          server_id, "keeper_server", configuration_and_settings_->log_storage_path, config, coordination_settings))
    , log(&Poco::Logger::get("KeeperServer"))
    , is_recovering(config.has("keeper_server.force_recovery") && config.getBool("keeper_server.force_recovery"))
{
    if (coordination_settings->quorum_reads)
        LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
}

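/// Thin wrapper around nuraft::raft_server that additionally can tell whether the cluster
/// is healthy, which is used to decide when forced recovery is finished.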
struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
{
    bool isClusterHealthy()
    {
        if (timer_from_init)
        {
            size_t expiry = get_current_params().heart_beat_interval_ * raft_server::raft_limits_.response_limit_;

            if (timer_from_init->elapsedMilliseconds() < expiry)
                return false;

            timer_from_init.reset();
        }

        const size_t voting_members = get_num_voting_members();
        const auto not_responding_peers = get_not_responding_peers();
        const auto quorum_size = voting_members / 2 + 1;
        const auto max_not_responding_peers = voting_members - quorum_size;

        return not_responding_peers <= max_not_responding_peers;
    }

    using nuraft::raft_server::raft_server;

    // Peers are initially marked as responding because at least one cycle
    // of heartbeat * response_limit (20) needs to pass before a peer can be
    // marked as not responding; until that time passes we cannot claim that
    // the cluster is healthy.
    std::optional<Stopwatch> timer_from_init = std::make_optional<Stopwatch>();
};

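/// Choose the most recent cluster configuration available (from the snapshot or from the log store)
/// and persist it via the state manager.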
void KeeperServer::loadLatestConfig()
{
    auto latest_snapshot_config = state_machine->getClusterConfig();
    auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();

    if (latest_snapshot_config && latest_log_store_config)
    {
        if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
        {
            LOG_INFO(log, "Will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
            state_manager->save_config(*latest_snapshot_config);
        }
        else
        {
            LOG_INFO(log, "Will use config from log store with log index {}", latest_log_store_config->get_log_idx());
            state_manager->save_config(*latest_log_store_config);
        }
    }
    else if (latest_snapshot_config)
    {
        LOG_INFO(log, "No config in log store, will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
        state_manager->save_config(*latest_snapshot_config);
    }
    else if (latest_log_store_config)
    {
        LOG_INFO(log, "No config in snapshot, will use config from log store with log index {}", latest_log_store_config->get_log_idx());
        state_manager->save_config(*latest_log_store_config);
    }
    else
    {
        LOG_INFO(log, "No config in log store or snapshot, probably this is the initial run. Will use config from .xml on disk");
    }
}

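/// Restart the Raft server in recovery mode: with a local quorum of 1 a new configuration
/// can be forced even though the old quorum is lost.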
void KeeperServer::forceRecovery()
{
    shutdownRaftServer();
    is_recovering = true;
    launchRaftServer(true);
}

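/// Create and start the NuRaft server: translate CoordinationSettings into raft_params,
/// set up the (optionally TLS-enabled) asio transport and register callbackFunc() as the
/// Raft event callback.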
void KeeperServer::launchRaftServer(bool enable_ipv6)
{
    nuraft::raft_params params;
    params.heart_beat_interval_
        = getValueOrMaxInt32AndLogWarning(coordination_settings->heart_beat_interval_ms.totalMilliseconds(), "heart_beat_interval_ms", log);
    params.election_timeout_lower_bound_ = getValueOrMaxInt32AndLogWarning(
        coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds(), "election_timeout_lower_bound_ms", log);
    params.election_timeout_upper_bound_ = getValueOrMaxInt32AndLogWarning(
        coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(), "election_timeout_upper_bound_ms", log);
    params.reserved_log_items_ = getValueOrMaxInt32AndLogWarning(coordination_settings->reserved_log_items, "reserved_log_items", log);
    params.snapshot_distance_ = getValueOrMaxInt32AndLogWarning(coordination_settings->snapshot_distance, "snapshot_distance", log);
    params.stale_log_gap_ = getValueOrMaxInt32AndLogWarning(coordination_settings->stale_log_gap, "stale_log_gap", log);
    params.fresh_log_gap_ = getValueOrMaxInt32AndLogWarning(coordination_settings->fresh_log_gap, "fresh_log_gap", log);
    params.client_req_timeout_
        = getValueOrMaxInt32AndLogWarning(coordination_settings->operation_timeout_ms.totalMilliseconds(), "operation_timeout_ms", log);
    params.auto_forwarding_ = coordination_settings->auto_forwarding;
    params.auto_forwarding_req_timeout_
        = getValueOrMaxInt32AndLogWarning(coordination_settings->operation_timeout_ms.totalMilliseconds() * 2, "operation_timeout_ms", log);
    params.max_append_size_
        = getValueOrMaxInt32AndLogWarning(coordination_settings->max_requests_batch_size, "max_requests_batch_size", log);

    params.return_method_ = nuraft::raft_params::async_handler;

    nuraft::asio_service::options asio_opts{};
    if (state_manager->isSecure())
    {
#if USE_SSL
        setSSLParams(asio_opts);
#else
        throw Exception{
            "SSL support for NuRaft is disabled because ClickHouse was built without SSL support.", ErrorCodes::SUPPORT_IS_DISABLED};
#endif
    }

    if (is_recovering)
    {
        LOG_WARNING(
            log,
            "This instance is in recovery mode. Until the quorum is restored, no requests should be sent to any "
            "of the cluster instances. This instance will start accepting requests only when the recovery is finished.");
        params.with_custom_commit_quorum_size(1);
        params.with_custom_election_quorum_size(1);

        auto latest_config = state_manager->load_config();
        auto new_config = std::make_shared<nuraft::cluster_config>(0, latest_config ? latest_config->get_log_idx() : 0);
        new_config->get_servers() = last_local_config->get_servers();
        new_config->set_log_idx(state_manager->getLogStore()->next_slot());

        state_manager->save_config(*new_config);
    }

    nuraft::raft_server::init_options init_options;

    init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower();
    init_options.start_server_in_constructor_ = false;
    init_options.raft_callback_ = [this](nuraft::cb_func::Type type, nuraft::cb_func::Param * param) { return callbackFunc(type, param); };

    nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>("RaftInstance", DB::LogsLevel::information);
    asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, logger);
    asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger, enable_ipv6);

    if (!asio_listener)
        return;

    nuraft::ptr<nuraft::delayed_task_scheduler> scheduler = asio_service;
    nuraft::ptr<nuraft::rpc_client_factory> rpc_cli_factory = asio_service;

    nuraft::ptr<nuraft::state_mgr> casted_state_manager = state_manager;
    nuraft::ptr<nuraft::state_machine> casted_state_machine = state_machine;

    /// raft_server creates unique_ptr from it
    nuraft::context * ctx
        = new nuraft::context(casted_state_manager, casted_state_machine, asio_listener, logger, rpc_cli_factory, scheduler, params);

    raft_instance = nuraft::cs_new<KeeperRaftServer>(ctx, init_options);

    if (!raft_instance)
        throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");

    raft_instance->start_server(state_manager->shouldStartAsFollower());

    nuraft::ptr<nuraft::raft_server> casted_raft_server = raft_instance;
    asio_listener->listen(casted_raft_server);
}

void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
    state_machine->init();

    state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);

    loadLatestConfig();

    last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;

    launchRaftServer(enable_ipv6);
}

void KeeperServer::shutdownRaftServer()
{
    size_t timeout = coordination_settings->shutdown_timeout.totalSeconds();

    if (!raft_instance)
    {
        LOG_INFO(log, "RAFT server is not started, shutdown is not required");
        return;
    }

    raft_instance->shutdown();
    raft_instance.reset();

    if (asio_listener)
    {
        asio_listener->stop();
        asio_listener->shutdown();
    }

    if (asio_service)
    {
        asio_service->stop();
        size_t count = 0;
        while (asio_service->get_active_workers() != 0 && count < timeout * 100)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            count++;
        }
    }

    if (asio_service && asio_service->get_active_workers() != 0)
        LOG_WARNING(log, "Failed to shutdown RAFT server in {} seconds", timeout);
}

void KeeperServer::shutdown()
{
    state_machine->shutdownStorage();
    state_manager->flushLogStore();
    shutdownRaftServer();
}

namespace
{

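/// Serialize a request into the binary format stored in the Raft log:
/// session id, the ZooKeeper request itself and the request time.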
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request)
{
    DB::WriteBufferFromNuraftBuffer buf;
    DB::writeIntBinary(session_id, buf);
    request->write(buf);
    DB::writeIntBinary(time, buf);
    return buf.getBuffer();
}

}

void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
    if (!request_for_session.request->isReadRequest())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally");

    state_machine->processReadRequest(request_for_session);
}

RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
    std::vector<nuraft::ptr<nuraft::buffer>> entries;
    for (const auto & [session_id, time, request] : requests_for_sessions)
        entries.push_back(getZooKeeperLogEntry(session_id, time, request));

    return raft_instance->append_entries(entries);
}

bool KeeperServer::isLeader() const
{
    return raft_instance->is_leader();
}

bool KeeperServer::isObserver() const
{
    auto srv_config = state_manager->get_srv_config();
    return srv_config->is_learner();
}

bool KeeperServer::isFollower() const
{
    return !isLeader() && !isObserver();
}

bool KeeperServer::isLeaderAlive() const
{
    return raft_instance->is_leader_alive();
}

/// TODO: test whether failed peers are taken into account
uint64_t KeeperServer::getFollowerCount() const
{
    return raft_instance->get_peer_info_all().size();
}

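/// Number of followers whose last log index lags ours by no more than stale_log_gap.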
uint64_t KeeperServer::getSyncedFollowerCount() const
{
    uint64_t last_log_idx = raft_instance->get_last_log_idx();
    const auto followers = raft_instance->get_peer_info_all();

    uint64_t stale_followers = 0;

    const uint64_t stale_follower_gap = raft_instance->get_current_params().stale_log_gap_;
    for (const auto & fl : followers)
    {
        if (last_log_idx > fl.last_log_idx_ + stale_follower_gap)
            stale_followers++;
    }
    return followers.size() - stale_followers;
}

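/// Raft event callback. It finishes forced recovery once the cluster is healthy again and
/// sets initialized_flag as soon as this node is ready to serve requests
/// (leader with a committed store, fresh follower, etc.).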
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
    if (type == nuraft::cb_func::HeartBeat && is_recovering && raft_instance->isClusterHealthy())
    {
        auto new_params = raft_instance->get_current_params();
        new_params.custom_commit_quorum_size_ = 0;
        new_params.custom_election_quorum_size_ = 0;
        raft_instance->update_params(new_params);

        LOG_INFO(log, "Recovery is done. You can continue using the cluster normally.");
        is_recovering = false;
        return nuraft::cb_func::ReturnCode::Ok;
    }

    if (initialized_flag)
        return nuraft::cb_func::ReturnCode::Ok;

    size_t last_committed = state_machine->last_commit_index();
    size_t next_index = state_manager->getLogStore()->next_slot();
    bool committed_store = false;
    if (next_index < last_committed || next_index - last_committed <= 1)
        committed_store = true;

    auto set_initialized = [this]()
    {
        std::unique_lock lock(initialized_mutex);
        initialized_flag = true;
        initialized_cv.notify_all();
    };

    switch (type)
    {
        case nuraft::cb_func::BecomeLeader:
        {
            /// We became leader and the store is empty, or we have already committed it.
            if (committed_store || initial_batch_committed)
                set_initialized();
            return nuraft::cb_func::ReturnCode::Ok;
        }
        case nuraft::cb_func::BecomeFollower:
        case nuraft::cb_func::GotAppendEntryReqFromLeader:
        {
            if (param->leaderId != -1)
            {
                auto leader_index = raft_instance->get_leader_committed_log_idx();
                auto our_index = raft_instance->get_committed_log_idx();
                /// This may happen when we start the RAFT cluster from scratch.
                /// This node first became leader, and after that some other node became leader.
                /// BecomeFresh for this node will not be called because it was already fresh
                /// when it was leader.
                if (leader_index < our_index + coordination_settings->fresh_log_gap)
                    set_initialized();
            }
            return nuraft::cb_func::ReturnCode::Ok;
        }
        case nuraft::cb_func::BecomeFresh:
        {
            set_initialized(); /// We are a fresh follower, ready to serve requests.
            return nuraft::cb_func::ReturnCode::Ok;
        }
        case nuraft::cb_func::InitialBatchCommited:
        {
            if (param->myId == param->leaderId) /// We have committed our log store and we are the leader, ready to serve requests.
                set_initialized();
            initial_batch_committed = true;
            return nuraft::cb_func::ReturnCode::Ok;
        }
        default: /// ignore other events
            return nuraft::cb_func::ReturnCode::Ok;
    }
}

void KeeperServer::waitInit()
{
    std::unique_lock lock(initialized_mutex);

    int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
    if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
        throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait for RAFT initialization");
}

std::vector<int64_t> KeeperServer::getDeadSessions()
{
    return state_machine->getDeadSessions();
}

ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
    auto diff = state_manager->getConfigurationDiff(config);

    if (!diff.empty())
        last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;

    return diff;
}

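/// Apply a single configuration change (add/remove a server or update its priority) through Raft.
/// Add and remove are retried up to configuration_change_tries_count times and abandoned
/// if this node stops being the leader.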
void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
{
    size_t sleep_ms = 500;
    if (task.action_type == ConfigUpdateActionType::AddServer)
    {
        LOG_INFO(log, "Will try to add server with id {}", task.server->get_id());
        bool added = false;
        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully added", task.server->get_id());
                added = true;
                break;
            }

            if (!isLeader())
            {
                LOG_INFO(log, "We are not the leader anymore, will not try to add server {}", task.server->get_id());
                break;
            }

            auto result = raft_instance->add_srv(*task.server);
            if (!result->get_accepted())
                LOG_INFO(
                    log,
                    "Command to add server {} was not accepted (attempt {}), will sleep for {} ms and retry",
                    task.server->get_id(),
                    i + 1,
                    sleep_ms * (i + 1));

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }
        if (!added)
            throw Exception(
                ErrorCodes::RAFT_ERROR,
                "Configuration change to add server (id {}) was not accepted by RAFT after all {} retries",
                task.server->get_id(),
                coordination_settings->configuration_change_tries_count);
    }
    else if (task.action_type == ConfigUpdateActionType::RemoveServer)
    {
        LOG_INFO(log, "Will try to remove server with id {}", task.server->get_id());

        bool removed = false;
        if (task.server->get_id() == state_manager->server_id())
        {
            LOG_INFO(
                log,
                "Trying to remove the leader node (ourselves), so we will yield leadership and some other node (the new leader) will try to remove us. "
                "Probably you will have to run SYSTEM RELOAD CONFIG on the new leader node");

            raft_instance->yield_leadership();
            return;
        }

        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully removed", task.server->get_id());
                removed = true;
                break;
            }

            if (!isLeader())
            {
                LOG_INFO(log, "We are not the leader anymore, will not try to remove server {}", task.server->get_id());
                break;
            }

            auto result = raft_instance->remove_srv(task.server->get_id());
            if (!result->get_accepted())
                LOG_INFO(
                    log,
                    "Command to remove server {} was not accepted (attempt {}), will sleep for {} ms and retry",
                    task.server->get_id(),
                    i + 1,
                    sleep_ms * (i + 1));

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }
        if (!removed)
            throw Exception(
                ErrorCodes::RAFT_ERROR,
                "Configuration change to remove server (id {}) was not accepted by RAFT after all {} retries",
                task.server->get_id(),
                coordination_settings->configuration_change_tries_count);
    }
    else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
        raft_instance->set_priority(task.server->get_id(), task.server->get_priority());
    else
        LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
}

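/// Non-leader counterpart of applyConfigurationUpdate: wait until the leader has applied the
/// configuration change. Returns false if this node became the leader (and has to apply the
/// change itself) or the wait timed out.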
bool KeeperServer::waitConfigurationUpdate(const ConfigUpdateAction & task)
{
    size_t sleep_ms = 500;
    if (task.action_type == ConfigUpdateActionType::AddServer)
    {
        LOG_INFO(log, "Will wait for server with id {} to be added", task.server->get_id());
        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully added by leader", task.server->get_id());
                return true;
            }

            if (isLeader())
            {
                LOG_INFO(log, "We are the leader now, probably we will have to add server {}", task.server->get_id());
                return false;
            }

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }
        return false;
    }
    else if (task.action_type == ConfigUpdateActionType::RemoveServer)
    {
        LOG_INFO(log, "Will wait for server with id {} to be removed", task.server->get_id());

        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully removed by leader", task.server->get_id());
                return true;
            }

            if (isLeader())
            {
                LOG_INFO(log, "We are the leader now, probably we will have to remove server {}", task.server->get_id());
                return false;
            }

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }
        return false;
    }
    else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
        return true;
    else
        LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
    return true;
}

}