ClickHouse/src/Coordination/NuKeeperServer.cpp

224 lines
7.8 KiB
C++
Raw Normal View History

2021-01-21 20:01:25 +00:00
#include <Coordination/NuKeeperServer.h>
2021-01-22 16:04:57 +00:00
#include <Coordination/LoggerWrapper.h>
#include <Coordination/NuKeeperStateMachine.h>
2021-02-19 16:05:26 +00:00
#include <Coordination/NuKeeperStateManager.h>
2021-01-22 16:04:57 +00:00
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <chrono>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <string>
2021-01-21 20:01:25 +00:00
namespace DB
{
2021-01-25 12:29:12 +00:00
namespace ErrorCodes
{
extern const int RAFT_ERROR;
}
2021-01-22 16:04:57 +00:00
2021-02-09 14:47:18 +00:00
NuKeeperServer::NuKeeperServer(
2021-02-11 09:17:57 +00:00
int server_id_,
2021-02-09 14:47:18 +00:00
const CoordinationSettingsPtr & coordination_settings_,
2021-02-11 09:17:57 +00:00
const Poco::Util::AbstractConfiguration & config,
2021-02-09 14:47:18 +00:00
ResponsesQueue & responses_queue_)
2021-01-22 16:04:57 +00:00
: server_id(server_id_)
2021-02-09 14:47:18 +00:00
, coordination_settings(coordination_settings_)
2021-03-01 13:33:34 +00:00
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(responses_queue_, config.getString("test_keeper_server.snapshot_storage_path"), coordination_settings))
2021-02-19 16:05:26 +00:00
, state_manager(nuraft::cs_new<NuKeeperStateManager>(server_id, "test_keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
2021-01-22 16:04:57 +00:00
{
}
2021-02-11 10:25:10 +00:00
void NuKeeperServer::startup()
2021-01-22 16:04:57 +00:00
{
2021-02-16 19:02:18 +00:00
state_manager->loadLogStore(state_machine->last_commit_index());
bool single_server = state_manager->getTotalServers() == 1;
2021-02-16 19:02:18 +00:00
2021-01-22 16:04:57 +00:00
nuraft::raft_params params;
if (single_server)
{
2021-02-26 06:59:40 +00:00
/// Don't make sense in single server mode
params.heart_beat_interval_ = 0;
params.election_timeout_lower_bound_ = 0;
params.election_timeout_upper_bound_ = 0;
}
else
{
params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds();
params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds();
params.election_timeout_upper_bound_ = coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds();
}
2021-02-09 14:47:18 +00:00
params.reserved_log_items_ = coordination_settings->reserved_log_items;
params.snapshot_distance_ = coordination_settings->snapshot_distance;
params.client_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds();
params.auto_forwarding_ = coordination_settings->auto_forwarding;
params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
2021-01-22 16:04:57 +00:00
params.return_method_ = nuraft::raft_params::blocking;
nuraft::asio_service::options asio_opts{};
nuraft::raft_server::init_options init_options;
2021-02-11 10:25:10 +00:00
init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower();
init_options.raft_callback_ = [this] (nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
return callbackFunc(type, param);
};
2021-01-22 16:04:57 +00:00
raft_instance = launcher.init(
2021-02-11 09:17:57 +00:00
state_machine, state_manager, nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level), state_manager->getPort(),
asio_opts, params, init_options);
2021-01-22 16:04:57 +00:00
if (!raft_instance)
2021-01-25 12:29:12 +00:00
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
2021-01-22 16:04:57 +00:00
}
void NuKeeperServer::shutdown()
2021-01-22 16:04:57 +00:00
{
state_machine->shutdownStorage();
2021-02-17 20:36:25 +00:00
state_manager->flushLogStore();
2021-02-09 14:47:18 +00:00
if (!launcher.shutdown(coordination_settings->shutdown_timeout.totalSeconds()))
2021-01-25 12:29:12 +00:00
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", 5);
2021-01-22 16:04:57 +00:00
}
namespace
{
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
{
DB::WriteBufferFromNuraftBuffer buf;
DB::writeIntBinary(session_id, buf);
request->write(buf);
return buf.getBuffer();
}
}
void NuKeeperServer::putRequest(const NuKeeperStorage::RequestForSession & request_for_session)
2021-01-22 16:04:57 +00:00
{
auto [session_id, request] = request_for_session;
if (isLeaderAlive() && request->isReadRequest())
2021-01-22 16:04:57 +00:00
{
state_machine->processReadRequest(request_for_session);
2021-01-22 16:04:57 +00:00
}
2021-01-27 13:37:58 +00:00
else
2021-01-26 07:47:04 +00:00
{
2021-01-27 13:37:58 +00:00
std::vector<nuraft::ptr<nuraft::buffer>> entries;
entries.push_back(getZooKeeperLogEntry(session_id, request));
2021-01-22 16:04:57 +00:00
std::lock_guard lock(append_entries_mutex);
2021-01-27 13:37:58 +00:00
auto result = raft_instance->append_entries(entries);
if (!result->get_accepted())
2021-01-27 17:54:25 +00:00
{
2021-02-01 14:14:59 +00:00
NuKeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::NuKeeperStorage::ResponseForSession{session_id, response});
2021-01-27 17:54:25 +00:00
}
2021-01-27 13:37:58 +00:00
if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
{
2021-02-01 14:14:59 +00:00
NuKeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::NuKeeperStorage::ResponseForSession{session_id, response});
2021-01-27 13:37:58 +00:00
}
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str());
}
2021-01-22 16:04:57 +00:00
}
2021-02-04 08:28:11 +00:00
int64_t NuKeeperServer::getSessionID(int64_t session_timeout_ms)
2021-01-22 16:04:57 +00:00
{
2021-02-04 08:28:11 +00:00
auto entry = nuraft::buffer::alloc(sizeof(int64_t));
2021-01-25 12:29:12 +00:00
/// Just special session request
2021-01-22 16:04:57 +00:00
nuraft::buffer_serializer bs(entry);
bs.put_i64(session_timeout_ms);
2021-01-22 16:04:57 +00:00
std::lock_guard lock(append_entries_mutex);
2021-01-22 16:04:57 +00:00
auto result = raft_instance->append_entries({entry});
2021-02-04 19:29:46 +00:00
2021-01-22 16:04:57 +00:00
if (!result->get_accepted())
2021-01-25 12:29:12 +00:00
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send session_id request to RAFT");
2021-01-22 16:04:57 +00:00
if (result->get_result_code() != nuraft::cmd_result_code::OK)
2021-01-25 12:29:12 +00:00
throw Exception(ErrorCodes::RAFT_ERROR, "session_id request failed to RAFT");
2021-01-22 16:04:57 +00:00
auto resp = result->get();
if (resp == nullptr)
throw Exception(ErrorCodes::RAFT_ERROR, "Received nullptr as session_id");
2021-01-22 16:04:57 +00:00
nuraft::buffer_serializer bs_resp(resp);
return bs_resp.get_i64();
}
2021-01-21 20:01:25 +00:00
2021-01-27 17:54:25 +00:00
bool NuKeeperServer::isLeader() const
{
return raft_instance->is_leader();
}
bool NuKeeperServer::isLeaderAlive() const
{
return raft_instance->is_leader_alive();
}
nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * /* param */)
{
/// Only initial record
bool empty_store = state_manager->getLogStore()->size() == 1;
auto set_initialized = [this] ()
{
std::unique_lock lock(initialized_mutex);
initialized_flag = true;
initialized_cv.notify_all();
};
switch (type)
{
case nuraft::cb_func::BecomeLeader:
{
if (empty_store) /// We become leader and store is empty, ready to serve requests
set_initialized();
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::BecomeFresh:
{
set_initialized(); /// We are fresh follower, ready to serve requests.
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::InitialBatchCommited:
{
2021-02-26 06:59:40 +00:00
if (isLeader()) /// We have committed our log store and we are leader, ready to serve requests.
set_initialized();
return nuraft::cb_func::ReturnCode::Ok;
}
default: /// ignore other events
return nuraft::cb_func::ReturnCode::Ok;
}
}
void NuKeeperServer::waitInit()
2021-01-27 17:54:25 +00:00
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag; }))
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
2021-01-27 17:54:25 +00:00
}
std::unordered_set<int64_t> NuKeeperServer::getDeadSessions()
{
return state_machine->getDeadSessions();
}
2021-01-21 20:01:25 +00:00
}