2021-02-19 16:05:26 +00:00
|
|
|
#include <Coordination/NuKeeperStateManager.h>
|
2021-02-11 09:17:57 +00:00
|
|
|
#include <Common/Exception.h>
|
2021-01-13 10:32:20 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-02-11 09:17:57 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int RAFT_ERROR;
|
|
|
|
}
|
|
|
|
|
2021-02-19 16:05:26 +00:00
|
|
|
NuKeeperStateManager::NuKeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
|
2021-02-11 12:12:01 +00:00
|
|
|
: my_server_id(server_id_)
|
|
|
|
, my_port(port)
|
2021-02-20 21:08:19 +00:00
|
|
|
, log_store(nuraft::cs_new<NuKeeperLogStore>(logs_path, 5000, false))
|
2021-02-11 12:12:01 +00:00
|
|
|
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
|
|
|
|
{
|
|
|
|
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
|
|
|
|
cluster_config->get_servers().push_back(peer_config);
|
|
|
|
}
|
|
|
|
|
2021-02-19 16:05:26 +00:00
|
|
|
NuKeeperStateManager::NuKeeperStateManager(
|
2021-02-11 09:17:57 +00:00
|
|
|
int my_server_id_,
|
|
|
|
const std::string & config_prefix,
|
2021-02-16 19:02:18 +00:00
|
|
|
const Poco::Util::AbstractConfiguration & config,
|
|
|
|
const CoordinationSettingsPtr & coordination_settings)
|
2021-01-13 10:32:20 +00:00
|
|
|
: my_server_id(my_server_id_)
|
2021-02-18 10:23:48 +00:00
|
|
|
, log_store(nuraft::cs_new<NuKeeperLogStore>(
|
2021-03-04 08:29:24 +00:00
|
|
|
config.getString(config_prefix + ".log_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/logs"),
|
2021-02-18 10:23:48 +00:00
|
|
|
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync))
|
2021-01-13 10:32:20 +00:00
|
|
|
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
|
|
|
|
{
|
2021-02-16 19:02:18 +00:00
|
|
|
|
2021-02-11 09:17:57 +00:00
|
|
|
Poco::Util::AbstractConfiguration::Keys keys;
|
2021-02-16 19:02:18 +00:00
|
|
|
config.keys(config_prefix + ".raft_configuration", keys);
|
2021-02-25 20:30:55 +00:00
|
|
|
total_servers = keys.size();
|
2021-02-11 09:17:57 +00:00
|
|
|
|
|
|
|
for (const auto & server_key : keys)
|
|
|
|
{
|
2021-02-16 19:02:18 +00:00
|
|
|
std::string full_prefix = config_prefix + ".raft_configuration." + server_key;
|
2021-02-11 09:17:57 +00:00
|
|
|
int server_id = config.getInt(full_prefix + ".id");
|
|
|
|
std::string hostname = config.getString(full_prefix + ".hostname");
|
|
|
|
int port = config.getInt(full_prefix + ".port");
|
|
|
|
bool can_become_leader = config.getBool(full_prefix + ".can_become_leader", true);
|
|
|
|
int32_t priority = config.getInt(full_prefix + ".priority", 1);
|
2021-02-11 10:25:10 +00:00
|
|
|
bool start_as_follower = config.getBool(full_prefix + ".start_as_follower", false);
|
|
|
|
if (start_as_follower)
|
|
|
|
start_as_follower_servers.insert(server_id);
|
2021-02-11 09:17:57 +00:00
|
|
|
|
|
|
|
auto endpoint = hostname + ":" + std::to_string(port);
|
|
|
|
auto peer_config = nuraft::cs_new<nuraft::srv_config>(server_id, 0, endpoint, "", !can_become_leader, priority);
|
|
|
|
if (server_id == my_server_id)
|
|
|
|
{
|
|
|
|
my_server_config = peer_config;
|
|
|
|
my_port = port;
|
|
|
|
}
|
|
|
|
|
|
|
|
cluster_config->get_servers().push_back(peer_config);
|
|
|
|
}
|
|
|
|
if (!my_server_config)
|
2021-02-16 19:02:18 +00:00
|
|
|
throw Exception(ErrorCodes::RAFT_ERROR, "Our server id {} not found in raft_configuration section", my_server_id);
|
2021-02-11 10:25:10 +00:00
|
|
|
|
|
|
|
if (start_as_follower_servers.size() == cluster_config->get_servers().size())
|
|
|
|
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
|
2021-01-13 10:32:20 +00:00
|
|
|
}
|
|
|
|
|
2021-03-04 11:22:59 +00:00
|
|
|
void NuKeeperStateManager::loadLogStore(size_t last_commited_index, size_t logs_to_keep)
|
2021-02-16 19:02:18 +00:00
|
|
|
{
|
2021-03-04 11:22:59 +00:00
|
|
|
log_store->init(last_commited_index, logs_to_keep);
|
2021-02-16 19:02:18 +00:00
|
|
|
}
|
|
|
|
|
2021-02-19 16:05:26 +00:00
|
|
|
void NuKeeperStateManager::flushLogStore()
|
2021-02-17 20:36:25 +00:00
|
|
|
{
|
|
|
|
log_store->flush();
|
|
|
|
}
|
|
|
|
|
2021-02-19 16:05:26 +00:00
|
|
|
void NuKeeperStateManager::save_config(const nuraft::cluster_config & config)
|
2021-01-13 10:32:20 +00:00
|
|
|
{
|
|
|
|
// Just keep in memory in this example.
|
|
|
|
// Need to write to disk here, if want to make it durable.
|
|
|
|
nuraft::ptr<nuraft::buffer> buf = config.serialize();
|
|
|
|
cluster_config = nuraft::cluster_config::deserialize(*buf);
|
|
|
|
}
|
|
|
|
|
2021-02-19 16:05:26 +00:00
|
|
|
void NuKeeperStateManager::save_state(const nuraft::srv_state & state)
|
2021-01-13 10:32:20 +00:00
|
|
|
{
|
|
|
|
// Just keep in memory in this example.
|
|
|
|
// Need to write to disk here, if want to make it durable.
|
|
|
|
nuraft::ptr<nuraft::buffer> buf = state.serialize();
|
|
|
|
server_state = nuraft::srv_state::deserialize(*buf);
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|