2021-03-29 08:24:56 +00:00
|
|
|
#include <Coordination/KeeperStateManager.h>
|
2021-11-19 09:30:58 +00:00
|
|
|
|
2022-04-21 13:32:06 +00:00
|
|
|
#include <filesystem>
|
2021-10-03 09:54:23 +00:00
|
|
|
#include <Coordination/Defines.h>
|
2022-04-21 13:32:06 +00:00
|
|
|
#include <Common/DNSResolver.h>
|
2021-02-11 09:17:57 +00:00
|
|
|
#include <Common/Exception.h>
|
2022-03-02 19:02:02 +00:00
|
|
|
#include <Common/isLocalAddress.h>
|
2022-07-11 12:56:09 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
2022-09-17 22:04:16 +00:00
|
|
|
#include <Common/getMultipleKeysFromConfig.h>
|
2021-01-13 10:32:20 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-02-11 09:17:57 +00:00
|
|
|
/// Error codes thrown from this file. They are only declared here (`extern`);
/// the definitions are provided by another translation unit.
namespace ErrorCodes
{
    extern const int RAFT_ERROR;
    extern const int CORRUPTED_DATA;
}
|
|
|
|
|
2022-03-02 19:02:02 +00:00
|
|
|
namespace
|
|
|
|
{
|
|
|
|
|
|
|
|
bool isLocalhost(const std::string & hostname)
|
|
|
|
{
|
2022-03-03 10:29:43 +00:00
|
|
|
try
|
|
|
|
{
|
2022-03-04 12:14:38 +00:00
|
|
|
return isLocalAddress(DNSResolver::instance().resolveHost(hostname));
|
2022-03-03 10:29:43 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2022-03-04 12:14:38 +00:00
|
|
|
|
2022-03-03 10:29:43 +00:00
|
|
|
return false;
|
2022-03-02 19:02:02 +00:00
|
|
|
}
|
|
|
|
|
2022-03-07 11:13:37 +00:00
|
|
|
/// Collects the ports set under a fixed list of server configuration keys.
/// Returns a map "port number -> name of the config key that defines it".
/// Keys that are absent from `config` are skipped; when several keys use the
/// same port, the key that comes later in the list overwrites the earlier one.
std::unordered_map<UInt64, std::string> getClientPorts(const Poco::Util::AbstractConfiguration & config)
{
    using namespace std::string_literals;

    static const std::array config_port_names = {
        "keeper_server.tcp_port"s,
        "keeper_server.tcp_port_secure"s,
        "interserver_http_port"s,
        "interserver_https_port"s,
        "tcp_port"s,
        "tcp_with_proxy_port"s,
        "tcp_port_secure"s,
        "mysql_port"s,
        "postgresql_port"s,
        "grpc_port"s,
        "prometheus.port"s,
    };

    std::unordered_map<UInt64, std::string> port_to_key;
    for (const auto & key : config_port_names)
    {
        if (!config.has(key))
            continue;

        port_to_key.insert_or_assign(config.getUInt64(key), key);
    }

    return port_to_key;
}
|
|
|
|
|
2022-03-02 19:02:02 +00:00
|
|
|
}
|
|
|
|
|
2022-03-02 19:37:59 +00:00
|
|
|
/// this function quite long because contains a lot of sanity checks in config:
|
|
|
|
/// 1. No duplicate endpoints
|
|
|
|
/// 2. No "localhost" or "127.0.0.1" or another local addresses mixed with normal addresses
|
2022-03-07 11:13:37 +00:00
|
|
|
/// 3. Raft internal port is not equal to any other port for client
|
2022-03-02 19:37:59 +00:00
|
|
|
/// 4. No duplicate IDs
|
|
|
|
/// 5. Our ID present in hostnames list
|
2022-04-21 13:32:06 +00:00
|
|
|
KeeperStateManager::KeeperConfigurationWrapper
|
|
|
|
KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const
|
2021-01-13 10:32:20 +00:00
|
|
|
{
|
2022-04-22 08:13:14 +00:00
|
|
|
const bool hostname_checks_enabled = config.getBool(config_prefix + ".hostname_checks_enabled", true);
|
2022-04-20 10:31:53 +00:00
|
|
|
|
2021-10-19 13:11:29 +00:00
|
|
|
KeeperConfigurationWrapper result;
|
2021-10-18 15:27:51 +00:00
|
|
|
result.cluster_config = std::make_shared<nuraft::cluster_config>();
|
2021-02-11 09:17:57 +00:00
|
|
|
Poco::Util::AbstractConfiguration::Keys keys;
|
2021-02-16 19:02:18 +00:00
|
|
|
config.keys(config_prefix + ".raft_configuration", keys);
|
2021-02-11 09:17:57 +00:00
|
|
|
|
2022-03-07 11:13:37 +00:00
|
|
|
auto client_ports = getClientPorts(config);
|
2022-03-02 19:37:59 +00:00
|
|
|
|
2021-12-02 11:46:33 +00:00
|
|
|
/// Sometimes (especially in cloud envs) users can provide incorrect
|
|
|
|
/// configuration with duplicated raft ids or endpoints. We check them
|
|
|
|
/// on config parsing stage and never commit to quorum.
|
|
|
|
std::unordered_map<std::string, int> check_duplicated_hostnames;
|
|
|
|
|
2021-10-18 15:27:51 +00:00
|
|
|
size_t total_servers = 0;
|
2022-04-21 13:32:06 +00:00
|
|
|
bool localhost_present = false;
|
2022-03-02 19:02:02 +00:00
|
|
|
std::string non_local_hostname;
|
2022-03-04 12:14:38 +00:00
|
|
|
size_t local_address_counter = 0;
|
2021-02-11 09:17:57 +00:00
|
|
|
for (const auto & server_key : keys)
|
|
|
|
{
|
2021-04-12 12:40:01 +00:00
|
|
|
if (!startsWith(server_key, "server"))
|
|
|
|
continue;
|
|
|
|
|
2021-02-16 19:02:18 +00:00
|
|
|
std::string full_prefix = config_prefix + ".raft_configuration." + server_key;
|
2022-09-17 22:04:16 +00:00
|
|
|
|
|
|
|
if (getMultipleValuesFromConfig(config, full_prefix, "id").size() > 1
|
|
|
|
|| getMultipleValuesFromConfig(config, full_prefix, "hostname").size() > 1
|
|
|
|
|| getMultipleValuesFromConfig(config, full_prefix, "port").size() > 1)
|
|
|
|
{
|
|
|
|
throw Exception(ErrorCodes::RAFT_ERROR, "Multiple <id> or <hostname> or <port> specified for a single <server>");
|
|
|
|
}
|
|
|
|
|
2021-10-18 15:27:51 +00:00
|
|
|
int new_server_id = config.getInt(full_prefix + ".id");
|
2021-02-11 09:17:57 +00:00
|
|
|
std::string hostname = config.getString(full_prefix + ".hostname");
|
|
|
|
int port = config.getInt(full_prefix + ".port");
|
|
|
|
bool can_become_leader = config.getBool(full_prefix + ".can_become_leader", true);
|
|
|
|
int32_t priority = config.getInt(full_prefix + ".priority", 1);
|
2021-02-11 10:25:10 +00:00
|
|
|
bool start_as_follower = config.getBool(full_prefix + ".start_as_follower", false);
|
2021-04-12 12:25:52 +00:00
|
|
|
|
2022-04-18 10:18:43 +00:00
|
|
|
if (client_ports.contains(port))
|
2022-03-02 19:37:59 +00:00
|
|
|
{
|
2022-04-21 13:32:06 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::RAFT_ERROR,
|
|
|
|
"Raft configuration contains hostname '{}' with port '{}' which is equal to '{}' in server configuration",
|
|
|
|
hostname,
|
|
|
|
port,
|
|
|
|
client_ports[port]);
|
2022-03-02 19:37:59 +00:00
|
|
|
}
|
|
|
|
|
2022-04-22 08:13:14 +00:00
|
|
|
if (hostname_checks_enabled)
|
2022-03-04 12:14:38 +00:00
|
|
|
{
|
2022-04-21 13:32:06 +00:00
|
|
|
if (hostname == "localhost")
|
2022-04-20 10:31:53 +00:00
|
|
|
{
|
2022-04-21 13:32:06 +00:00
|
|
|
localhost_present = true;
|
2022-04-20 10:31:53 +00:00
|
|
|
local_address_counter++;
|
|
|
|
}
|
|
|
|
else if (isLocalhost(hostname))
|
|
|
|
{
|
|
|
|
local_address_counter++;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
non_local_hostname = hostname;
|
|
|
|
}
|
2022-03-04 12:14:38 +00:00
|
|
|
}
|
2022-03-02 19:02:02 +00:00
|
|
|
|
2021-02-11 10:25:10 +00:00
|
|
|
if (start_as_follower)
|
2021-10-18 15:27:51 +00:00
|
|
|
result.servers_start_as_followers.insert(new_server_id);
|
2021-02-11 09:17:57 +00:00
|
|
|
|
|
|
|
auto endpoint = hostname + ":" + std::to_string(port);
|
2022-04-18 10:18:43 +00:00
|
|
|
if (check_duplicated_hostnames.contains(endpoint))
|
2021-12-02 11:46:33 +00:00
|
|
|
{
|
2022-04-21 13:32:06 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::RAFT_ERROR,
|
|
|
|
"Raft config contains duplicate endpoints: "
|
|
|
|
"endpoint {} has been already added with id {}, but going to add it one more time with id {}",
|
|
|
|
endpoint,
|
|
|
|
check_duplicated_hostnames[endpoint],
|
|
|
|
new_server_id);
|
2021-12-02 11:46:33 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Fullscan to check duplicated ids
|
|
|
|
for (const auto & [id_endpoint, id] : check_duplicated_hostnames)
|
|
|
|
{
|
|
|
|
if (new_server_id == id)
|
2022-04-21 13:32:06 +00:00
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::RAFT_ERROR,
|
|
|
|
"Raft config contains duplicate ids: id {} has been already added with endpoint {}, "
|
|
|
|
"but going to add it one more time with endpoint {}",
|
|
|
|
id,
|
|
|
|
id_endpoint,
|
|
|
|
endpoint);
|
2021-12-02 11:46:33 +00:00
|
|
|
}
|
|
|
|
check_duplicated_hostnames.emplace(endpoint, new_server_id);
|
|
|
|
}
|
|
|
|
|
2021-10-18 15:27:51 +00:00
|
|
|
auto peer_config = nuraft::cs_new<nuraft::srv_config>(new_server_id, 0, endpoint, "", !can_become_leader, priority);
|
|
|
|
if (my_server_id == new_server_id)
|
2021-02-11 09:17:57 +00:00
|
|
|
{
|
2021-10-18 15:27:51 +00:00
|
|
|
result.config = peer_config;
|
|
|
|
result.port = port;
|
2021-02-11 09:17:57 +00:00
|
|
|
}
|
|
|
|
|
2021-10-18 15:27:51 +00:00
|
|
|
result.cluster_config->get_servers().push_back(peer_config);
|
|
|
|
total_servers++;
|
2021-02-11 09:17:57 +00:00
|
|
|
}
|
2021-04-12 12:25:52 +00:00
|
|
|
|
2021-10-19 13:37:28 +00:00
|
|
|
if (!result.config && !allow_without_us)
|
2021-02-16 19:02:18 +00:00
|
|
|
throw Exception(ErrorCodes::RAFT_ERROR, "Our server id {} not found in raft_configuration section", my_server_id);
|
2021-02-11 10:25:10 +00:00
|
|
|
|
2021-10-18 15:27:51 +00:00
|
|
|
if (result.servers_start_as_followers.size() == total_servers)
|
2021-02-11 10:25:10 +00:00
|
|
|
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
|
2021-10-18 15:27:51 +00:00
|
|
|
|
2022-04-22 08:13:14 +00:00
|
|
|
if (hostname_checks_enabled)
|
2022-03-04 12:14:38 +00:00
|
|
|
{
|
2022-04-21 13:32:06 +00:00
|
|
|
if (localhost_present && !non_local_hostname.empty())
|
2022-04-20 10:31:53 +00:00
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::RAFT_ERROR,
|
2022-04-21 13:32:06 +00:00
|
|
|
"Mixing 'localhost' and non-local hostnames ('{}') in raft_configuration is not allowed. "
|
|
|
|
"Different hosts can resolve 'localhost' to themselves so it's not allowed.",
|
|
|
|
non_local_hostname);
|
2022-04-20 10:31:53 +00:00
|
|
|
}
|
2022-03-04 12:14:38 +00:00
|
|
|
|
2022-04-20 10:31:53 +00:00
|
|
|
if (!non_local_hostname.empty() && local_address_counter > 1)
|
|
|
|
{
|
|
|
|
throw Exception(
|
|
|
|
ErrorCodes::RAFT_ERROR,
|
|
|
|
"Local address specified more than once ({} times) and non-local hostnames also exists ('{}') in raft_configuration. "
|
|
|
|
"Such configuration is not allowed because single host can vote multiple times.",
|
2022-04-21 13:32:06 +00:00
|
|
|
local_address_counter,
|
|
|
|
non_local_hostname);
|
2022-04-20 10:31:53 +00:00
|
|
|
}
|
2022-03-02 19:02:02 +00:00
|
|
|
}
|
|
|
|
|
2021-10-18 15:27:51 +00:00
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2022-07-10 19:10:27 +00:00
|
|
|
/// Creates a state manager whose cluster configuration contains exactly one
/// server: this one, reachable at "host:port".
/// Nothing is read from a configuration object here: `secure` is false and the
/// log store is constructed with the fixed arguments (logs_path, 5000, false, false).
KeeperStateManager::KeeperStateManager(
    int server_id_, const std::string & host, int port, const std::string & logs_path, const std::string & state_file_path)
    : my_server_id(server_id_)
    , secure(false)
    , log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
    , server_state_path(state_file_path)
    , logger(&Poco::Logger::get("KeeperStateManager"))
{
    const auto endpoint = host + ":" + std::to_string(port);
    auto self_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, endpoint);

    configuration_wrapper.config = self_config;
    configuration_wrapper.port = port;

    /// The cluster consists of this single server.
    configuration_wrapper.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
    configuration_wrapper.cluster_config->get_servers().push_back(self_config);
}
|
|
|
|
|
|
|
|
/// Creates a state manager from the server configuration.
///  - `secure` is read from `<config_prefix_>.raft_configuration.secure` (default: false);
///  - the cluster layout comes from parseServersConfiguration(config, false), so
///    construction throws if our own server id is missing from the config;
///  - the log store is parameterized by `coordination_settings`
///    (rotate_log_storage_interval, force_sync, compress_logs).
/// NOTE(review): parseServersConfiguration() reads `my_server_id` and `config_prefix`,
/// so both must be declared before `configuration_wrapper` in the class - confirm in the header.
KeeperStateManager::KeeperStateManager(
    int my_server_id_,
    const std::string & config_prefix_,
    const std::string & log_storage_path,
    const std::string & state_file_path,
    const Poco::Util::AbstractConfiguration & config,
    const CoordinationSettingsPtr & coordination_settings)
    : my_server_id(my_server_id_)
    , secure(config.getBool(config_prefix_ + ".raft_configuration.secure", false))
    , config_prefix(config_prefix_)
    , configuration_wrapper(parseServersConfiguration(config, false))
    , log_store(nuraft::cs_new<KeeperLogStore>(
          log_storage_path,
          coordination_settings->rotate_log_storage_interval,
          coordination_settings->force_sync,
          coordination_settings->compress_logs))
    , server_state_path(state_file_path)
    , logger(&Poco::Logger::get("KeeperStateManager"))
{
    /// Everything is set up in the member initializer list.
}
|
|
|
|
|
2021-04-08 14:17:57 +00:00
|
|
|
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
|
2021-02-16 19:02:18 +00:00
|
|
|
{
|
2021-03-04 11:22:59 +00:00
|
|
|
log_store->init(last_commited_index, logs_to_keep);
|
2021-02-16 19:02:18 +00:00
|
|
|
}
|
|
|
|
|
2022-02-19 10:00:35 +00:00
|
|
|
/// Terminates the process; the exit code argument is ignored.
void KeeperStateManager::system_exit(const int /* exit_code */)
{
    /// NuRaft itself would call exit(), which runs the atexit handlers,
    /// and that may lead to issues in a multi-threaded program.
    /// This override terminates with abort() instead.
    abort();
}
|
|
|
|
|
2021-10-18 15:27:51 +00:00
|
|
|
/// Returns the cluster config deserialized from the latest config-change entry
/// of the log store, or nullptr when the log store reports no such entry.
ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const
{
    if (auto latest_change = log_store->getLatestConfigChange())
        return ClusterConfig::deserialize(latest_change->get_buf());

    return nullptr;
}
|
|
|
|
|
2022-05-02 22:45:13 +00:00
|
|
|
void KeeperStateManager::flushAndShutDownLogStore()
|
2021-02-17 20:36:25 +00:00
|
|
|
{
|
2022-05-02 22:45:13 +00:00
|
|
|
log_store->flushChangelogAndShutdown();
|
2021-02-17 20:36:25 +00:00
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
/// Replaces the stored cluster config with a copy of `config`.
/// The whole operation runs under `configuration_wrapper_mutex`.
void KeeperStateManager::save_config(const nuraft::cluster_config & config)
{
    std::lock_guard lock(configuration_wrapper_mutex);

    /// A serialize/deserialize round trip yields a separate cluster_config object.
    auto serialized = config.serialize();
    configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*serialized);
}
|
|
|
|
|
2022-07-10 09:33:49 +00:00
|
|
|
const std::filesystem::path & KeeperStateManager::getOldServerStatePath()
|
|
|
|
{
|
2022-07-10 14:40:08 +00:00
|
|
|
static auto old_path = [this]
|
|
|
|
{
|
2022-07-10 09:33:49 +00:00
|
|
|
return server_state_path.parent_path() / (server_state_path.filename().generic_string() + "-OLD");
|
|
|
|
}();
|
|
|
|
|
|
|
|
return old_path;
|
|
|
|
}
|
|
|
|
|
2022-07-11 12:56:09 +00:00
|
|
|
namespace
{

/// Format version of the server state file; occupies a single byte.
enum ServerStateVersion : uint8_t
{
    V1 = 0
};

/// Version stamped into newly written state files.
constexpr ServerStateVersion current_server_state_version = V1;

}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
/// Persists `state` to `server_state_path`.
///
/// File layout, in write order:
///   1. 64-bit SipHash checksum computed over the version followed by the payload;
///   2. one byte with the format version;
///   3. the serialized state (payload).
///
/// An existing state file is first renamed to the backup path and the backup is
/// removed only after the new file has been written, synced and closed.
void KeeperStateManager::save_state(const nuraft::srv_state & state)
{
    const auto & backup_path = getOldServerStatePath();

    /// Keep the previous state around until the new file is complete.
    if (std::filesystem::exists(server_state_path))
        std::filesystem::rename(server_state_path, backup_path);

    WriteBufferFromFile out(server_state_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);

    auto serialized = state.serialize();
    const auto * payload = reinterpret_cast<const char *>(serialized->data_begin());
    const auto payload_size = serialized->size();

    /// The checksum covers the version and the payload.
    SipHash checksum;
    checksum.update(current_server_state_version);
    checksum.update(payload, payload_size);

    writeIntBinary(checksum.get64(), out);
    writeIntBinary(static_cast<uint8_t>(current_server_state_version), out);
    out.write(payload, payload_size);

    out.sync();
    out.close();

    std::filesystem::remove(backup_path);
}
|
|
|
|
|
|
|
|
/// Loads the persisted server state.
///
/// The main state file (`server_state_path`) is tried first. If it is missing or
/// cannot be read, it is removed and the backup file (see getOldServerStatePath())
/// is tried; a readable backup is renamed back to the main path. Files that fail
/// to load are deleted. Returns nullptr when no state could be read.
///
/// Expected file layout: 64-bit checksum, one version byte, serialized state.
/// On a checksum mismatch, builds with NDEBUG log an error and treat the file as
/// unreadable; other builds throw Exception with CORRUPTED_DATA.
nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
{
    const auto & old_path = getOldServerStatePath();

    /// Reads and validates one state file; returns nullptr if it is empty or unreadable.
    const auto try_read_file = [this](const auto & path) -> nuraft::ptr<nuraft::srv_state>
    {
        try
        {
            ReadBufferFromFile read_buf(path);
            const auto content_size = read_buf.getFileSize();

            if (content_size == 0)
                return nullptr;

            uint64_t read_checksum{0};
            readIntBinary(read_checksum, read_buf);

            uint8_t version{0};
            readIntBinary(version, read_buf);

            /// Whatever follows the checksum and the version byte is the payload.
            const auto buffer_size = content_size - sizeof read_checksum - sizeof version;

            auto state_buf = nuraft::buffer::alloc(buffer_size);
            read_buf.read(reinterpret_cast<char *>(state_buf->data_begin()), buffer_size);

            SipHash hash;
            hash.update(version);
            hash.update(reinterpret_cast<const char *>(state_buf->data_begin()), state_buf->size());

            /// Fetch the hash exactly once and reuse the value for both the comparison
            /// and the error message, so the reported value is the one that was compared.
            /// NOTE(review): SipHash::get64() appears to finalize the hash state, in which
            /// case a second call would return a different value - confirm.
            const auto calculated_checksum = hash.get64();

            if (read_checksum != calculated_checksum)
            {
                const auto error_string = fmt::format(
                    "Invalid checksum while reading state from {}. Got {}, expected {}",
                    path.generic_string(),
                    calculated_checksum,
                    read_checksum);
#ifdef NDEBUG
                LOG_ERROR(logger, fmt::runtime(error_string));
                return nullptr;
#else
                throw Exception(ErrorCodes::CORRUPTED_DATA, error_string);
#endif
            }

            auto state = nuraft::srv_state::deserialize(*state_buf);
            LOG_INFO(logger, "Read state from {}", path.generic_string());
            return state;
        }
        catch (const std::exception & e)
        {
            /// Corruption reported above must reach the caller; any other failure
            /// only makes this particular file unusable.
            if (const auto * exception = dynamic_cast<const Exception *>(&e);
                exception != nullptr && exception->code() == ErrorCodes::CORRUPTED_DATA)
            {
                throw;
            }

            LOG_ERROR(logger, "Failed to deserialize state from {}", path.generic_string());
            return nullptr;
        }
    };

    if (std::filesystem::exists(server_state_path))
    {
        auto state = try_read_file(server_state_path);

        if (state)
        {
            /// The main file is good, so a leftover backup is no longer needed.
            /// remove() simply returns false when the file does not exist.
            std::filesystem::remove(old_path);
            return state;
        }

        std::filesystem::remove(server_state_path);
    }

    if (std::filesystem::exists(old_path))
    {
        auto state = try_read_file(old_path);

        if (state)
        {
            /// Promote the backup to be the main state file.
            std::filesystem::rename(old_path, server_state_path);
            return state;
        }

        std::filesystem::remove(old_path);
    }

    LOG_WARNING(logger, "No state was read");
    return nullptr;
}
|
|
|
|
|
|
|
|
/// Computes the actions needed to move from the currently stored cluster
/// configuration to the one described by `config`.
///
/// The result lists, in this order:
///   1. AddServer for every id present only in the new configuration;
///   2. RemoveServer for every id present only in the current configuration;
///   3. UpdatePriority for every id present in both whose priority differs.
///
/// If a server keeps its id but changes its endpoint, a warning is logged and an
/// empty list is returned, i.e. the new config is ignored as a whole.
/// `config` is parsed with allow_without_us = true, so our own id may be absent;
/// exceptions from parseServersConfiguration() propagate to the caller.
ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
{
    auto new_configuration_wrapper = parseServersConfiguration(config, true);

    /// Copy the current server list once, under a single lock, so that all three
    /// kinds of actions are derived from the same state.
    const auto old_servers = [this]
    {
        std::lock_guard lock(configuration_wrapper_mutex);
        return configuration_wrapper.cluster_config->get_servers();
    }();

    std::unordered_map<int, KeeperServerConfigPtr> new_ids, old_ids;
    for (const auto & new_server : new_configuration_wrapper.cluster_config->get_servers())
        new_ids[new_server->get_id()] = new_server;

    for (const auto & old_server : old_servers)
        old_ids[old_server->get_id()] = old_server;

    ConfigUpdateActions result;

    /// First of all add new servers
    for (const auto & [new_id, server_config] : new_ids)
    {
        auto old_server_it = old_ids.find(new_id);
        if (old_server_it == old_ids.end())
        {
            result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::AddServer, server_config});
            continue;
        }

        const auto & old_endpoint = old_server_it->second->get_endpoint();
        if (old_endpoint != server_config->get_endpoint())
        {
            LOG_WARNING(
                &Poco::Logger::get("RaftConfiguration"),
                "Config will be ignored because a server with ID {} is already present in the cluster on a different endpoint ({}). "
                "The endpoint of the current servers should not be changed. For servers on a new endpoint, please use a new ID.",
                new_id,
                old_endpoint);
            return {};
        }
    }

    /// After that remove old ones
    for (const auto & [old_id, server_config] : old_ids)
    {
        if (!new_ids.contains(old_id))
            result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::RemoveServer, server_config});
    }

    /// And update priority if required.
    /// Iterating the list (not the map) keeps the actions in the order of the current servers.
    for (const auto & old_server : old_servers)
    {
        const auto new_server_it = new_ids.find(old_server->get_id());
        if (new_server_it == new_ids.end())
            continue;

        const auto & new_server = new_server_it->second;
        if (old_server->get_priority() != new_server->get_priority())
            result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::UpdatePriority, new_server});
    }

    return result;
}
|
2021-01-13 10:32:20 +00:00
|
|
|
|
|
|
|
}
|