This commit is contained in:
alesapin 2021-10-19 15:00:26 +03:00
parent f2d266acce
commit 2d4b601d38
31 changed files with 1011 additions and 93 deletions

View File

@ -38,7 +38,8 @@ struct Settings;
M(Bool, quorum_reads, false, "Execute read requests as writes through the whole RAFT consensus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \
M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0)
M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)
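For reference, the new configuration_change_tries_count setting is read together with the other coordination settings, which in the test configs later in this commit live under &lt;keeper_server&gt;&lt;coordination_settings&gt;. A minimal illustrative snippet follows; the value 20 is simply the declared default above, not a tuned recommendation:

<keeper_server>
    <coordination_settings>
        <!-- how many times a single add/remove server change is retried before giving up -->
        <configuration_change_tries_count>20</configuration_change_tries_count>
    </coordination_settings>
</keeper_server>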

View File

@ -275,7 +275,6 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
{
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
updateConfiguration(config);
}
else
{
@ -290,6 +289,8 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
/// Start it after keeper server start
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); });
updateConfiguration(config);
LOG_DEBUG(log, "Dispatcher initialized");
}
@ -325,6 +326,10 @@ void KeeperDispatcher::shutdown()
snapshots_queue.finish();
if (snapshot_thread.joinable())
snapshot_thread.join();
update_configuration_queue.finish();
if (update_configuration_thread.joinable())
update_configuration_thread.join();
}
if (server)
@ -505,17 +510,69 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
return future.get();
}
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
void KeeperDispatcher::updateConfigurationThread()
{
while (true)
{
if (shutdown_called)
return;
try
{
if (!server->checkInit())
{
LOG_INFO(log, "Server still not initialized, will not apply configuration until initialization finished");
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
continue;
}
ConfigUpdateAction action;
if (!update_configuration_queue.pop(action))
break;
/// Only the leader node applies configuration changes; followers wait for the committed update
if (isLeader())
{
server->updateConfiguration(config);
server->applyConfigurationUpdate(action);
}
else
{
LOG_INFO(log, "Configuration changed, but we are not leader, so we will wait update from leader");
}
String message;
if (action.action_type == ConfigUpdateActionType::RemoveServer)
message += "remove";
else if (action.action_type == ConfigUpdateActionType::AddServer)
message += "add";
else if (action.action_type == ConfigUpdateActionType::UpdatePriority)
message += "update priority for";
else
message += "unknown action for";
LOG_INFO(log, "Configuration changed ({} server {}), but we are not leader, so we will wait update from leader", message, task.server->get_id());
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto diff = server->getConfigurationDiff(config);
if (diff.empty())
LOG_TRACE(log, "Configuration update triggered, but nothing changed for RAFT");
else if (diff.size() > 1)
LOG_WARNING(log, "Configuration changed for more than one server ({}) from cluster, it's strictly not recommended", diff.size());
else
LOG_DEBUG(log, "Configuration change size ({})", diff.size());
for (auto & change : diff)
{
bool push_result = update_configuration_queue.push(change);
if (!push_result)
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
}
}
}

View File

@ -33,11 +33,13 @@ private:
CoordinationSettingsPtr coordination_settings;
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
/// Size depends on coordination settings
std::unique_ptr<RequestsQueue> requests_queue;
ResponsesQueue responses_queue;
SnapshotsQueue snapshots_queue{1};
UpdateConfigurationQueue update_configuration_queue{1000};
std::atomic<bool> shutdown_called{false};
@ -62,6 +64,9 @@ private:
ThreadFromGlobalPool session_cleaner_thread;
/// Dumping new snapshots to disk
ThreadFromGlobalPool snapshot_thread;
ThreadFromGlobalPool update_configuration_thread;
/// RAFT wrapper.
std::unique_ptr<KeeperServer> server;
@ -80,6 +85,8 @@ private:
/// Thread create snapshots in the background
void snapshotThread();
void updateConfigurationThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
/// Add error responses for requests to responses queue.

View File

@ -127,32 +127,35 @@ void KeeperServer::startup()
if (latest_snapshot_config && latest_log_store_config)
{
if (latest_snapshot_config->get_log_idx() > latest_log_store_config->get_log_idx())
{
LOG_INFO(log, "Will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
state_manager->setClusterConfig(latest_snapshot_config);
}
else
{
LOG_INFO(log, "Will use config from log store with log index {}", latest_snapshot_config->get_log_idx());
state_manager->setClusterConfig(latest_log_store_config);
}
}
else if (latest_snapshot_config)
state_manager->setClusterConfig(latest_snapshot_config);
else if (latest_log_store_config)
state_manager->setClusterConfig(latest_log_store_config);
/// else use parsed config in state_manager constructor (first start)
bool single_server = state_manager->getTotalServers() == 1;
nuraft::raft_params params;
if (single_server)
{
/// Doesn't make sense in single-server mode
params.heart_beat_interval_ = 0;
params.election_timeout_lower_bound_ = 0;
params.election_timeout_upper_bound_ = 0;
LOG_INFO(log, "No config in log store, will use config from snapshot with log index {}", latest_snapshot_config->get_log_idx());
state_manager->setClusterConfig(latest_snapshot_config);
}
else if (latest_log_store_config)
{
LOG_INFO(log, "No config in snapshot, will use config from log store with log index {}", latest_log_store_config->get_log_idx());
state_manager->setClusterConfig(latest_log_store_config);
}
else
{
LOG_INFO(log, "No config in log store and snapshot, probably it's initial run. Will use config from .xml on disk");
}
nuraft::raft_params params;
params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds();
params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds();
params.election_timeout_upper_bound_ = coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds();
}
params.reserved_log_items_ = coordination_settings->reserved_log_items;
params.snapshot_distance_ = coordination_settings->snapshot_distance;
@ -380,36 +383,79 @@ std::vector<int64_t> KeeperServer::getDeadSessions()
return state_machine->getDeadSessions();
}
void KeeperServer::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
auto diff = state_manager->getConfigurationDiff(config);
if (diff.empty())
{
LOG_TRACE(log, "Configuration update triggered, but nothing changed for RAFT");
return;
}
else if (diff.size() > 1)
{
LOG_WARNING(log, "Configuration changed for more than one server ({}) from cluster, it's strictly not recommended", diff.size());
}
else
{
LOG_WARNING(log, "Configuration change size ({})", diff.size());
return state_manager->getConfigurationDiff(config);
}
for (auto & task : diff)
void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
{
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
{
LOG_INFO(log, "Will try to add server with id {}", task.server->get_id());
bool added = false;
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
LOG_INFO(log, "Server with id {} was successfully added", task.server->get_id());
added = true;
break;
}
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to add server {}", task.server->get_id());
break;
}
auto result = raft_instance->add_srv(*task.server);
if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to add server (id {}) was not accepted by RAFT", task.server->get_id());
LOG_INFO(log, "Command to add server {} was not accepted for the {} time, will sleep for {} ms and retry", task.server->get_id(), i + 1, sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
if (!added)
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to add server (id {}) was not accepted by RAFT after all {} retries", task.server->get_id(), coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
{
LOG_INFO(log, "Will try to remove server with id {}", task.server->get_id());
bool removed = false;
if (task.server->get_id() == state_manager->server_id())
{
LOG_INFO(log, "Trying to remove leader node (ourself), so will yield leadership and some other node (new leader) will try remove us. "
"Probably you will have to run SYSTEM RELOAD CONFIG on the new leader node");
raft_instance->yield_leadership();
return;
}
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
LOG_INFO(log, "Server with id {} was successfully removed", task.server->get_id());
removed = true;
break;
}
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to remove server {}", task.server->get_id());
break;
}
auto result = raft_instance->remove_srv(task.server->get_id());
if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to remove server (id {}) was not accepted by RAFT", task.server->get_id());
LOG_INFO(log, "Command to remove server {} was not accepted for the {} time, will sleep for {} ms and retry", task.server->get_id(), i + 1, sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
}
if (!removed)
throw Exception(ErrorCodes::RAFT_ERROR, "Configuration change to remove server (id {}) was not accepted by RAFT after all {} retries", task.server->get_id(), coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
raft_instance->set_priority(task.server->get_id(), task.server->get_priority());
@ -418,5 +464,3 @@ void KeeperServer::updateConfiguration(const Poco::Util::AbstractConfiguration &
}
}
}

View File

@ -90,7 +90,9 @@ public:
int getServerID() const { return server_id; }
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config);
void applyConfigurationUpdate(const ConfigUpdateAction & action);
};
}
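For context on how this new API is exercised: getConfigurationDiff/applyConfigurationUpdate are driven by an ordinary config reload rather than a dedicated command. In the integration tests added below, the operator edits <raft_configuration> on the running nodes and then runs SYSTEM RELOAD CONFIG; an extra <server> entry becomes an AddServer action and a missing one becomes RemoveServer. A hedged sketch with illustrative hostnames and ports taken from the test configs:

<raft_configuration>
    <server>
        <id>1</id>
        <hostname>node1</hostname>
        <port>44444</port>
    </server>
    <!-- newly added entry: picked up as an AddServer action on the next config reload -->
    <server>
        <id>2</id>
        <hostname>node2</hostname>
        <port>44444</port>
    </server>
</raft_configuration>
<!-- then, on the already running node(s): SYSTEM RELOAD CONFIG -->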

View File

@ -81,13 +81,12 @@ KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host,
: my_server_id(server_id_)
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
, log(&Poco::Logger::get("KeeperStateManager"))
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
servers_configuration.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
servers_configuration.port = port;
servers_configuration.config = peer_config;
servers_configuration.cluster_config->get_servers().push_back(peer_config);
configuration_wrapper.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
configuration_wrapper.port = port;
configuration_wrapper.config = peer_config;
configuration_wrapper.cluster_config->get_servers().push_back(peer_config);
}
KeeperStateManager::KeeperStateManager(
@ -99,11 +98,10 @@ KeeperStateManager::KeeperStateManager(
: my_server_id(server_id_)
, secure(config.getBool(config_prefix_ + ".raft_configuration.secure", false))
, config_prefix(config_prefix_)
, servers_configuration(parseServersConfiguration(config))
, configuration_wrapper(parseServersConfiguration(config))
, log_store(nuraft::cs_new<KeeperLogStore>(
getLogsPathFromConfig(config_prefix_, config, standalone_keeper),
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync, coordination_settings->compress_logs))
, log(&Poco::Logger::get("KeeperStateManager"))
{
}
@ -122,7 +120,7 @@ ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const
void KeeperStateManager::setClusterConfig(const ClusterConfigPtr & cluster_config)
{
servers_configuration.cluster_config = cluster_config;
configuration_wrapper.cluster_config = cluster_config;
}
void KeeperStateManager::flushLogStore()
@ -133,7 +131,7 @@ void KeeperStateManager::flushLogStore()
void KeeperStateManager::save_config(const nuraft::cluster_config & config)
{
nuraft::ptr<nuraft::buffer> buf = config.serialize();
servers_configuration.cluster_config = nuraft::cluster_config::deserialize(*buf);
configuration_wrapper.cluster_config = nuraft::cluster_config::deserialize(*buf);
}
void KeeperStateManager::save_state(const nuraft::srv_state & state)
@ -144,51 +142,42 @@ void KeeperStateManager::save_state(const nuraft::srv_state & state)
ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
{
auto new_servers_configuration = parseServersConfiguration(config);
if (new_servers_configuration.port != servers_configuration.port)
auto new_configuration_wrapper = parseServersConfiguration(config);
if (new_configuration_wrapper.port != configuration_wrapper.port)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot change port of already running RAFT server");
std::unordered_map<int, KeeperServerConfigPtr> new_ids, old_ids;
for (auto new_server : new_servers_configuration.cluster_config->get_servers())
{
LOG_INFO(log, "NEW SERVER {}", new_server->get_id());
for (auto new_server : new_configuration_wrapper.cluster_config->get_servers())
new_ids[new_server->get_id()] = new_server;
}
for (auto old_server : servers_configuration.cluster_config->get_servers())
{
LOG_INFO(log, "OLD SERVER {}", old_server->get_id());
for (auto old_server : configuration_wrapper.cluster_config->get_servers())
old_ids[old_server->get_id()] = old_server;
}
ConfigUpdateActions result;
for (auto [old_id, server_config] : old_ids)
{
if (!new_ids.count(old_id))
{
LOG_INFO(log, "REMOVING SERVER {}", old_id);
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::RemoveServer, server_config});
}
}
for (auto [new_id, server_config] : new_ids)
{
if (!old_ids.count(new_id))
{
LOG_INFO(log, "ADDING SERVER {}", new_id);
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::AddServer, server_config});
}
for (auto [old_id, server_config] : old_ids)
{
if (!new_ids.count(old_id))
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::RemoveServer, server_config});
}
for (const auto & old_server : servers_configuration.cluster_config->get_servers())
/// And update priority if required
for (const auto & old_server : configuration_wrapper.cluster_config->get_servers())
{
for (const auto & new_server : new_servers_configuration.cluster_config->get_servers())
for (const auto & new_server : new_configuration_wrapper.cluster_config->get_servers())
{
if (old_server->get_id() == new_server->get_id())
{
LOG_INFO(log, "UPDATE PRIORITY {}", new_server->get_id());
if (old_server->get_priority() != new_server->get_priority())
{
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::UpdatePriority, new_server});
}
break;
}
}

View File

@ -13,7 +13,7 @@ namespace DB
using KeeperServerConfigPtr = nuraft::ptr<nuraft::srv_config>;
struct KeeperServersConfiguration
struct KeeperConfigurationWrapper
{
int port;
KeeperServerConfigPtr config;
@ -56,7 +56,7 @@ public:
void flushLogStore();
nuraft::ptr<nuraft::cluster_config> load_config() override { return servers_configuration.cluster_config; }
nuraft::ptr<nuraft::cluster_config> load_config() override { return configuration_wrapper.cluster_config; }
void save_config(const nuraft::cluster_config & config) override;
@ -68,15 +68,15 @@ public:
int32_t server_id() override { return my_server_id; }
nuraft::ptr<nuraft::srv_config> get_srv_config() const { return servers_configuration.config; }
nuraft::ptr<nuraft::srv_config> get_srv_config() const { return configuration_wrapper.config; }
void system_exit(const int /* exit_code */) override {}
int getPort() const { return servers_configuration.port; }
int getPort() const { return configuration_wrapper.port; }
bool shouldStartAsFollower() const
{
return servers_configuration.servers_start_as_followers.count(my_server_id);
return configuration_wrapper.servers_start_as_followers.count(my_server_id);
}
bool isSecure() const
@ -86,7 +86,7 @@ public:
nuraft::ptr<KeeperLogStore> getLogStore() const { return log_store; }
uint64_t getTotalServers() const { return servers_configuration.cluster_config->get_servers().size(); }
uint64_t getTotalServers() const { return configuration_wrapper.cluster_config->get_servers().size(); }
ClusterConfigPtr getLatestConfigFromLogStore() const;
@ -98,13 +98,12 @@ private:
int my_server_id;
bool secure;
std::string config_prefix;
KeeperServersConfiguration servers_configuration;
KeeperConfigurationWrapper configuration_wrapper;
nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_state> server_state;
Poco::Logger * log;
KeeperServersConfiguration parseServersConfiguration(const Poco::Util::AbstractConfiguration & config) const;
KeeperConfigurationWrapper parseServersConfiguration(const Poco::Util::AbstractConfiguration & config) const;
};
}

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,22 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,27 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,27 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,80 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'configs')
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=[], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=[], stay_alive=True)
def get_fake_zk(node, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start(node):
node.start_clickhouse()
def test_nodes_add(started_cluster):
zk_conn = get_fake_zk(node1)
for i in range(100):
zk_conn.create("/test_two_" + str(i), b"somedata")
p = Pool(3)
node2.stop_clickhouse()
node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")
waiter = p.apply_async(start, (node2,))
node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
node1.query("SYSTEM RELOAD CONFIG")
waiter.wait()
zk_conn2 = get_fake_zk(node2)
for i in range(100):
assert zk_conn2.exists("/test_two_" + str(i)) is not None
zk_conn = get_fake_zk(node1)
for i in range(100):
zk_conn.create("/test_three_" + str(i), b"somedata")
node3.stop_clickhouse()
node3.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_three_nodes_3.xml"), "/etc/clickhouse-server/config.d/enable_keeper3.xml")
waiter = p.apply_async(start, (node3,))
node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_three_nodes_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")
node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_three_nodes_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
node1.query("SYSTEM RELOAD CONFIG")
node2.query("SYSTEM RELOAD CONFIG")
waiter.wait()
zk_conn3 = get_fake_zk(node3)
for i in range(100):
assert zk_conn3.exists("/test_three_" + str(i)) is not None

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,85 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'configs')
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml'], stay_alive=True)
node4 = cluster.add_instance('node4', stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start(node):
node.start_clickhouse()
def get_fake_zk(node, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def test_node_move(started_cluster):
zk_conn = get_fake_zk(node1)
for i in range(100):
zk_conn.create("/test_four_" + str(i), b"somedata")
zk_conn2 = get_fake_zk(node2)
zk_conn2.sync("/test_four_0")
zk_conn3 = get_fake_zk(node3)
zk_conn3.sync("/test_four_0")
for i in range(100):
assert zk_conn2.exists("test_four_" + str(i)) is not None
assert zk_conn3.exists("test_four_" + str(i)) is not None
node4.stop_clickhouse()
node4.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_node4_4.xml"), "/etc/clickhouse-server/config.d/enable_keeper4.xml")
p = Pool(3)
waiter = p.apply_async(start, (node4,))
node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_node4_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_node4_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")
node1.query("SYSTEM RELOAD CONFIG")
node2.query("SYSTEM RELOAD CONFIG")
waiter.wait()
zk_conn4 = get_fake_zk(node4)
zk_conn4.sync("/test_four_0")
for i in range(100):
assert zk_conn4.exists("/test_four_" + str(i)) is not None
with pytest.raises(Exception):
# Adding and removing nodes is an async operation
for i in range(10):
zk_conn3 = get_fake_zk(node3)
zk_conn3.sync("/test_four_0")
time.sleep(i)

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,27 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,27 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,22 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,97 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'configs')
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml'], stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start(node):
node.start_clickhouse()
def get_fake_zk(node, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def test_nodes_remove(started_cluster):
zk_conn = get_fake_zk(node1)
for i in range(100):
zk_conn.create("/test_two_" + str(i), b"somedata")
zk_conn2 = get_fake_zk(node2)
zk_conn2.sync("/test_two_0")
zk_conn3 = get_fake_zk(node3)
zk_conn3.sync("/test_two_0")
for i in range(100):
assert zk_conn2.exists("test_two_" + str(i)) is not None
assert zk_conn3.exists("test_two_" + str(i)) is not None
node2.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_2.xml"), "/etc/clickhouse-server/config.d/enable_keeper2.xml")
node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_keeper_two_nodes_1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
node1.query("SYSTEM RELOAD CONFIG")
node2.query("SYSTEM RELOAD CONFIG")
zk_conn2 = get_fake_zk(node2)
for i in range(100):
assert zk_conn2.exists("test_two_" + str(i)) is not None
zk_conn2.create("/test_two_" + str(100 + i), b"otherdata")
zk_conn = get_fake_zk(node1)
zk_conn.sync("/test_two_0")
for i in range(100):
assert zk_conn.exists("test_two_" + str(i)) is not None
assert zk_conn.exists("test_two_" + str(100 + i)) is not None
with pytest.raises(Exception):
zk_conn3 = get_fake_zk(node3)
zk_conn3.sync("/test_two_0")
node3.stop_clickhouse()
node1.copy_file_to_container(os.path.join(CONFIG_DIR, "enable_single_keeper1.xml"), "/etc/clickhouse-server/config.d/enable_keeper1.xml")
node1.query("SYSTEM RELOAD CONFIG")
zk_conn = get_fake_zk(node1)
zk_conn.sync("/test_two_0")
for i in range(100):
assert zk_conn.exists("test_two_" + str(i)) is not None
assert zk_conn.exists("test_two_" + str(100 + i)) is not None
with pytest.raises(Exception):
zk_conn2 = get_fake_zk(node2)
zk_conn2.sync("/test_two_0")
node2.stop_clickhouse()