Merge pull request #30372 from ClickHouse/update_keeper_config

Updatable keeper configuration.
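This allows ClickHouse Keeper to pick up changes to the <raft_configuration> section of its XML config at runtime. The config file is watched by ConfigReloader (both in standalone Keeper and inside clickhouse-server), so a reload can also be forced with SYSTEM RELOAD CONFIG. On reload the new XML is diffed against the current RAFT cluster config, and add/remove/priority changes are applied through NuRaft, one server at a time. The effective cluster config is now also persisted in the changelog and in snapshots, so it survives restarts.

Usage sketch (hostnames and ports as in the integration tests below; changing more than one server per reload is discouraged): add the new server to <raft_configuration> on the existing nodes, e.g.

  <keeper_server>
      <raft_configuration>
          <server><id>1</id><hostname>node1</hostname><port>44444</port></server>
          <server><id>2</id><hostname>node2</hostname><port>44444</port></server> <!-- newly added -->
      </raft_configuration>
  </keeper_server>

then run SYSTEM RELOAD CONFIG on the nodes (the leader applies the change; followers wait for it).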
alesapin 2021-10-25 10:37:03 +03:00 committed by GitHub
commit 490ca93162
48 changed files with 1447 additions and 105 deletions


@ -5,6 +5,7 @@
#include <Common/DNSResolver.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <Coordination/Defines.h>
#include <Common/Config/ConfigReloader.h>
#include <filesystem>
#include <IO/UseSSL.h>
#include <Core/ServerUUID.h>
@ -331,6 +332,8 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
const Settings & settings = global_context->getSettingsRef();
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 100));
static ServerErrorHandler error_handler;
@ -402,8 +405,27 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
for (auto & server : *servers)
server.start();
zkutil::EventPtr unused_event = std::make_shared<Poco::Event>();
zkutil::ZooKeeperNodeCache unused_cache([] { return nullptr; });
/// ConfigReloader requires some parameters which are redundant in our case
auto main_config_reloader = std::make_unique<ConfigReloader>(
config_path,
include_from_path,
config().getString("path", ""),
std::move(unused_cache),
unused_event,
[&](ConfigurationPtr config, bool /* initial_loading */)
{
if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);
},
/* already_loaded = */ false); /// Reload it right now (initial loading)
SCOPE_EXIT({
LOG_INFO(log, "Shutting down.");
/// Stop reloading of the main config. This must be done before `global_context->shutdown()` because
/// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart.
main_config_reloader.reset();
global_context->shutdown();
@ -450,6 +472,7 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
buildLoggers(config(), logger());
main_config_reloader->start();
LOG_INFO(log, "Ready for connections.");


@ -863,6 +863,9 @@ if (ThreadFuzzer::instance().isEffective())
if (config->has("max_concurrent_queries"))
global_context->getProcessList().setMaxSize(config->getInt("max_concurrent_queries", 0));
if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);
if (!initial_loading)
{
/// We do not load ZooKeeper configuration on the first config loading


@ -631,6 +631,14 @@ LogEntryPtr Changelog::entryAt(uint64_t index)
return src;
}
LogEntryPtr Changelog::getLatestConfigChange() const
{
for (const auto & [_, entry] : logs)
if (entry->get_val_type() == nuraft::conf)
return entry;
return nullptr;
}
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;


@ -102,6 +102,9 @@ public:
/// Last entry in log, or fake entry with term 0 if log is empty
LogEntryPtr getLastEntry() const;
/// Get entry with latest config in logstore
LogEntryPtr getLatestConfigChange() const;
/// Return log entries between [start, end)
LogEntriesPtr getLogEntriesBetween(uint64_t start_index, uint64_t end_index);


@ -38,7 +38,8 @@ struct Settings;
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consensus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \
M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0)
M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)
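For reference, the new setting is configured like the other coordination settings; a minimal sketch (the value 40 is arbitrary, element names as in the <coordination_settings> sections elsewhere in this PR):

  <coordination_settings>
      <configuration_change_tries_count>40</configuration_change_tries_count>
  </coordination_settings>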


@ -280,7 +280,6 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
{
LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it will be ready");
}
}
catch (...)
{
@ -290,6 +289,8 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
/// Start it after keeper server start
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); });
updateConfiguration(config);
LOG_DEBUG(log, "Dispatcher initialized");
}
@ -325,6 +326,10 @@ void KeeperDispatcher::shutdown()
snapshots_queue.finish();
if (snapshot_thread.joinable())
snapshot_thread.join();
update_configuration_queue.finish();
if (update_configuration_thread.joinable())
update_configuration_thread.join();
}
if (server)
@ -505,4 +510,71 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
return future.get();
}
void KeeperDispatcher::updateConfigurationThread()
{
while (true)
{
if (shutdown_called)
return;
try
{
if (!server->checkInit())
{
LOG_INFO(log, "Server still not initialized, will not apply configuration until initialization finished");
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
continue;
}
ConfigUpdateAction action;
if (!update_configuration_queue.pop(action))
break;
/// We must wait for this update from the leader or apply it ourselves (if we are the leader)
bool done = false;
while (!done)
{
if (shutdown_called)
return;
if (isLeader())
{
server->applyConfigurationUpdate(action);
done = true;
}
else
{
done = server->waitConfigurationUpdate(action);
if (!done)
LOG_INFO(log, "Cannot wait for configuration update, maybe we become leader, or maybe update is invalid, will try to wait one more time");
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto diff = server->getConfigurationDiff(config);
if (diff.empty())
LOG_TRACE(log, "Configuration update triggered, but nothing changed for RAFT");
else if (diff.size() > 1)
LOG_WARNING(log, "Configuration changed for more than one server ({}) from cluster, it's strictly not recommended", diff.size());
else
LOG_DEBUG(log, "Configuration change size ({})", diff.size());
for (auto & change : diff)
{
bool push_result = update_configuration_queue.push(change);
if (!push_result)
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
}
}
}
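For orientation, the end-to-end update path wired up in this PR (all names from this diff):

  SYSTEM RELOAD CONFIG / ConfigReloader callback
    -> Context::updateKeeperConfiguration(config)
    -> KeeperDispatcher::updateConfiguration(config)    // computes the diff, pushes ConfigUpdateActions
    -> update_configuration_queue
    -> KeeperDispatcher::updateConfigurationThread()    // pops actions one by one
    -> KeeperServer::applyConfigurationUpdate(action)   // if we are the leader
       KeeperServer::waitConfigurationUpdate(action)    // if we are a follower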


@ -33,12 +33,16 @@ private:
CoordinationSettingsPtr coordination_settings;
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
/// Size depends on coordination settings
std::unique_ptr<RequestsQueue> requests_queue;
ResponsesQueue responses_queue;
SnapshotsQueue snapshots_queue{1};
/// More than 1k pending updates is definitely a misconfiguration.
UpdateConfigurationQueue update_configuration_queue{1000};
std::atomic<bool> shutdown_called{false};
std::mutex session_to_response_callback_mutex;
@ -62,6 +66,8 @@ private:
ThreadFromGlobalPool session_cleaner_thread;
/// Dumping new snapshots to disk
ThreadFromGlobalPool snapshot_thread;
/// Apply or wait for configuration changes
ThreadFromGlobalPool update_configuration_thread;
/// RAFT wrapper.
std::unique_ptr<KeeperServer> server;
@ -80,6 +86,8 @@ private:
void sessionCleanerTask();
/// Thread that creates snapshots in the background
void snapshotThread();
/// Thread that applies or waits for configuration changes from the leader
void updateConfigurationThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
@ -107,6 +115,10 @@ public:
return server && server->checkInit();
}
/// Registered in the ConfigReloader callback. Adds new configuration changes to
/// update_configuration_queue. KeeperDispatcher applies them asynchronously.
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
void shutdown();


@ -112,4 +112,10 @@ void KeeperLogStore::end_of_append_batch(uint64_t /*start_index*/, uint64_t /*co
changelog.flush();
}
nuraft::ptr<nuraft::log_entry> KeeperLogStore::getLatestConfigChange() const
{
std::lock_guard lock(changelog_lock);
return changelog.getLatestConfigChange();
}
}


@ -58,6 +58,9 @@ public:
/// Flush batch of appended entries
void end_of_append_batch(uint64_t start_index, uint64_t count) override;
/// Get entry with latest config in logstore
nuraft::ptr<nuraft::log_entry> getLatestConfigChange() const;
private:
mutable std::mutex changelog_lock;
Poco::Logger * log;


@ -121,23 +121,42 @@ void KeeperServer::startup()
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
bool single_server = state_manager->getTotalServers() == 1;
auto latest_snapshot_config = state_machine->getClusterConfig();
auto latest_log_store_config = state_manager->getLatestConfigFromLogStore();
nuraft::raft_params params;
if (single_server)
if (latest_snapshot_config && latest_log_store_config)
{
/// Doesn't make sense in single-server mode
params.heart_beat_interval_ = 0;
params.election_timeout_lower_bound_ = 0;
params.election_timeout_upper_bound_ = 0;
if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
{
LOG_INFO(log, "Will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
state_manager->save_config(*latest_snapshot_config);
}
else
{
LOG_INFO(log, "Will use config from log store with log index {}", latest_snapshot_config->get_log_idx());
state_manager->save_config(*latest_log_store_config);
}
}
else if (latest_snapshot_config)
{
LOG_INFO(log, "No config in log store, will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
state_manager->save_config(*latest_snapshot_config);
}
else if (latest_log_store_config)
{
LOG_INFO(log, "No config in snapshot, will use config from log store with log index {}", latest_log_store_config->get_log_idx());
state_manager->save_config(*latest_log_store_config);
}
else
{
params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds();
params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds();
params.election_timeout_upper_bound_ = coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds();
LOG_INFO(log, "No config in log store and snapshot, probably it's initial run. Will use config from .xml on disk");
}
nuraft::raft_params params;
params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds();
params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds();
params.election_timeout_upper_bound_ = coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds();
params.reserved_log_items_ = coordination_settings->reserved_log_items;
params.snapshot_distance_ = coordination_settings->snapshot_distance;
params.stale_log_gap_ = coordination_settings->stale_log_gap;
@ -364,4 +383,139 @@ std::vector<int64_t> KeeperServer::getDeadSessions()
return state_machine->getDeadSessions();
}
ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
return state_manager->getConfigurationDiff(config);
}
void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
{
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
{
LOG_INFO(log, "Will try to add server with id {}", task.server->get_id());
bool added = false;
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
LOG_INFO(log, "Server with id {} was successfully added", task.server->get_id());
added = true;
break;
}
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to add server {}", task.server->get_id());
break;
}
auto result = raft_instance->add_srv(*task.server);
if (!result->get_accepted())
LOG_INFO(log, "Command to add server {} was not accepted for the {} time, will sleep for {} ms and retry", task.server->get_id(), i + 1, sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
if (!added)
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to add server (id {}) was not accepted by RAFT after all {} retries", task.server->get_id(), coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
{
LOG_INFO(log, "Will try to remove server with id {}", task.server->get_id());
bool removed = false;
if (task.server->get_id() == state_manager->server_id())
{
LOG_INFO(log, "Trying to remove leader node (ourself), so will yield leadership and some other node (new leader) will try remove us. "
"Probably you will have to run SYSTEM RELOAD CONFIG on the new leader node");
raft_instance->yield_leadership();
return;
}
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
LOG_INFO(log, "Server with id {} was successfully removed", task.server->get_id());
removed = true;
break;
}
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to remove server {}", task.server->get_id());
break;
}
auto result = raft_instance->remove_srv(task.server->get_id());
if (!result->get_accepted())
LOG_INFO(log, "Command to remove server {} was not accepted for the {} time, will sleep for {} ms and retry", task.server->get_id(), i + 1, sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
if (!removed)
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to remove server (id {}) was not accepted by RAFT after all {} retries", task.server->get_id(), coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
raft_instance->set_priority(task.server->get_id(), task.server->get_priority());
else
LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
}
bool KeeperServer::waitConfigurationUpdate(const ConfigUpdateAction & task)
{
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
{
LOG_INFO(log, "Will try to wait server with id {} to be added", task.server->get_id());
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
LOG_INFO(log, "Server with id {} was successfully added by leader", task.server->get_id());
return true;
}
if (isLeader())
{
LOG_INFO(log, "We are leader now, probably we will have to add server {}", task.server->get_id());
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
return false;
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
{
LOG_INFO(log, "Will try to wait remove of server with id {}", task.server->get_id());
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
LOG_INFO(log, "Server with id {} was successfully removed by leader", task.server->get_id());
return true;
}
if (isLeader())
{
LOG_INFO(log, "We are leader now, probably we will have to remove server {}", task.server->get_id());
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
return false;
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
return true;
else
LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
return true;
}
}


@ -89,6 +89,18 @@ public:
void shutdown();
int getServerID() const { return server_id; }
/// Get the configuration diff between the current configuration in RAFT and the one in the XML file
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config);
/// Apply an action for a configuration update. Actually calls raft_instance->remove_srv or raft_instance->add_srv.
/// Synchronously checks for update results with retries.
void applyConfigurationUpdate(const ConfigUpdateAction & task);
/// Wait for the configuration update described by the action. Used by followers.
/// Returns true if the update was successfully received.
bool waitConfigurationUpdate(const ConfigUpdateAction & task);
};
}


@ -205,9 +205,17 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
writeBinary(id, out);
}
}
/// Serialize cluster config
if (snapshot.cluster_config)
{
auto buffer = snapshot.cluster_config->serialize();
writeVarUInt(buffer->size(), out);
out.write(reinterpret_cast<const char *>(buffer->data_begin()), buffer->size());
}
}
SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, ReadBuffer & in)
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in)
{
uint8_t version;
readBinary(version, in);
@ -215,11 +223,13 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
if (current_version > CURRENT_SNAPSHOT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);
SnapshotMetadataPtr result = deserializeSnapshotMetadata(in);
deserialization_result.snapshot_meta = deserializeSnapshotMetadata(in);
KeeperStorage & storage = *deserialization_result.storage;
int64_t session_id;
readBinary(session_id, in);
storage.zxid = result->get_last_log_idx();
storage.zxid = deserialization_result.snapshot_meta->get_last_log_idx();
storage.session_id_counter = session_id;
/// Before V1 we serialized ACL without acl_map
@ -309,13 +319,24 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
current_session_size++;
}
return result;
/// Optional cluster config
ClusterConfigPtr cluster_config = nullptr;
if (!in.eof())
{
size_t data_size;
readVarUInt(data_size, in);
auto buffer = nuraft::buffer::alloc(data_size);
in.readStrict(reinterpret_cast<char *>(buffer->data_begin()), data_size);
buffer->pos(0);
deserialization_result.cluster_config = ClusterConfig::deserialize(*buffer);
}
}
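Note the compatibility trick above: the cluster config is the last field of the snapshot and is read only if the stream is not yet at EOF, so snapshots written before this change still deserialize fine, just with cluster_config left as nullptr.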
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_)
: storage(storage_)
, snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
, session_id(storage->session_id_counter)
, cluster_config(cluster_config_)
{
storage->enableSnapshotMode();
snapshot_container_size = storage->container.snapshotSize();
@ -325,10 +346,11 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t
session_and_auth = storage->session_and_auth;
}
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_)
: storage(storage_)
, snapshot_meta(snapshot_meta_)
, session_id(storage->session_id_counter)
, cluster_config(cluster_config_)
{
storage->enableSnapshotMode();
snapshot_container_size = storage->container.snapshotSize();
@ -461,7 +483,7 @@ bool KeeperSnapshotManager::isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer)
return magic_from_buffer == ZSTD_COMPRESSED_MAGIC;
}
SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
{
bool is_zstd_compressed = isZstdCompressed(buffer);
@ -473,12 +495,13 @@ SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nura
else
compressed_reader = std::make_unique<CompressedReadBuffer>(*reader);
auto storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest);
auto snapshot_metadata = KeeperStorageSnapshot::deserialize(*storage, *compressed_reader);
return std::make_pair(snapshot_metadata, std::move(storage));
SnapshotDeserializationResult result;
result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest);
KeeperStorageSnapshot::deserialize(result, *compressed_reader);
return result;
}
SnapshotMetaAndStorage KeeperSnapshotManager::restoreFromLatestSnapshot()
SnapshotDeserializationResult KeeperSnapshotManager::restoreFromLatestSnapshot()
{
if (existing_snapshots.empty())
return {};
@ -502,7 +525,6 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
std::filesystem::remove(itr->second);
existing_snapshots.erase(itr);
}


@ -9,6 +9,8 @@ namespace DB
using SnapshotMetadata = nuraft::snapshot;
using SnapshotMetadataPtr = std::shared_ptr<SnapshotMetadata>;
using ClusterConfig = nuraft::cluster_config;
using ClusterConfigPtr = nuraft::ptr<ClusterConfig>;
enum SnapshotVersion : uint8_t
{
@ -20,6 +22,17 @@ enum SnapshotVersion : uint8_t
static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V3;
/// What is stored in a binary snapshot
struct SnapshotDeserializationResult
{
/// Storage
KeeperStoragePtr storage;
/// Snapshot metadata (up_to_log_idx and so on)
SnapshotMetadataPtr snapshot_meta;
/// Cluster config
ClusterConfigPtr cluster_config;
};
/// In-memory Keeper snapshot. KeeperStorage is based on a hash map which can be
/// turned into snapshot mode. This operation is fast and the KeeperStorageSnapshot
/// class does it in its constructor. It also copies iterators from the storage hash table
@ -31,14 +44,15 @@ static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V3;
struct KeeperStorageSnapshot
{
public:
KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_);
KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_ = nullptr);
KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_ = nullptr);
KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_);
~KeeperStorageSnapshot();
static void serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out);
static SnapshotMetadataPtr deserialize(KeeperStorage & storage, ReadBuffer & in);
static void deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in);
KeeperStorage * storage;
@ -58,6 +72,8 @@ public:
KeeperStorage::SessionAndAuth session_and_auth;
/// ACL cache for better performance. Without it we cannot deserialize the storage.
std::unordered_map<uint64_t, Coordination::ACLs> acl_map;
/// Cluster config from snapshot, can be empty
ClusterConfigPtr cluster_config;
};
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;
@ -76,7 +92,7 @@ public:
bool compress_snapshots_zstd_ = true, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500);
/// Restore storage from latest available snapshot
SnapshotMetaAndStorage restoreFromLatestSnapshot();
SnapshotDeserializationResult restoreFromLatestSnapshot();
/// Compress snapshot and serialize it to buffer
nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) const;
@ -84,7 +100,7 @@ public:
/// Serialize already compressed snapshot to disk (return path)
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
SnapshotDeserializationResult deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
/// Deserialize snapshot with log index up_to_log_idx from disk into compressed nuraft buffer.
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const;


@ -74,7 +74,10 @@ void KeeperStateMachine::init()
try
{
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
loaded = true;
break;
@ -152,13 +155,24 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{ /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
}
last_committed_idx = s.get_last_log_idx();
return true;
}
void KeeperStateMachine::commit_config(const uint64_t /*log_idx*/, nuraft::ptr<nuraft::cluster_config> & new_conf)
{
std::lock_guard lock(cluster_config_lock);
auto tmp = new_conf->serialize();
cluster_config = ClusterConfig::deserialize(*tmp);
}
nuraft::ptr<nuraft::snapshot> KeeperStateMachine::last_snapshot()
{
/// Just return the latest snapshot.
@ -177,7 +191,7 @@ void KeeperStateMachine::create_snapshot(
CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period of time to turn on "snapshot mode". After that we can read a consistent storage state without locking.
std::lock_guard lock(storage_and_responses_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig());
}
/// create snapshot task for background execution (in snapshot thread)
@ -239,7 +253,7 @@ void KeeperStateMachine::save_logical_snp_obj(
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{
std::lock_guard lock(storage_and_responses_lock);
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx(), getClusterConfig());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
}
else
@ -324,4 +338,16 @@ void KeeperStateMachine::shutdownStorage()
storage->finalize();
}
ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
{
std::lock_guard lock(cluster_config_lock);
if (cluster_config)
{
/// dumb way to return copy...
auto tmp = cluster_config->serialize();
return ClusterConfig::deserialize(*tmp);
}
return nullptr;
}
}


@ -32,6 +32,9 @@ public:
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override;
/// Save new cluster config to our snapshot (copy of the config stored in StateManager)
void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override;
/// Currently not supported
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
@ -76,6 +79,8 @@ public:
void shutdownStorage();
ClusterConfigPtr getClusterConfig() const;
private:
/// In our state machine we always have a single snapshot which is stored
@ -109,8 +114,15 @@ private:
/// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx;
Poco::Logger * log;
/// Cluster config for our quorum.
/// It's a copy of the config stored in StateManager, but here
/// we also write it to disk during snapshots. Must be used with the lock.
mutable std::mutex cluster_config_lock;
ClusterConfigPtr cluster_config;
/// Special part of ACL system -- superdigest specified in server config.
const std::string superdigest;
};


@ -31,42 +31,22 @@ namespace
}
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
: my_server_id(server_id_)
, my_port(port)
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
cluster_config->get_servers().push_back(peer_config);
}
KeeperStateManager::KeeperStateManager(
int my_server_id_,
const std::string & config_prefix,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
bool standalone_keeper)
: my_server_id(my_server_id_)
, secure(config.getBool(config_prefix + ".raft_configuration.secure", false))
, log_store(nuraft::cs_new<KeeperLogStore>(
getLogsPathFromConfig(config_prefix, config, standalone_keeper),
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync, coordination_settings->compress_logs))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
KeeperConfigurationWrapper KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const
{
KeeperConfigurationWrapper result;
result.cluster_config = std::make_shared<nuraft::cluster_config>();
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix + ".raft_configuration", keys);
total_servers = keys.size();
size_t total_servers = 0;
for (const auto & server_key : keys)
{
if (!startsWith(server_key, "server"))
continue;
std::string full_prefix = config_prefix + ".raft_configuration." + server_key;
int server_id = config.getInt(full_prefix + ".id");
int new_server_id = config.getInt(full_prefix + ".id");
std::string hostname = config.getString(full_prefix + ".hostname");
int port = config.getInt(full_prefix + ".port");
bool can_become_leader = config.getBool(full_prefix + ".can_become_leader", true);
@ -74,24 +54,55 @@ KeeperStateManager::KeeperStateManager(
bool start_as_follower = config.getBool(full_prefix + ".start_as_follower", false);
if (start_as_follower)
start_as_follower_servers.insert(server_id);
result.servers_start_as_followers.insert(new_server_id);
auto endpoint = hostname + ":" + std::to_string(port);
auto peer_config = nuraft::cs_new<nuraft::srv_config>(server_id, 0, endpoint, "", !can_become_leader, priority);
if (server_id == my_server_id)
auto peer_config = nuraft::cs_new<nuraft::srv_config>(new_server_id, 0, endpoint, "", !can_become_leader, priority);
if (my_server_id == new_server_id)
{
my_server_config = peer_config;
my_port = port;
result.config = peer_config;
result.port = port;
}
cluster_config->get_servers().push_back(peer_config);
result.cluster_config->get_servers().push_back(peer_config);
total_servers++;
}
if (!my_server_config)
if (!result.config && !allow_without_us)
throw Exception(ErrorCodes::RAFT_ERROR, "Our server id {} not found in raft_configuration section", my_server_id);
if (start_as_follower_servers.size() == cluster_config->get_servers().size())
if (result.servers_start_as_followers.size() == total_servers)
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
return result;
}
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
: my_server_id(server_id_)
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
configuration_wrapper.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
configuration_wrapper.port = port;
configuration_wrapper.config = peer_config;
configuration_wrapper.cluster_config->get_servers().push_back(peer_config);
}
KeeperStateManager::KeeperStateManager(
int server_id_,
const std::string & config_prefix_,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
bool standalone_keeper)
: my_server_id(server_id_)
, secure(config.getBool(config_prefix_ + ".raft_configuration.secure", false))
, config_prefix(config_prefix_)
, configuration_wrapper(parseServersConfiguration(config, false))
, log_store(nuraft::cs_new<KeeperLogStore>(
getLogsPathFromConfig(config_prefix_, config, standalone_keeper),
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync, coordination_settings->compress_logs))
{
}
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
@ -99,6 +110,14 @@ void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t log
log_store->init(last_commited_index, logs_to_keep);
}
ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const
{
auto entry_with_change = log_store->getLatestConfigChange();
if (entry_with_change)
return ClusterConfig::deserialize(entry_with_change->get_buf());
return nullptr;
}
void KeeperStateManager::flushLogStore()
{
log_store->flush();
@ -106,18 +125,67 @@ void KeeperStateManager::flushLogStore()
void KeeperStateManager::save_config(const nuraft::cluster_config & config)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
std::lock_guard lock(configuration_wrapper_mutex);
nuraft::ptr<nuraft::buffer> buf = config.serialize();
cluster_config = nuraft::cluster_config::deserialize(*buf);
configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*buf);
}
void KeeperStateManager::save_state(const nuraft::srv_state & state)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
nuraft::ptr<nuraft::buffer> buf = state.serialize();
server_state = nuraft::srv_state::deserialize(*buf);
}
nuraft::ptr<nuraft::buffer> buf = state.serialize();
server_state = nuraft::srv_state::deserialize(*buf);
}
ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
{
auto new_configuration_wrapper = parseServersConfiguration(config, true);
std::unordered_map<int, KeeperServerConfigPtr> new_ids, old_ids;
for (const auto & new_server : new_configuration_wrapper.cluster_config->get_servers())
new_ids[new_server->get_id()] = new_server;
{
std::lock_guard lock(configuration_wrapper_mutex);
for (const auto & old_server : configuration_wrapper.cluster_config->get_servers())
old_ids[old_server->get_id()] = old_server;
}
ConfigUpdateActions result;
/// First of all add new servers
for (auto [new_id, server_config] : new_ids)
{
if (!old_ids.count(new_id))
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::AddServer, server_config});
}
/// After that remove old ones
for (auto [old_id, server_config] : old_ids)
{
if (!new_ids.count(old_id))
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::RemoveServer, server_config});
}
{
std::lock_guard lock(configuration_wrapper_mutex);
/// And update priority if required
for (const auto & old_server : configuration_wrapper.cluster_config->get_servers())
{
for (const auto & new_server : new_configuration_wrapper.cluster_config->get_servers())
{
if (old_server->get_id() == new_server->get_id())
{
if (old_server->get_priority() != new_server->get_priority())
{
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::UpdatePriority, new_server});
}
break;
}
}
}
}
return result;
}
}
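A worked example of the resulting order (hypothetical ids): if RAFT currently knows servers {1, 2} and the XML now lists {1, 3} with a changed priority for server 1, getConfigurationDiff returns

  [AddServer(3), RemoveServer(2), UpdatePriority(1)]

i.e. new servers are always added before old ones are removed, and priority updates come last.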


@ -6,20 +6,57 @@
#include <Coordination/CoordinationSettings.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/KeeperSnapshotManager.h>
namespace DB
{
using KeeperServerConfigPtr = nuraft::ptr<nuraft::srv_config>;
/// Wrapper struct for Keeper cluster config. We parse this
/// info from XML files.
struct KeeperConfigurationWrapper
{
/// Our port
int port;
/// Our config
KeeperServerConfigPtr config;
/// Server ids which should start as followers
std::unordered_set<int> servers_start_as_followers;
/// Cluster config
ClusterConfigPtr cluster_config;
};
/// When our configuration changes, the following action types
/// can happen
enum class ConfigUpdateActionType
{
RemoveServer,
AddServer,
UpdatePriority,
};
/// Action to update configuration
struct ConfigUpdateAction
{
ConfigUpdateActionType action_type;
KeeperServerConfigPtr server;
};
using ConfigUpdateActions = std::vector<ConfigUpdateAction>;
/// Responsible for managing our own and the cluster's configuration
class KeeperStateManager : public nuraft::state_mgr
{
public:
KeeperStateManager(
int server_id_,
const std::string & config_prefix,
const std::string & config_prefix_,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings,
bool standalone_keeper);
/// Constructor for tests
KeeperStateManager(
int server_id_,
const std::string & host,
@ -30,8 +67,14 @@ public:
void flushLogStore();
nuraft::ptr<nuraft::cluster_config> load_config() override { return cluster_config; }
/// Called on server start; in our case we don't use any separate logic for loading
nuraft::ptr<nuraft::cluster_config> load_config() override
{
std::lock_guard lock(configuration_wrapper_mutex);
return configuration_wrapper.cluster_config;
}
/// Save cluster config (i.e. nodes, their priorities and so on)
void save_config(const nuraft::cluster_config & config) override;
void save_state(const nuraft::srv_state & state) override;
@ -40,17 +83,22 @@ public:
nuraft::ptr<nuraft::log_store> load_log_store() override { return log_store; }
Int32 server_id() override { return my_server_id; }
int32_t server_id() override { return my_server_id; }
nuraft::ptr<nuraft::srv_config> get_srv_config() const { return my_server_config; }
nuraft::ptr<nuraft::srv_config> get_srv_config() const { return configuration_wrapper.config; }
void system_exit(const int /* exit_code */) override {}
int getPort() const { return my_port; }
int getPort() const
{
std::lock_guard lock(configuration_wrapper_mutex);
return configuration_wrapper.port;
}
bool shouldStartAsFollower() const
{
return start_as_follower_servers.count(my_server_id);
std::lock_guard lock(configuration_wrapper_mutex);
return configuration_wrapper.servers_start_as_followers.count(my_server_id);
}
bool isSecure() const
@ -60,18 +108,31 @@ public:
nuraft::ptr<KeeperLogStore> getLogStore() const { return log_store; }
uint64_t getTotalServers() const { return total_servers; }
uint64_t getTotalServers() const
{
std::lock_guard lock(configuration_wrapper_mutex);
return configuration_wrapper.cluster_config->get_servers().size();
}
/// Read all log entries in the log store from the beginning and return the latest config (with the largest log_index)
ClusterConfigPtr getLatestConfigFromLogStore() const;
/// Get configuration diff between proposed XML and current state in RAFT
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
private:
int my_server_id;
int my_port;
bool secure;
uint64_t total_servers{0};
std::unordered_set<int> start_as_follower_servers;
std::string config_prefix;
mutable std::mutex configuration_wrapper_mutex;
KeeperConfigurationWrapper configuration_wrapper;
nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_config> my_server_config;
nuraft::ptr<nuraft::cluster_config> cluster_config;
nuraft::ptr<nuraft::srv_state> server_state;
/// Parse configuration from xml config.
KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const;
};
}


@ -962,7 +962,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple)
auto debuf = manager.deserializeSnapshotBufferFromDisk(2);
auto [snapshot_meta, restored_storage] = manager.deserializeSnapshotFromBuffer(debuf);
auto [restored_storage, snapshot_meta, _] = manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage->container.size(), 3);
EXPECT_EQ(restored_storage->container.getValue("/").children.size(), 1);
@ -1011,7 +1011,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites)
auto debuf = manager.deserializeSnapshotBufferFromDisk(50);
auto [meta, restored_storage] = manager.deserializeSnapshotFromBuffer(debuf);
auto [restored_storage, meta, _] = manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage->container.size(), 51);
for (size_t i = 0; i < 50; ++i)
@ -1050,7 +1050,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots)
EXPECT_TRUE(fs::exists("./snapshots/snapshot_250.bin" + params.extension));
auto [meta, restored_storage] = manager.restoreFromLatestSnapshot();
auto [restored_storage, meta, _] = manager.restoreFromLatestSnapshot();
EXPECT_EQ(restored_storage->container.size(), 251);
@ -1103,7 +1103,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
EXPECT_FALSE(storage.container.contains("/hello_" + std::to_string(i)));
}
auto [meta, restored_storage] = manager.restoreFromLatestSnapshot();
auto [restored_storage, meta, _] = manager.restoreFromLatestSnapshot();
for (size_t i = 0; i < 50; ++i)
{
@ -1498,7 +1498,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
auto debuf = new_manager.deserializeSnapshotBufferFromDisk(2);
auto [snapshot_meta, restored_storage] = new_manager.deserializeSnapshotFromBuffer(debuf);
auto [restored_storage, snapshot_meta, _] = new_manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage->container.size(), 3);
EXPECT_EQ(restored_storage->container.getValue("/").children.size(), 1);


@ -161,8 +161,8 @@ struct ContextSharedPart
ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
#if USE_NURAFT
mutable std::mutex keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_storage_dispatcher;
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
#endif
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
@ -1901,10 +1901,9 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
void Context::initializeKeeperDispatcher([[maybe_unused]] bool start_async) const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (shared->keeper_storage_dispatcher)
if (shared->keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times");
const auto & config = getConfigRef();
@ -1923,8 +1922,8 @@ void Context::initializeKeeperDispatcher([[maybe_unused]] bool start_async) cons
"will wait for Keeper synchronously");
}
shared->keeper_storage_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_storage_dispatcher->initialize(config, is_standalone_app, start_async);
shared->keeper_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_dispatcher->initialize(config, is_standalone_app, start_async);
}
#endif
}
@ -1932,27 +1931,39 @@ void Context::initializeKeeperDispatcher([[maybe_unused]] bool start_async) cons
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & Context::getKeeperDispatcher() const
{
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
if (!shared->keeper_storage_dispatcher)
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (!shared->keeper_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Keeper must be initialized before requests");
return shared->keeper_storage_dispatcher;
return shared->keeper_dispatcher;
}
#endif
void Context::shutdownKeeperDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
if (shared->keeper_storage_dispatcher)
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (shared->keeper_dispatcher)
{
shared->keeper_storage_dispatcher->shutdown();
shared->keeper_storage_dispatcher.reset();
shared->keeper_dispatcher->shutdown();
shared->keeper_dispatcher.reset();
}
#endif
}
void Context::updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config)
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_dispatcher_mutex);
if (!shared->keeper_dispatcher)
return;
shared->keeper_dispatcher->updateConfiguration(config);
#endif
}
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);


@ -678,6 +678,7 @@ public:
#endif
void initializeKeeperDispatcher(bool start_async) const;
void shutdownKeeperDispatcher() const;
void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);
@ -799,6 +800,7 @@ public:
DisksMap getDisksMap() const;
void updateStorageConfiguration(const Poco::Util::AbstractConfiguration & config);
/// Provides storage policy schemes
StoragePolicyPtr getStoragePolicy(const String & name) const;


@ -11,6 +11,11 @@
<!-- we want all logs for investigation of complex problems -->
<reserved_log_items>1000000000000000</reserved_log_items>
<snapshot_distance>10000</snapshot_distance>
<!-- For instant start in single node configuration -->
<heart_beat_interval_ms>0</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>0</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>0</election_timeout_upper_bound_ms>
</coordination_settings>
<raft_configuration>


@ -11,6 +11,10 @@
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<snapshot_distance>75</snapshot_distance>
<!-- For instant start in single node configuration -->
<heart_beat_interval_ms>0</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>0</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>0</election_timeout_upper_bound_ms>
</coordination_settings>
<raft_configuration>


@ -10,6 +10,10 @@
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<force_sync>false</force_sync>
<!-- For instant start in single node configuration -->
<heart_beat_interval_ms>0</heart_beat_interval_ms>
<election_timeout_lower_bound_ms>0</election_timeout_lower_bound_ms>
<election_timeout_upper_bound_ms>0</election_timeout_upper_bound_ms>
</coordination_settings>
<raft_configuration>


@ -0,0 +1 @@
#!/usr/bin/env python3


@ -0,0 +1,22 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,27 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,27 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,80 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'configs')
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=[], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=[], stay_alive=True)
def get_fake_zk(node, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start(node):
node.start_clickhouse()
def test_nodes_add(started_cluster):
zk_conn = get_fake_zk(node1)
for i in range(100):
zk_conn.create("/test_two_" + str(i), b"somedata")
p = Pool(3)
node2.stop_clickhouse()
node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")
waiter = p.apply_async(start, (node2,))
node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
node1.query("SYSTEM RELOAD CONFIG")
waiter.wait()
zk_conn2 = get_fake_zk(node2)
for i in range(100):
assert zk_conn2.exists("/test_two_" + str(i)) is not None
zk_conn = get_fake_zk(node1)
for i in range(100):
zk_conn.create("/test_three_" + str(i), b"somedata")
node3.stop_clickhouse()
node3.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_three_nodes_3.xml"), "/etc/clickhouse-server/config.d/enable_keeper3.xml")
waiter = p.apply_async(start, (node3,))
node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_three_nodes_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")
node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_three_nodes_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
node1.query("SYSTEM RELOAD CONFIG")
node2.query("SYSTEM RELOAD CONFIG")
waiter.wait()
zk_conn3 = get_fake_zk(node3)
for i in range(100):
assert zk_conn3.exists("/test_three_" + str(i)) is not None


@ -0,0 +1 @@
#!/usr/bin/env python3


@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@ -0,0 +1,34 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>1</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
            <server>
                <id>4</id>
                <hostname>node4</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>2</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
            <server>
                <id>4</id>
                <hostname>node4</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>4</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
            <server>
                <id>4</id>
                <hostname>node4</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,85 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState

cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'configs')

node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml'], stay_alive=True)
node4 = cluster.add_instance('node4', stay_alive=True)

@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()

def start(node):
    node.start_clickhouse()

def get_fake_zk(node, timeout=30.0):
    _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout)
    _fake_zk_instance.start()
    return _fake_zk_instance

def test_node_move(started_cluster):
    zk_conn = get_fake_zk(node1)

    for i in range(100):
        zk_conn.create("/test_four_" + str(i), b"somedata")

    zk_conn2 = get_fake_zk(node2)
    zk_conn2.sync("/test_four_0")

    zk_conn3 = get_fake_zk(node3)
    zk_conn3.sync("/test_four_0")

    for i in range(100):
        assert zk_conn2.exists("/test_four_" + str(i)) is not None
        assert zk_conn3.exists("/test_four_" + str(i)) is not None

    node4.stop_clickhouse()
    node4.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_node4_4.xml"), "/etc/clickhouse-server/config.d/enable_keeper4.xml")
    p = Pool(3)
    waiter = p.apply_async(start, (node4,))
    node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_node4_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
    node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_node4_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")

    node1.query("SYSTEM RELOAD CONFIG")
    node2.query("SYSTEM RELOAD CONFIG")

    waiter.wait()
    zk_conn4 = get_fake_zk(node4)
    zk_conn4.sync("/test_four_0")

    for i in range(100):
        assert zk_conn4.exists("/test_four_" + str(i)) is not None

    with pytest.raises(Exception):
        # Adding and removing nodes is an async operation: node3 should
        # eventually drop out of the ensemble and stop serving requests.
        for i in range(10):
            zk_conn3 = get_fake_zk(node3)
            zk_conn3.sync("/test_four_0")
            time.sleep(i)
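Because membership changes are applied asynchronously, the test cannot assert right away that node3 stopped serving; it retries with growing sleeps and expects the connection to fail eventually. The same idiom as a standalone helper — a sketch only, reusing the get_fake_zk defined above (wait_until_evicted is an illustrative name, not part of this PR):

def wait_until_evicted(node, path="/", retries=10):
    # Poll the removed node until it stops answering; the add/remove-server
    # request travels through Raft, so eviction is not instantaneous.
    for i in range(retries):
        try:
            conn = get_fake_zk(node)
            conn.sync(path)
        except Exception:
            return  # the node no longer serves requests, as expected
        time.sleep(i)
    raise AssertionError("node still serves requests after removal")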

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,34 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>1</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
            <server>
                <id>3</id>
                <hostname>node3</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>2</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
            <server>
                <id>3</id>
                <hostname>node3</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>3</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
            <server>
                <id>3</id>
                <hostname>node3</hostname>
                <port>44444</port>
                <start_as_follower>true</start_as_follower>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,27 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>1</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,27 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>2</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,22 @@
<clickhouse>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>1</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
            </server>
        </raft_configuration>
    </keeper_server>
</clickhouse>

View File

@ -0,0 +1,87 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import os
from kazoo.client import KazooClient, KazooState

cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'configs')

node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml'], stay_alive=True)

@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()

def get_fake_zk(node, timeout=30.0):
    _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout)
    _fake_zk_instance.start()
    return _fake_zk_instance

def test_nodes_remove(started_cluster):
    zk_conn = get_fake_zk(node1)

    for i in range(100):
        zk_conn.create("/test_two_" + str(i), b"somedata")

    zk_conn2 = get_fake_zk(node2)
    zk_conn2.sync("/test_two_0")

    zk_conn3 = get_fake_zk(node3)
    zk_conn3.sync("/test_two_0")

    for i in range(100):
        assert zk_conn2.exists("/test_two_" + str(i)) is not None
        assert zk_conn3.exists("/test_two_" + str(i)) is not None

    # Shrink the ensemble from three nodes to two.
    node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")
    node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")

    node1.query("SYSTEM RELOAD CONFIG")
    node2.query("SYSTEM RELOAD CONFIG")

    zk_conn2 = get_fake_zk(node2)

    for i in range(100):
        assert zk_conn2.exists("/test_two_" + str(i)) is not None
        zk_conn2.create("/test_two_" + str(100 + i), b"otherdata")

    zk_conn = get_fake_zk(node1)
    zk_conn.sync("/test_two_0")

    for i in range(100):
        assert zk_conn.exists("/test_two_" + str(i)) is not None
        assert zk_conn.exists("/test_two_" + str(100 + i)) is not None

    # node3 is no longer part of the configuration and must stop serving requests.
    with pytest.raises(Exception):
        zk_conn3 = get_fake_zk(node3)
        zk_conn3.sync("/test_two_0")

    node3.stop_clickhouse()

    # Shrink further, from two nodes to a single one.
    node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_single_keeper1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
    node1.query("SYSTEM RELOAD CONFIG")

    zk_conn = get_fake_zk(node1)
    zk_conn.sync("/test_two_0")

    for i in range(100):
        assert zk_conn.exists("/test_two_" + str(i)) is not None
        assert zk_conn.exists("/test_two_" + str(100 + i)) is not None

    with pytest.raises(Exception):
        zk_conn2 = get_fake_zk(node2)
        zk_conn2.sync("/test_two_0")

    node2.stop_clickhouse()
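Each shrink step above makes the same pair of checks: writes still go through a surviving member (so quorum holds), while the removed member refuses to serve. A hedged sketch of that pair as one helper — assert_membership is an illustrative name, not part of this PR; it reuses get_fake_zk from this file:

def assert_membership(alive_node, removed_node):
    # A write through a remaining member must succeed, proving quorum is intact;
    # an ephemeral sequential node avoids name clashes between calls.
    conn = get_fake_zk(alive_node)
    conn.create("/membership_probe", b"probe", ephemeral=True, sequence=True)
    # The member dropped from <raft_configuration> must refuse to serve at all.
    with pytest.raises(Exception):
        removed_conn = get_fake_zk(removed_node)
        removed_conn.sync("/")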

View File

@ -11,6 +11,11 @@
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
            <!-- Zero heartbeat and election timeouts make a lone node elect itself
                 immediately, giving an instant start in a single-node configuration -->
            <heart_beat_interval_ms>0</heart_beat_interval_ms>
            <election_timeout_lower_bound_ms>0</election_timeout_lower_bound_ms>
            <election_timeout_upper_bound_ms>0</election_timeout_upper_bound_ms>
        </coordination_settings>
        <raft_configuration>