At least able to start

This commit is contained in:
alesapin 2021-10-18 18:27:51 +03:00
parent 452eee7922
commit bfe2a937eb
19 changed files with 393 additions and 86 deletions

View File

@ -863,6 +863,9 @@ if (ThreadFuzzer::instance().isEffective())
if (config->has("max_concurrent_queries"))
global_context->getProcessList().setMaxSize(config->getInt("max_concurrent_queries", 0));
if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);
if (!initial_loading)
{
/// We do not load ZooKeeper configuration on the first config loading

View File

@ -631,6 +631,14 @@ LogEntryPtr Changelog::entryAt(uint64_t index)
return src;
}
LogEntryPtr Changelog::getLatestConfigChange() const
{
for (const auto & [_, entry] : logs)
if (entry->get_val_type() == nuraft::conf)
return entry;
return nullptr;
}
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;

View File

@ -102,6 +102,8 @@ public:
/// Last entry in log, or fake entry with term 0 if log is empty
LogEntryPtr getLastEntry() const;
LogEntryPtr getLatestConfigChange() const;
/// Return log entries between [start, end)
LogEntriesPtr getLogEntriesBetween(uint64_t start_index, uint64_t end_index);

View File

@ -273,6 +273,8 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
updateConfiguration(config);
}
catch (...)
{
@ -497,4 +499,17 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
return future.get();
}
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
if (isLeader())
{
server->updateConfiguration(config);
}
else
{
LOG_INFO(log, "Configuration changed, but we are not leader, so we will wait update from leader");
}
}
}

View File

@ -62,7 +62,6 @@ private:
ThreadFromGlobalPool session_cleaner_thread;
/// Dumping new snapshots to disk
ThreadFromGlobalPool snapshot_thread;
/// RAFT wrapper.
std::unique_ptr<KeeperServer> server;
@ -102,6 +101,8 @@ public:
/// standalone_keeper -- we are standalone keeper application (not inside clickhouse server)
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper);
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
void shutdown();

View File

@ -112,4 +112,10 @@ void KeeperLogStore::end_of_append_batch(uint64_t /*start_index*/, uint64_t /*co
changelog.flush();
}
nuraft::ptr<nuraft::log_entry> KeeperLogStore::getLatestConfigChange() const
{
std::lock_guard lock(changelog_lock);
return changelog.getLatestConfigChange();
}
}

View File

@ -58,6 +58,8 @@ public:
/// Flush batch of appended entries
void end_of_append_batch(uint64_t start_index, uint64_t count) override;
nuraft::ptr<nuraft::log_entry> getLatestConfigChange() const;
private:
mutable std::mutex changelog_lock;
Poco::Logger * log;

View File

@ -121,6 +121,22 @@ void KeeperServer::startup()
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
auto latest_snapshot_config = state_machine->getClusterConfig();
auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();
if (latest_snapshot_config && latest_log_store_config)
{
if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
state_manager->setClusterConfig(latest_snapshot_config);
else
state_manager->setClusterConfig(latest_log_store_config);
}
else if (latest_snapshot_config)
state_manager->setClusterConfig(latest_snapshot_config);
else if (latest_log_store_config)
state_manager->setClusterConfig(latest_log_store_config);
/// else use parsed config in state_manager constructor (first start)
bool single_server = state_manager->getTotalServers() == 1;
nuraft::raft_params params;
@ -363,4 +379,39 @@ std::vector<int64_t> KeeperServer::getDeadSessions()
return state_machine->getDeadSessions();
}
void KeeperServer::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto diff = state_manager->getConfigurationDiff(config);
if (diff.empty())
{
LOG_TRACE(log, "Configuration update triggered, but nothing changed for RAFT");
return;
}
else if (diff.size() > 1)
{
LOG_WARNING(log, "Configuration changed for more than one server ({}) from cluster, it's strictly not recommended", diff.size());
}
for (auto & task : diff)
{
if (task.action_type == ConfigUpdateActionType::AddServer)
{
auto result = raft_instance->add_srv(*task.server);
if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to add server (id {}) was not accepted by RAFT", task.server->get_id());
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
{
auto result = raft_instance->remove_srv(task.server->get_id());
if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to remove server (id {}) was not accepted by RAFT", task.server->get_id());
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
raft_instance->set_priority(task.server->get_id(), task.server->get_priority());
else
LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
}
}
}

View File

@ -83,6 +83,8 @@ public:
void shutdown();
int getServerID() const { return server_id; }
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
};
}

View File

@ -205,9 +205,16 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
writeBinary(id, out);
}
}
if (snapshot.cluster_config)
{
auto buffer = snapshot.cluster_config->serialize();
writeVarUInt(buffer->size(), out);
out.write(reinterpret_cast<const char *>(buffer->data_begin()), buffer->size());
}
}
SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, ReadBuffer & in)
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in)
{
uint8_t version;
readBinary(version, in);
@ -215,11 +222,13 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
if (current_version > CURRENT_SNAPSHOT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);
SnapshotMetadataPtr result = deserializeSnapshotMetadata(in);
deserialization_result.snapshot_meta = deserializeSnapshotMetadata(in);
KeeperStorage & storage = *deserialization_result.storage;
int64_t session_id;
readBinary(session_id, in);
storage.zxid = result->get_last_log_idx();
storage.zxid = deserialization_result.snapshot_meta->get_last_log_idx();
storage.session_id_counter = session_id;
/// Before V1 we serialized ACL without acl_map
@ -309,13 +318,24 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
current_session_size++;
}
return result;
/// Optional cluster config
ClusterConfigPtr cluster_config = nullptr;
if (!in.eof())
{
size_t data_size;
readVarUInt(data_size, in);
auto buffer = nuraft::buffer::alloc(data_size);
in.readStrict(reinterpret_cast<char *>(buffer->data_begin()), data_size);
buffer->pos(0);
deserialization_result.cluster_config = ClusterConfig::deserialize(*buffer);
}
}
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_)
: storage(storage_)
, snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
, session_id(storage->session_id_counter)
, cluster_config(cluster_config_)
{
storage->enableSnapshotMode();
snapshot_container_size = storage->container.snapshotSize();
@ -325,10 +345,11 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t
session_and_auth = storage->session_and_auth;
}
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_)
: storage(storage_)
, snapshot_meta(snapshot_meta_)
, session_id(storage->session_id_counter)
, cluster_config(cluster_config_)
{
storage->enableSnapshotMode();
snapshot_container_size = storage->container.snapshotSize();
@ -461,7 +482,7 @@ bool KeeperSnapshotManager::isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer)
return magic_from_buffer == ZSTD_COMPRESSED_MAGIC;
}
SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
{
bool is_zstd_compressed = isZstdCompressed(buffer);
@ -473,12 +494,13 @@ SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nura
else
compressed_reader = std::make_unique<CompressedReadBuffer>(*reader);
auto storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest);
auto snapshot_metadata = KeeperStorageSnapshot::deserialize(*storage, *compressed_reader);
return std::make_pair(snapshot_metadata, std::move(storage));
SnapshotDeserializationResult result;
result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest);
KeeperStorageSnapshot::deserialize(result, *compressed_reader);
return result;
}
SnapshotMetaAndStorage KeeperSnapshotManager::restoreFromLatestSnapshot()
SnapshotDeserializationResult KeeperSnapshotManager::restoreFromLatestSnapshot()
{
if (existing_snapshots.empty())
return {};
@ -502,7 +524,6 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
std::filesystem::remove(itr->second);
existing_snapshots.erase(itr);
}

View File

@ -9,6 +9,8 @@ namespace DB
using SnapshotMetadata = nuraft::snapshot;
using SnapshotMetadataPtr = std::shared_ptr<SnapshotMetadata>;
using ClusterConfig = nuraft::cluster_config;
using ClusterConfigPtr = nuraft::ptr<ClusterConfig>;
enum SnapshotVersion : uint8_t
{
@ -20,6 +22,13 @@ enum SnapshotVersion : uint8_t
static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V3;
struct SnapshotDeserializationResult
{
KeeperStoragePtr storage;
SnapshotMetadataPtr snapshot_meta;
ClusterConfigPtr cluster_config;
};
/// In memory keeper snapshot. Keeper Storage based on a hash map which can be
/// turned into snapshot mode. This operation is fast and KeeperStorageSnapshot
/// class do it in constructor. It also copies iterators from storage hash table
@ -31,14 +40,15 @@ static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V3;
struct KeeperStorageSnapshot
{
public:
KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_);
KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_ = nullptr);
KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_ = nullptr);
KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_);
~KeeperStorageSnapshot();
static void serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out);
static SnapshotMetadataPtr deserialize(KeeperStorage & storage, ReadBuffer & in);
static void deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in);
KeeperStorage * storage;
@ -58,6 +68,8 @@ public:
KeeperStorage::SessionAndAuth session_and_auth;
/// ACLs cache for better performance. Without we cannot deserialize storage.
std::unordered_map<uint64_t, Coordination::ACLs> acl_map;
/// Cluster config from snapshot, can be empty
ClusterConfigPtr cluster_config;
};
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;
@ -76,7 +88,7 @@ public:
bool compress_snapshots_zstd_ = true, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500);
/// Restore storage from latest available snapshot
SnapshotMetaAndStorage restoreFromLatestSnapshot();
SnapshotDeserializationResult restoreFromLatestSnapshot();
/// Compress snapshot and serialize it to buffer
nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) const;
@ -84,7 +96,7 @@ public:
/// Serialize already compressed snapshot to disk (return path)
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
SnapshotDeserializationResult deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
/// Deserialize snapshot with log index up_to_log_idx from disk into compressed nuraft buffer.
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const;

View File

@ -74,7 +74,10 @@ void KeeperStateMachine::init()
try
{
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
loaded = true;
break;
@ -152,13 +155,24 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{ /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
}
last_committed_idx = s.get_last_log_idx();
return true;
}
void KeeperStateMachine::commit_config(const ulong /*log_idx*/, nuraft::ptr<nuraft::cluster_config> & new_conf)
{
std::lock_guard lock(cluster_config_lock);
auto tmp = new_conf->serialize();
cluster_config = ClusterConfig::deserialize(*tmp);
}
nuraft::ptr<nuraft::snapshot> KeeperStateMachine::last_snapshot()
{
/// Just return the latest snapshot.
@ -177,7 +191,7 @@ void KeeperStateMachine::create_snapshot(
CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_and_responses_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig());
}
/// create snapshot task for background execution (in snapshot thread)
@ -239,7 +253,7 @@ void KeeperStateMachine::save_logical_snp_obj(
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{
std::lock_guard lock(storage_and_responses_lock);
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx(), getClusterConfig());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
}
else
@ -324,4 +338,16 @@ void KeeperStateMachine::shutdownStorage()
storage->finalize();
}
ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
{
std::lock_guard lock(cluster_config_lock);
if (cluster_config)
{
/// dumb way to return copy...
auto tmp = cluster_config->serialize();
return ClusterConfig::deserialize(*tmp);
}
return nullptr;
}
}

View File

@ -32,6 +32,8 @@ public:
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override;
void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override;
/// Currently not supported
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
@ -76,6 +78,8 @@ public:
void shutdownStorage();
ClusterConfigPtr getClusterConfig() const;
private:
/// In our state machine we always have a single snapshot which is stored
@ -109,8 +113,12 @@ private:
/// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx;
Poco::Logger * log;
mutable std::mutex cluster_config_lock;
ClusterConfigPtr cluster_config;
/// Special part of ACL system -- superdigest specified in server config.
const std::string superdigest;
};

View File

@ -31,42 +31,22 @@ namespace
}
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
: my_server_id(server_id_)
, my_port(port)
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
cluster_config->get_servers().push_back(peer_config);
}
KeeperStateManager::KeeperStateManager(
int my_server_id_,
const std::string & config_prefix,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
bool standalone_keeper)
: my_server_id(my_server_id_)
, secure(config.getBool(config_prefix + ".raft_configuration.secure", false))
, log_store(nuraft::cs_new<KeeperLogStore>(
getLogsPathFromConfig(config_prefix, config, standalone_keeper),
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync, coordination_settings->compress_logs))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
KeeperServersConfiguration KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config) const
{
KeeperServersConfiguration result;
result.cluster_config = std::make_shared<nuraft::cluster_config>();
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix + ".raft_configuration", keys);
total_servers = keys.size();
size_t total_servers = 0;
for (const auto & server_key : keys)
{
if (!startsWith(server_key, "server"))
continue;
std::string full_prefix = config_prefix + ".raft_configuration." + server_key;
int server_id = config.getInt(full_prefix + ".id");
int new_server_id = config.getInt(full_prefix + ".id");
std::string hostname = config.getString(full_prefix + ".hostname");
int port = config.getInt(full_prefix + ".port");
bool can_become_leader = config.getBool(full_prefix + ".can_become_leader", true);
@ -74,24 +54,57 @@ KeeperStateManager::KeeperStateManager(
bool start_as_follower = config.getBool(full_prefix + ".start_as_follower", false);
if (start_as_follower)
start_as_follower_servers.insert(server_id);
result.servers_start_as_followers.insert(new_server_id);
auto endpoint = hostname + ":" + std::to_string(port);
auto peer_config = nuraft::cs_new<nuraft::srv_config>(server_id, 0, endpoint, "", !can_become_leader, priority);
if (server_id == my_server_id)
auto peer_config = nuraft::cs_new<nuraft::srv_config>(new_server_id, 0, endpoint, "", !can_become_leader, priority);
if (my_server_id == new_server_id)
{
my_server_config = peer_config;
my_port = port;
result.config = peer_config;
result.port = port;
}
cluster_config->get_servers().push_back(peer_config);
result.cluster_config->get_servers().push_back(peer_config);
total_servers++;
}
if (!my_server_config)
if (!result.config)
throw Exception(ErrorCodes::RAFT_ERROR, "Our server id {} not found in raft_configuration section", my_server_id);
if (start_as_follower_servers.size() == cluster_config->get_servers().size())
if (result.servers_start_as_followers.size() == total_servers)
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
return result;
}
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
: my_server_id(server_id_)
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
, log(&Poco::Logger::get("KeeperStateManager"))
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
servers_configuration.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
servers_configuration.port = port;
servers_configuration.config = peer_config;
servers_configuration.cluster_config->get_servers().push_back(peer_config);
}
KeeperStateManager::KeeperStateManager(
int server_id_,
const std::string & config_prefix_,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
bool standalone_keeper)
: my_server_id(server_id_)
, secure(config.getBool(config_prefix_ + ".raft_configuration.secure", false))
, config_prefix(config_prefix_)
, servers_configuration(parseServersConfiguration(config))
, log_store(nuraft::cs_new<KeeperLogStore>(
getLogsPathFromConfig(config_prefix_, config, standalone_keeper),
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync, coordination_settings->compress_logs))
, log(&Poco::Logger::get("KeeperStateManager"))
{
}
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
@ -99,6 +112,19 @@ void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t log
log_store->init(last_commited_index, logs_to_keep);
}
ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const
{
auto entry_with_change = log_store->getLatestConfigChange();
if (entry_with_change)
return ClusterConfig::deserialize(entry_with_change->get_buf());
return nullptr;
}
void KeeperStateManager::setClusterConfig(const ClusterConfigPtr & cluster_config)
{
servers_configuration.cluster_config = cluster_config;
}
void KeeperStateManager::flushLogStore()
{
log_store->flush();
@ -106,18 +132,57 @@ void KeeperStateManager::flushLogStore()
void KeeperStateManager::save_config(const nuraft::cluster_config & config)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
nuraft::ptr<nuraft::buffer> buf = config.serialize();
cluster_config = nuraft::cluster_config::deserialize(*buf);
servers_configuration.cluster_config = nuraft::cluster_config::deserialize(*buf);
}
void KeeperStateManager::save_state(const nuraft::srv_state & state)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
nuraft::ptr<nuraft::buffer> buf = state.serialize();
server_state = nuraft::srv_state::deserialize(*buf);
}
}
ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
{
auto new_servers_configuration = parseServersConfiguration(config);
if (new_servers_configuration.port != servers_configuration.port)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot change port of already running RAFT server");
std::unordered_map<int, KeeperServerConfigPtr> new_ids, old_ids;
for (auto new_server : new_servers_configuration.cluster_config->get_servers())
new_ids[new_server->get_id()] = new_server;
for (auto old_server : servers_configuration.cluster_config->get_servers())
old_ids[old_server->get_id()] = old_server;
std::unordered_map<int, KeeperServerConfigPtr> servers_to_remove, servers_to_add;
auto comp = [] (auto & a, auto & b) { return a.first < b.first; };
std::set_difference(old_ids.begin(), old_ids.end(), new_ids.begin(), new_ids.end(), std::inserter(servers_to_remove, servers_to_remove.begin()), comp);
std::set_difference(new_ids.begin(), new_ids.end(), old_ids.begin(), old_ids.end(), std::inserter(servers_to_add, servers_to_add.begin()), comp);
ConfigUpdateActions result;
for (auto & [_, server_config] : servers_to_remove)
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::RemoveServer, server_config});
for (auto & [_, server_config] : servers_to_add)
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::AddServer, server_config});
for (const auto & old_server : servers_configuration.cluster_config->get_servers())
{
for (const auto & new_server : new_servers_configuration.cluster_config->get_servers())
{
if (old_server->get_id() == new_server->get_id())
{
if (old_server->get_priority() != new_server->get_priority())
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::UpdatePriority, new_server});
break;
}
}
}
return result;
}
}

View File

@ -6,16 +6,42 @@
#include <Coordination/CoordinationSettings.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/KeeperSnapshotManager.h>
namespace DB
{
using KeeperServerConfigPtr = nuraft::ptr<nuraft::srv_config>;
struct KeeperServersConfiguration
{
int port;
KeeperServerConfigPtr config;
std::unordered_set<int> servers_start_as_followers;
ClusterConfigPtr cluster_config;
};
enum class ConfigUpdateActionType
{
RemoveServer,
AddServer,
UpdatePriority,
};
struct ConfigUpdateAction
{
ConfigUpdateActionType action_type;
KeeperServerConfigPtr server;
};
using ConfigUpdateActions = std::vector<ConfigUpdateAction>;
class KeeperStateManager : public nuraft::state_mgr
{
public:
KeeperStateManager(
int server_id_,
const std::string & config_prefix,
const std::string & config_prefix_,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
bool standalone_keeper);
@ -30,7 +56,7 @@ public:
void flushLogStore();
nuraft::ptr<nuraft::cluster_config> load_config() override { return cluster_config; }
nuraft::ptr<nuraft::cluster_config> load_config() override { return servers_configuration.cluster_config; }
void save_config(const nuraft::cluster_config & config) override;
@ -40,17 +66,17 @@ public:
nuraft::ptr<nuraft::log_store> load_log_store() override { return log_store; }
Int32 server_id() override { return my_server_id; }
int32_t server_id() override { return my_server_id; }
nuraft::ptr<nuraft::srv_config> get_srv_config() const { return my_server_config; }
nuraft::ptr<nuraft::srv_config> get_srv_config() const { return servers_configuration.config; }
void system_exit(const int /* exit_code */) override {}
int getPort() const { return my_port; }
int getPort() const { return servers_configuration.port; }
bool shouldStartAsFollower() const
{
return start_as_follower_servers.count(my_server_id);
return servers_configuration.servers_start_as_followers.count(my_server_id);
}
bool isSecure() const
@ -60,18 +86,25 @@ public:
nuraft::ptr<KeeperLogStore> getLogStore() const { return log_store; }
uint64_t getTotalServers() const { return total_servers; }
uint64_t getTotalServers() const { return servers_configuration.cluster_config->get_servers().size(); }
ClusterConfigPtr getLatestConfigFromLogStore() const;
void setClusterConfig(const ClusterConfigPtr & cluster_config);
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
private:
int my_server_id;
int my_port;
bool secure;
uint64_t total_servers{0};
std::unordered_set<int> start_as_follower_servers;
std::string config_prefix;
KeeperServersConfiguration servers_configuration;
nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_config> my_server_config;
nuraft::ptr<nuraft::cluster_config> cluster_config;
nuraft::ptr<nuraft::srv_state> server_state;
Poco::Logger * log;
KeeperServersConfiguration parseServersConfiguration(const Poco::Util::AbstractConfiguration & config) const;
};
}

View File

@ -0,0 +1,2 @@
add_executable (request_fuzzer request_fuzzer.cpp)
target_link_libraries (request_fuzzer PRIVATE dbms ${LIB_FUZZING_ENGINE})

View File

@ -0,0 +1,36 @@
#include <iostream>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperLogStore.h>
#include <memory>
using namespace DB;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
try {
ResponsesQueue queue;
SnapshotsQueue snapshots_queue{1};
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
state_machine->init();
DB::KeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true);
changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items);
nuraft::ptr<nuraft::buffer> ret = nuraft::buffer::alloc(sizeof(size));
nuraft::buffer_serializer bs(ret);
bs.put_raw(data, size);
state_machine->commit(1, *ret);
} catch (...) {
return 0;
}
return 0;
}
catch (...)
{
return 1;
}

View File

@ -161,8 +161,8 @@ struct ContextSharedPart
ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
#if USE_NURAFT
mutable std::mutex keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_storage_dispatcher;
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
#endif
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
@ -1836,16 +1836,16 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
void Context::initializeKeeperDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (shared->keeper_storage_dispatcher)
if (shared->keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times");
const auto & config = getConfigRef();
if (config.has("keeper_server"))
{
shared->keeper_storage_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_storage_dispatcher->initialize(config, getApplicationType() == ApplicationType::KEEPER);
shared->keeper_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_dispatcher->initialize(config, getApplicationType() == ApplicationType::KEEPER);
}
#endif
}
@ -1853,27 +1853,39 @@ void Context::initializeKeeperDispatcher() const
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & Context::getKeeperDispatcher() const
{
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
if (!shared->keeper_storage_dispatcher)
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (!shared->keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Keeper must be initialized before requests");
return shared->keeper_storage_dispatcher;
return shared->keeper_dispatcher;
}
#endif
void Context::shutdownKeeperDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
if (shared->keeper_storage_dispatcher)
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (shared->keeper_dispatcher)
{
shared->keeper_storage_dispatcher->shutdown();
shared->keeper_storage_dispatcher.reset();
shared->keeper_dispatcher->shutdown();
shared->keeper_dispatcher.reset();
}
#endif
}
void Context::updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config)
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (!shared->keeper_dispatcher)
return;
shared->keeper_dispatcher->updateConfiguration(config);
#endif
}
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);

View File

@ -671,6 +671,7 @@ public:
#endif
void initializeKeeperDispatcher() const;
void shutdownKeeperDispatcher() const;
void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);
@ -792,6 +793,7 @@ public:
DisksMap getDisksMap() const;
void updateStorageConfiguration(const Poco::Util::AbstractConfiguration & config);
/// Provides storage politics schemes
StoragePolicyPtr getStoragePolicy(const String & name) const;