ClickHouse/src/Coordination/KeeperServer.cpp

#include <Coordination/KeeperServer.h>
#include <Coordination/Defines.h>

#if !defined(ARCADIA_BUILD)
#    include "config_core.h"
#endif

#include <Coordination/LoggerWrapper.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <chrono>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <string>
#include <filesystem>
#include <Poco/Util/Application.h>
#include <boost/algorithm/string.hpp>
namespace DB
{

namespace ErrorCodes
{
    extern const int RAFT_ERROR;
    extern const int NO_ELEMENTS_IN_CONFIG;
    extern const int SUPPORT_IS_DISABLED;
    extern const int LOGICAL_ERROR;
    extern const int INVALID_CONFIG_PARAMETER;
}

namespace
{

#if USE_SSL
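/// Fill NuRaft's asio options with the TLS certificate, private key and CA
/// settings from the <openSSL.server> section of the server configuration.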
void setSSLParams(nuraft::asio_service::options & asio_opts)
{
    const Poco::Util::LayeredConfiguration & config = Poco::Util::Application::instance().config();
    String certificate_file_property = "openSSL.server.certificateFile";
    String private_key_file_property = "openSSL.server.privateKeyFile";
    String root_ca_file_property = "openSSL.server.caConfig";

    if (!config.has(certificate_file_property))
        throw Exception("Server certificate file is not set.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);

    if (!config.has(private_key_file_property))
        throw Exception("Server private key file is not set.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);

    asio_opts.enable_ssl_ = true;
    asio_opts.server_cert_file_ = config.getString(certificate_file_property);
    asio_opts.server_key_file_ = config.getString(private_key_file_property);

    if (config.has(root_ca_file_property))
        asio_opts.root_cert_file_ = config.getString(root_ca_file_property);

    if (config.getBool("openSSL.server.loadDefaultCAFile", false))
        asio_opts.load_default_ca_file_ = true;

    if (config.getString("openSSL.server.verificationMode", "none") == "none")
        asio_opts.skip_verification_ = true;
}
#endif
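
/// Resolve the directory for Keeper snapshots: an explicit
/// <keeper_server.snapshot_storage_path> wins, then <keeper_server.storage_path>,
/// otherwise the path is derived from the generic "path" setting.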
std::string getSnapshotsPathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper)
{
    /// the most specialized path
    if (config.has("keeper_server.snapshot_storage_path"))
        return config.getString("keeper_server.snapshot_storage_path");

    if (config.has("keeper_server.storage_path"))
        return std::filesystem::path{config.getString("keeper_server.storage_path")} / "snapshots";

    if (standalone_keeper)
        return std::filesystem::path{config.getString("path", KEEPER_DEFAULT_PATH)} / "snapshots";
    else
        return std::filesystem::path{config.getString("path", DBMS_DEFAULT_PATH)} / "coordination/snapshots";
}
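
/// Read the optional <keeper_server.superdigest> value (credentials of the "super" user);
/// it must have the form 'super:<base64 digest>'.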
std::string checkAndGetSuperdigest(const Poco::Util::AbstractConfiguration & config)
{
    if (!config.has("keeper_server.superdigest"))
        return "";

    auto user_and_digest = config.getString("keeper_server.superdigest");
    std::vector<std::string> scheme_and_id;
    boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
    if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");

    return user_and_digest;
}

}

KeeperServer::KeeperServer(
    int server_id_,
    const CoordinationSettingsPtr & coordination_settings_,
    const Poco::Util::AbstractConfiguration & config,
    ResponsesQueue & responses_queue_,
    SnapshotsQueue & snapshots_queue_,
    bool standalone_keeper)
    : server_id(server_id_)
    , coordination_settings(coordination_settings_)
    , state_machine(nuraft::cs_new<KeeperStateMachine>(
                        responses_queue_, snapshots_queue_,
                        getSnapshotsPathFromConfig(config, standalone_keeper),
                        coordination_settings,
                        checkAndGetSuperdigest(config)))
    , state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings, standalone_keeper))
    , log(&Poco::Logger::get("KeeperServer"))
{
    if (coordination_settings->quorum_reads)
        LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
}
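
/// Start the embedded RAFT server: initialize the state machine from the latest
/// snapshot, load the log store, pick the freshest cluster configuration
/// (snapshot vs. log store, by log index) and launch the NuRaft instance.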
void KeeperServer::startup()
{
    state_machine->init();

    state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);

    auto latest_snapshot_config = state_machine->getClusterConfig();
    auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();

    if (latest_snapshot_config && latest_log_store_config)
    {
        if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
        {
            LOG_INFO(log, "Will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
            state_manager->save_config(*latest_snapshot_config);
        }
        else
        {
            LOG_INFO(log, "Will use config from log store with log index {}", latest_log_store_config->get_log_idx());
            state_manager->save_config(*latest_log_store_config);
        }
    }
    else if (latest_snapshot_config)
    {
        LOG_INFO(log, "No config in log store, will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
        state_manager->save_config(*latest_snapshot_config);
    }
    else if (latest_log_store_config)
    {
        LOG_INFO(log, "No config in snapshot, will use config from log store with log index {}", latest_log_store_config->get_log_idx());
        state_manager->save_config(*latest_log_store_config);
    }
    else
    {
        LOG_INFO(log, "No config in log store and snapshot, probably it's the initial run. Will use config from .xml on disk");
    }

    nuraft::raft_params params;
    params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds();
    params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds();
    params.election_timeout_upper_bound_ = coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds();
    params.reserved_log_items_ = coordination_settings->reserved_log_items;
    params.snapshot_distance_ = coordination_settings->snapshot_distance;
    params.stale_log_gap_ = coordination_settings->stale_log_gap;
    params.fresh_log_gap_ = coordination_settings->fresh_log_gap;
    params.client_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds();
    params.auto_forwarding_ = coordination_settings->auto_forwarding;
    params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
    params.max_append_size_ = coordination_settings->max_requests_batch_size;

    params.return_method_ = nuraft::raft_params::async_handler;

    nuraft::asio_service::options asio_opts{};
    if (state_manager->isSecure())
    {
#if USE_SSL
        setSSLParams(asio_opts);
#else
        throw Exception{"SSL support for NuRaft is disabled because ClickHouse was built without SSL support.",
                        ErrorCodes::SUPPORT_IS_DISABLED};
#endif
    }

    launchRaftServer(params, asio_opts);

    if (!raft_instance)
        throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
}
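
/// Construct and start the NuRaft server by hand: the server is not started in the
/// constructor, so the raft callback and the asio listener can be attached first.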
void KeeperServer::launchRaftServer(
    const nuraft::raft_params & params,
    const nuraft::asio_service::options & asio_opts)
{
    nuraft::raft_server::init_options init_options;

    init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower();
    init_options.start_server_in_constructor_ = false;
    init_options.raft_callback_ = [this] (nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
    {
        return callbackFunc(type, param);
    };

    nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level);
    asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, logger);
    asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger);

    if (!asio_listener)
        return;

    nuraft::ptr<nuraft::delayed_task_scheduler> scheduler = asio_service;
    nuraft::ptr<nuraft::rpc_client_factory> rpc_cli_factory = asio_service;

    nuraft::ptr<nuraft::state_mgr> casted_state_manager = state_manager;
    nuraft::ptr<nuraft::state_machine> casted_state_machine = state_machine;

    /// raft_server creates unique_ptr from it
    nuraft::context * ctx = new nuraft::context(
        casted_state_manager, casted_state_machine,
        asio_listener, logger, rpc_cli_factory, scheduler, params);

    raft_instance = nuraft::cs_new<nuraft::raft_server>(ctx, init_options);

    raft_instance->start_server(init_options.skip_initial_election_timeout_);
    asio_listener->listen(raft_instance);
}
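
/// Stop the RAFT server first, then the listener and the asio service, giving the
/// asio worker threads up to shutdown_timeout seconds to finish.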
void KeeperServer::shutdownRaftServer()
{
    size_t timeout = coordination_settings->shutdown_timeout.totalSeconds();

    if (!raft_instance)
    {
        LOG_INFO(log, "RAFT server is not started, shutdown not required");
        return;
    }

    raft_instance->shutdown();
    raft_instance.reset();

    if (asio_listener)
    {
        asio_listener->stop();
        asio_listener->shutdown();
    }

    if (asio_service)
    {
        asio_service->stop();
        size_t count = 0;
        while (asio_service->get_active_workers() != 0 && count < timeout * 100)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            count++;
        }
    }

    if (asio_service && asio_service->get_active_workers() != 0)
        LOG_WARNING(log, "Failed to shutdown RAFT server in {} seconds", timeout);
}

void KeeperServer::shutdown()
{
    state_machine->shutdownStorage();
    state_manager->flushLogStore();
    shutdownRaftServer();
}

namespace
{
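/// Serialize a client request into a RAFT log entry payload: the session id followed
/// by the ZooKeeper request in its wire format.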
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
{
    DB::WriteBufferFromNuraftBuffer buf;
    DB::writeIntBinary(session_id, buf);
    request->write(buf);
    return buf.getBuffer();
}
}

void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
    if (!request_for_session.request->isReadRequest())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally");

    state_machine->processReadRequest(request_for_session);
}
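
/// Append a batch of write requests to the RAFT log under a single lock. Because
/// params.return_method_ is async_handler, the returned handle completes asynchronously.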
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
    std::vector<nuraft::ptr<nuraft::buffer>> entries;
    for (const auto & [session_id, request] : requests_for_sessions)
        entries.push_back(getZooKeeperLogEntry(session_id, request));

    {
        std::lock_guard lock(append_entries_mutex);
        return raft_instance->append_entries(entries);
    }
}

bool KeeperServer::isLeader() const
{
    return raft_instance->is_leader();
}

bool KeeperServer::isLeaderAlive() const
{
    return raft_instance->is_leader_alive();
}
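
/// NuRaft callback used to detect when this node is initialized, i.e. caught up enough
/// to serve clients: either it became leader with an (almost) fully committed log store,
/// or it is a follower whose committed log is close enough to the leader's.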
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
    if (initialized_flag)
        return nuraft::cb_func::ReturnCode::Ok;

    size_t last_committed = state_machine->last_commit_index();
    size_t next_index = state_manager->getLogStore()->next_slot();
    bool committed_store = false;
    if (next_index < last_committed || next_index - last_committed <= 1)
        committed_store = true;

    auto set_initialized = [this] ()
    {
        std::unique_lock lock(initialized_mutex);
        initialized_flag = true;
        initialized_cv.notify_all();
    };

    switch (type)
    {
        case nuraft::cb_func::BecomeLeader:
        {
            /// We became leader and either the log store is empty or we have already committed it
            if (committed_store || initial_batch_committed)
                set_initialized();
            return nuraft::cb_func::ReturnCode::Ok;
        }
        case nuraft::cb_func::BecomeFollower:
        case nuraft::cb_func::GotAppendEntryReqFromLeader:
        {
            if (param->leaderId != -1)
            {
                auto leader_index = raft_instance->get_leader_committed_log_idx();
                auto our_index = raft_instance->get_committed_log_idx();
                /// This may happen when we start the RAFT cluster from scratch.
                /// This node first became leader, and after that some other node became leader.
                /// BecameFresh for this node will not be called because it was already fresh
                /// when it was leader.
                if (leader_index < our_index + coordination_settings->fresh_log_gap)
                    set_initialized();
            }
            return nuraft::cb_func::ReturnCode::Ok;
        }
        case nuraft::cb_func::BecomeFresh:
        {
            set_initialized(); /// We are a fresh follower, ready to serve requests.
            return nuraft::cb_func::ReturnCode::Ok;
        }
        case nuraft::cb_func::InitialBatchCommited:
        {
            if (param->myId == param->leaderId) /// We have committed our log store and we are leader, ready to serve requests.
                set_initialized();
            initial_batch_committed = true;
            return nuraft::cb_func::ReturnCode::Ok;
        }
        default: /// ignore other events
            return nuraft::cb_func::ReturnCode::Ok;
    }
}
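
/// Block until callbackFunc() marks the server as initialized or startup_timeout expires.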
void KeeperServer::waitInit()
{
    std::unique_lock lock(initialized_mutex);
    int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
    if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
        throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait for RAFT initialization");
}

std::vector<int64_t> KeeperServer::getDeadSessions()
{
    return state_machine->getDeadSessions();
}

ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
    return state_manager->getConfigurationDiff(config);
}
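
/// Apply a single configuration change (add/remove a server or update priority) through
/// RAFT. Meant to be executed on the leader: every change is retried up to
/// configuration_change_tries_count times with an increasing sleep between attempts.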
void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
{
    size_t sleep_ms = 500;
    if (task.action_type == ConfigUpdateActionType::AddServer)
    {
        LOG_INFO(log, "Will try to add server with id {}", task.server->get_id());
        bool added = false;
        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully added", task.server->get_id());
                added = true;
                break;
            }

            if (!isLeader())
            {
                LOG_INFO(log, "We are not leader anymore, will not try to add server {}", task.server->get_id());
                break;
            }

            auto result = raft_instance->add_srv(*task.server);
            if (!result->get_accepted())
                LOG_INFO(log, "Command to add server {} was not accepted for the {} time, will sleep for {} ms and retry", task.server->get_id(), i + 1, sleep_ms * (i + 1));

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }

        if (!added)
            throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to add server (id {}) was not accepted by RAFT after all {} retries", task.server->get_id(), coordination_settings->configuration_change_tries_count);
    }
    else if (task.action_type == ConfigUpdateActionType::RemoveServer)
    {
        LOG_INFO(log, "Will try to remove server with id {}", task.server->get_id());
        bool removed = false;
        if (task.server->get_id() == state_manager->server_id())
        {
            LOG_INFO(log, "Trying to remove leader node (ourself), so will yield leadership and some other node (new leader) will try to remove us. "
                "Probably you will have to run SYSTEM RELOAD CONFIG on the new leader node");

            raft_instance->yield_leadership();
            return;
        }

        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully removed", task.server->get_id());
                removed = true;
                break;
            }

            if (!isLeader())
            {
                LOG_INFO(log, "We are not leader anymore, will not try to remove server {}", task.server->get_id());
                break;
            }

            auto result = raft_instance->remove_srv(task.server->get_id());
            if (!result->get_accepted())
                LOG_INFO(log, "Command to remove server {} was not accepted for the {} time, will sleep for {} ms and retry", task.server->get_id(), i + 1, sleep_ms * (i + 1));

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }

        if (!removed)
            throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to remove server (id {}) was not accepted by RAFT after all {} retries", task.server->get_id(), coordination_settings->configuration_change_tries_count);
    }
    else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
        raft_instance->set_priority(task.server->get_id(), task.server->get_priority());
    else
        LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
}
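
/// Counterpart of applyConfigurationUpdate for non-leader nodes: wait until the leader has
/// applied the configuration change. Returns false if this node became leader in the
/// meantime or if all retries were exhausted.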
bool KeeperServer::waitConfigurationUpdate(const ConfigUpdateAction & task)
{
    size_t sleep_ms = 500;
    if (task.action_type == ConfigUpdateActionType::AddServer)
    {
        LOG_INFO(log, "Will try to wait for server with id {} to be added", task.server->get_id());
        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully added by leader", task.server->get_id());
                return true;
            }

            if (isLeader())
            {
                LOG_INFO(log, "We are leader now, probably we will have to add server {}", task.server->get_id());
                return false;
            }

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }
        return false;
    }
    else if (task.action_type == ConfigUpdateActionType::RemoveServer)
    {
        LOG_INFO(log, "Will try to wait for removal of server with id {}", task.server->get_id());
        for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
        {
            if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
            {
                LOG_INFO(log, "Server with id {} was successfully removed by leader", task.server->get_id());
                return true;
            }

            if (isLeader())
            {
                LOG_INFO(log, "We are leader now, probably we will have to remove server {}", task.server->get_id());
                return false;
            }

            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
        }
        return false;
    }
    else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
        return true;
    else
        LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
    return true;
}

}