Better non-dirty fix

This commit is contained in:
alesapin 2021-04-07 13:18:07 +03:00
parent 05eeec16c1
commit 36c0e601a9
3 changed files with 89 additions and 23 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 241fd3754a1eb4d82ab68a9a875dc99391ec9f02
Subproject commit c35819f2c8a378d4ba88cc930c17bc20aeb875eb

View File

@ -32,9 +32,10 @@ KeeperServer::KeeperServer(
coordination_settings))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
, log(&Poco::Logger::get("KeeperServer"))
{
if (coordination_settings->quorum_reads)
LOG_WARNING(&Poco::Logger::get("KeeperServer"), "Quorum reads enabled, Keeper will work slower.");
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
}
void KeeperServer::startup()
@ -74,32 +75,87 @@ void KeeperServer::startup()
nuraft::raft_server::init_options init_options;
init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower();
init_options.start_server_in_constructor_ = false;
init_options.raft_callback_ = [this] (nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
return callbackFunc(type, param);
};
{
/// We use this lock here because NuRaft start background threads in
/// raft_server constructor. These threads may call raft_callback
/// (callbackFunc) before raft_instance object fully constructed.
std::lock_guard lock(initialized_mutex);
raft_instance = launcher.init(
state_machine, state_manager, nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level), state_manager->getPort(),
asio_opts, params, init_options);
}
launchRaftServer(params, asio_opts, init_options);
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
}
/// Creates the asio service and RPC listener, constructs the RAFT server on top of
/// them, starts it, and only then lets the listener accept requests for it.
///
/// @param params     RAFT parameters forwarded to nuraft::context.
/// @param asio_opts  Options used to construct the asio service.
/// @param init_opts  Options forwarded to the nuraft::raft_server constructor;
///                   skip_initial_election_timeout_ is also passed to start_server().
///
/// If the RPC listener cannot be created, the function returns early and
/// raft_instance is not assigned.
void KeeperServer::launchRaftServer(
    const nuraft::raft_params & params,
    const nuraft::asio_service::options & asio_opts,
    const nuraft::raft_server::init_options & init_opts)
{
    nuraft::ptr<nuraft::logger> raft_logger
        = nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level);

    asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, raft_logger);
    asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), raft_logger);

    if (asio_listener == nullptr)
        return;

    /// Upcast our concrete objects to the interface types handed to nuraft::context.
    nuraft::ptr<nuraft::state_mgr> base_state_manager = state_manager;
    nuraft::ptr<nuraft::state_machine> base_state_machine = state_machine;
    nuraft::ptr<nuraft::rpc_client_factory> client_factory = asio_service;
    nuraft::ptr<nuraft::delayed_task_scheduler> task_scheduler = asio_service;

    /// raft_server creates unique_ptr from it
    auto * raft_context = new nuraft::context(
        base_state_manager, base_state_machine,
        asio_listener, raft_logger, client_factory, task_scheduler, params);

    /// Construct first, start second, listen last: the order of these three steps is intentional.
    raft_instance = nuraft::cs_new<nuraft::raft_server>(raft_context, init_opts);
    raft_instance->start_server(init_opts.skip_initial_election_timeout_);
    asio_listener->listen(raft_instance);
}
/// Shuts down the RAFT server together with its RPC listener and asio service.
///
/// Does nothing (apart from logging) when raft_instance was never created.
/// Otherwise the server is shut down and released, the listener is stopped, and
/// the asio service is stopped; the function then polls every 10 ms, for at most
/// `shutdown_timeout` seconds, until the service reports no active workers.
/// A warning is logged if workers are still active once the wait is over.
void KeeperServer::shutdownRaftServer()
{
    const size_t timeout = coordination_settings->shutdown_timeout.totalSeconds();

    if (!raft_instance)
    {
        LOG_INFO(log, "RAFT doesn't start, shutdown not required");
        return;
    }

    raft_instance->shutdown();
    raft_instance.reset();

    if (asio_listener)
    {
        asio_listener->stop();
        asio_listener->shutdown();
    }

    if (asio_service)
    {
        asio_service->stop();

        /// 100 iterations of 10 ms each make up one second of waiting.
        size_t count = 0;
        while (asio_service->get_active_workers() != 0 && count < timeout * 100)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            ++count;
        }

        /// Keep this check inside the null guard: asio_service must not be
        /// dereferenced when it was never created.
        if (asio_service->get_active_workers() != 0)
            LOG_WARNING(log, "Failed to shutdown RAFT server in {} seconds", timeout);
    }
}
/// Shuts the Keeper server down: calls shutdownStorage() on the state machine,
/// flushLogStore() on the state manager, and then shuts down the RAFT part.
/// The timeout passed to the launcher is `shutdown_timeout` expressed in seconds.
/// NOTE(review): both launcher.shutdown() and shutdownRaftServer() are invoked
/// here — confirm that both calls are intended.
void KeeperServer::shutdown()
{
state_machine->shutdownStorage();
state_manager->flushLogStore();
auto timeout = coordination_settings->shutdown_timeout.totalSeconds();
/// A failed launcher shutdown is only logged; execution continues afterwards.
if (!launcher.shutdown(timeout))
LOG_WARNING(&Poco::Logger::get("KeeperServer"), "Failed to shutdown RAFT server in {} seconds", timeout);
shutdownRaftServer();
}
namespace
@ -196,7 +252,7 @@ bool KeeperServer::isLeaderAlive() const
return raft_instance->is_leader_alive();
}
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * /* param */)
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
if (initialized_flag)
return nuraft::cb_func::ReturnCode::Ok;
@ -207,12 +263,9 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
if (next_index < last_commited || next_index - last_commited <= 1)
commited_store = true;
/// We use this lock here because NuRaft starts background threads in
/// raft_server constructor. So this callback can be called before
/// raft_instance object fully initialized. This lock allows to avoid this.
std::unique_lock lock(initialized_mutex);
auto set_initialized = [this] ()
{
std::unique_lock lock(initialized_mutex);
initialized_flag = true;
initialized_cv.notify_all();
};
@ -229,7 +282,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
case nuraft::cb_func::BecomeFollower:
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
if (isLeaderAlive())
if (param->leaderId != -1)
{
auto leader_index = raft_instance->get_leader_committed_log_idx();
auto our_index = raft_instance->get_committed_log_idx();
@ -249,7 +302,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
}
case nuraft::cb_func::InitialBatchCommited:
{
if (isLeader()) /// We have committed our log store and we are leader, ready to serve requests.
if (param->myId == param->leaderId) /// We have committed our log store and we are leader, ready to serve requests.
set_initialized();
initial_batch_committed = true;
return nuraft::cb_func::ReturnCode::Ok;

View File

@ -7,6 +7,7 @@
#include <Coordination/KeeperStorage.h>
#include <Coordination/CoordinationSettings.h>
#include <unordered_map>
#include <common/logger_useful.h>
namespace DB
{
@ -22,9 +23,9 @@ private:
nuraft::ptr<KeeperStateManager> state_manager;
nuraft::raft_launcher launcher;
nuraft::ptr<nuraft::raft_server> raft_instance;
nuraft::ptr<nuraft::asio_service> asio_service;
nuraft::ptr<nuraft::rpc_listener> asio_listener;
std::mutex append_entries_mutex;
@ -36,8 +37,20 @@ private:
std::atomic<bool> initial_batch_committed = false;
std::atomic<size_t> active_session_id_requests = 0;
Poco::Logger * log;
nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);
/// Almost a copy-paste of nuraft::launcher, but with server initialization and start separated.
/// This allows us to avoid race conditions.
void launchRaftServer(
const nuraft::raft_params & params,
const nuraft::asio_service::options & asio_opts,
const nuraft::raft_server::init_options & init_opts);
void shutdownRaftServer();
public:
KeeperServer(
int server_id_,