diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 56778b8dd69..aa947b22593 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -538,6 +538,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
             if (config->has("max_partition_size_to_drop"))
                 global_context->setMaxPartitionSizeToDrop(config->getUInt64("max_partition_size_to_drop"));
 
+            if (config->has("zookeeper"))
+                global_context->reloadZooKeeperIfChanged(config);
+
             global_context->updateStorageConfiguration(*config);
         },
         /* already_loaded = */ true);
diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp
index e50aa8f1700..41d715b23f1 100644
--- a/src/Common/ZooKeeper/ZooKeeper.cpp
+++ b/src/Common/ZooKeeper/ZooKeeper.cpp
@@ -200,6 +200,18 @@ ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std
     init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
 }
 
+bool ZooKeeper::configChanged(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) const
+{
+    ZooKeeperArgs args(config, config_name);
+
+    /// Skip reload for testkeeper: it is test-only and keeps its data in memory.
+    if (args.implementation == implementation && implementation == "testkeeper")
+        return false;
+
+    return std::tie(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot)
+        != std::tie(implementation, hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
+}
+
 static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
 {
diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h
index b2e49bee346..b1a69646db5 100644
--- a/src/Common/ZooKeeper/ZooKeeper.h
+++ b/src/Common/ZooKeeper/ZooKeeper.h
@@ -56,7 +56,7 @@ public:
               int32_t session_timeout_ms_ = DEFAULT_SESSION_TIMEOUT,
               int32_t operation_timeout_ms_ = DEFAULT_OPERATION_TIMEOUT,
               const std::string & chroot_ = "",
-              const std::string & implementation = "zookeeper");
+              const std::string & implementation_ = "zookeeper");
 
     /** Config of the form:
         <zookeeper>
@@ -87,6 +87,8 @@ public:
     /// This object remains unchanged, and the new session is returned.
     Ptr startNewSession() const;
 
+    bool configChanged(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) const;
+
     /// Returns true, if the session has expired.
     bool expired();
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 9013dd389d9..ab511acda76 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -1498,6 +1498,15 @@ void Context::resetZooKeeper() const
     shared->zookeeper.reset();
 }
 
+void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
+{
+    std::lock_guard lock(shared->zookeeper_mutex);
+    if (!shared->zookeeper || shared->zookeeper->configChanged(*config, "zookeeper"))
+    {
+        shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper");
+    }
+}
+
 bool Context::hasZooKeeper() const
 {
     return getConfigRef().has("zookeeper");
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index ff2af111885..7200bf57e6e 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -476,6 +476,8 @@ public:
     bool hasZooKeeper() const;
    /// Reset current zookeeper session. Do not create a new one.
     void resetZooKeeper() const;
+    /// Reload the ZooKeeper session if the zookeeper config has changed.
+    void reloadZooKeeperIfChanged(const ConfigurationPtr & config) const;
 
     /// Create a cache of uncompressed blocks of specified size. This can be done only once.
     void setUncompressedCache(size_t max_size_in_bytes);
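
The `configChanged` comparison above leans on `std::tie`, which packs the members into a tuple of references so that all fields are compared element-wise in a single expression. A minimal standalone sketch of the idiom (the `Args` struct here is an illustrative stand-in, not the real `ZooKeeperArgs`):

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <tuple>
#include <vector>

/// Illustrative stand-in for ZooKeeperArgs: comparing two std::tie tuples
/// checks every listed member element-wise, so the change detection stays
/// exhaustive as long as both tie lists name the same fields.
struct Args
{
    std::vector<std::string> hosts;
    std::string identity;
    int32_t session_timeout_ms = 30000;

    bool operator!=(const Args & rhs) const
    {
        return std::tie(hosts, identity, session_timeout_ms)
            != std::tie(rhs.hosts, rhs.identity, rhs.session_timeout_ms);
    }
};

int main()
{
    Args a{{"zoo1:2181", "zoo2:2181"}, "", 30000};
    Args b = a;
    assert(!(a != b));          // identical configs compare equal
    b.hosts = {"zoo2:2181"};    // changing any one field flips the result
    assert(a != b);
    return 0;
}
```

Adding a new connection parameter later only requires extending both `std::tie` lists, instead of maintaining a chain of per-field `if` statements.
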
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index f5d9290a17e..6d0f038daed 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -763,6 +763,16 @@ class ClickHouseCluster:
     def add_zookeeper_startup_command(self, command):
         self.pre_zookeeper_commands.append(command)
 
+    def stop_zookeeper_nodes(self, zk_nodes):
+        for n in zk_nodes:
+            logging.info("Stopping zookeeper node: %s", n)
+            subprocess_check_call(self.base_zookeeper_cmd + ["stop", n])
+
+    def start_zookeeper_nodes(self, zk_nodes):
+        for n in zk_nodes:
+            logging.info("Starting zookeeper node: %s", n)
+            subprocess_check_call(self.base_zookeeper_cmd + ["start", n])
+
 
 CLICKHOUSE_START_COMMAND = "clickhouse server --config-file=/etc/clickhouse-server/config.xml --log-file=/var/log/clickhouse-server/clickhouse-server.log --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"
diff --git a/tests/integration/test_reload_zookeeper/__init__.py b/tests/integration/test_reload_zookeeper/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_reload_zookeeper/configs/config.xml b/tests/integration/test_reload_zookeeper/configs/config.xml
new file mode 100644
index 00000000000..b5e5495c096
--- /dev/null
+++ b/tests/integration/test_reload_zookeeper/configs/config.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<yandex>
+    <logger>
+        <level>trace</level>
+        <log>/var/log/clickhouse-server/clickhouse-server.log</log>
+        <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
+        <size>1000M</size>
+        <count>10</count>
+    </logger>
+
+    <tcp_port>9000</tcp_port>
+    <listen_host>127.0.0.1</listen_host>
+
+    <openSSL>
+        <client>
+            <cacheSessions>true</cacheSessions>
+            <verificationMode>none</verificationMode>
+            <invalidCertificateHandler>
+                <name>AcceptCertificateHandler</name>
+            </invalidCertificateHandler>
+        </client>
+    </openSSL>
+
+    <max_concurrent_queries>500</max_concurrent_queries>
+    <mark_cache_size>5368709120</mark_cache_size>
+    <path>./clickhouse/</path>
+    <users_config>users.xml</users_config>
+
+    <max_table_size_to_drop>1</max_table_size_to_drop>
+    <max_partition_size_to_drop>1</max_partition_size_to_drop>
+</yandex>
diff --git a/tests/integration/test_reload_zookeeper/configs/users.xml b/tests/integration/test_reload_zookeeper/configs/users.xml
new file mode 100644
index 00000000000..6061af8e33d
--- /dev/null
+++ b/tests/integration/test_reload_zookeeper/configs/users.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0"?>
+<yandex>
+    <profiles>
+        <default>
+        </default>
+    </profiles>
+
+    <users>
+        <default>
+            <password></password>
+            <networks incl="networks" replace="replace">
+                <ip>::/0</ip>
+            </networks>
+            <profile>default</profile>
+            <quota>default</quota>
+        </default>
+    </users>
+
+    <quotas>
+        <default>
+        </default>
+    </quotas>
+</yandex>
diff --git a/tests/integration/test_reload_zookeeper/configs/zookeeper.xml b/tests/integration/test_reload_zookeeper/configs/zookeeper.xml
new file mode 100644
index 00000000000..ecadd4c74c3
--- /dev/null
+++ b/tests/integration/test_reload_zookeeper/configs/zookeeper.xml
@@ -0,0 +1,19 @@
+
+<yandex>
+    <zookeeper>
+        <node index="1">
+            <host>zoo1</host>
+            <port>2181</port>
+        </node>
+        <node index="2">
+            <host>zoo2</host>
+            <port>2181</port>
+        </node>
+        <node index="3">
+            <host>zoo3</host>
+            <port>2181</port>
+        </node>
+        <session_timeout_ms>2000</session_timeout_ms>
+    </zookeeper>
+</yandex>
+    
\ No newline at end of file
diff --git a/tests/integration/test_reload_zookeeper/test.py b/tests/integration/test_reload_zookeeper/test.py
new file mode 100644
index 00000000000..66df5a1a126
--- /dev/null
+++ b/tests/integration/test_reload_zookeeper/test.py
@@ -0,0 +1,120 @@
+import time
+import pytest
+import os
+
+from helpers.cluster import ClickHouseCluster
+from helpers.client import QueryRuntimeException
+from helpers.test_tools import assert_eq_with_retry
+
+
+cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper.xml')
+node = cluster.add_instance('node', with_zookeeper=True)
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+ZK_CONFIG_PATH = os.path.join(SCRIPT_DIR, 'configs/zookeeper.xml')
+
+
+@pytest.fixture(scope="module")
+def start_cluster():
+    try:
+        cluster.start()
+        node.query(
+            '''
+            CREATE TABLE test_table(date Date, id UInt32)
+            ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard1/test/test_table', '1')
+            PARTITION BY toYYYYMM(date)
+            ORDER BY id
+            ''')
+
+        yield cluster
+    finally:
+        ## write the original config back so later runs start from a clean state
+        config = open(ZK_CONFIG_PATH, 'w')
+        config.write(
+"""
+<yandex>
+    <zookeeper>
+        <node index="1">
+            <host>zoo1</host>
+            <port>2181</port>
+        </node>
+        <node index="2">
+            <host>zoo2</host>
+            <port>2181</port>
+        </node>
+        <node index="3">
+            <host>zoo3</host>
+            <port>2181</port>
+        </node>
+        <session_timeout_ms>2000</session_timeout_ms>
+    </zookeeper>
+</yandex>
+    """)
+        config.close()
+        cluster.shutdown()
+
+def test_reload_zookeeper(start_cluster):
+
+    def wait_zookeeper_node_to_start(zk_nodes, timeout=60):
+        start = time.time()
+        while time.time() - start < timeout:
+            try:
+                for instance in zk_nodes:
+                    conn = start_cluster.get_kazoo_client(instance)
+                    conn.get_children('/')
+                print("All instances of ZooKeeper started")
+                return
+            except Exception as ex:
+                print("Can't connect to ZooKeeper " + str(ex))
+                time.sleep(0.5)
+
+    node.query("INSERT INTO test_table(date, id) SELECT today(), number FROM numbers(1000)")
+
+    ## remove zoo2 and zoo3 from the config
+    config = open(ZK_CONFIG_PATH, 'w')
+    config.write(
+"""
+<yandex>
+    <zookeeper>
+        <node index="1">
+            <host>zoo1</host>
+            <port>2181</port>
+        </node>
+        <session_timeout_ms>2000</session_timeout_ms>
+    </zookeeper>
+</yandex>
+"""
+    )
+    config.close()
+    ## the config is reloaded, and queries keep working through zoo1
+    assert_eq_with_retry(node, "SELECT COUNT() FROM test_table", '1000', retry_count=120, sleep_time=0.5)
+
+    ## stop all ZooKeeper nodes: the table becomes readonly
+    cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
+    with pytest.raises(QueryRuntimeException):
+        node.query("SELECT COUNT() FROM test_table")
+
+    ## start zoo2 and zoo3: the table stays readonly, because the server is still configured to connect only to zoo1
+    cluster.start_zookeeper_nodes(["zoo2", "zoo3"])
+    wait_zookeeper_node_to_start(["zoo2", "zoo3"])
+    with pytest.raises(QueryRuntimeException):
+        node.query("SELECT COUNT() FROM test_table")
+
+    ## point the config at zoo2: after the reload the server recovers
+    config = open(ZK_CONFIG_PATH, 'w')
+    config.write(
+"""
+<yandex>
+    <zookeeper>
+        <node index="1">
+            <host>zoo2</host>
+            <port>2181</port>
+        </node>
+        <session_timeout_ms>2000</session_timeout_ms>
+    </zookeeper>
+</yandex>
+"""
+    )
+    config.close()
+    assert_eq_with_retry(node, "SELECT COUNT() FROM test_table", '1000', retry_count=120, sleep_time=0.5)
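
Reading the test together with the C++ change: `reloadZooKeeperIfChanged` replaces the `shared_ptr` under `zookeeper_mutex`, but code that already copied the old pointer keeps its old session until it drops that copy, which is why the table stays readonly until it reconnects. A minimal sketch of that ownership model, using hypothetical stand-in types rather than the real `zkutil` API:

```cpp
#include <memory>
#include <mutex>
#include <string>

/// Hypothetical stand-in for zkutil::ZooKeeper, just to illustrate ownership.
struct Session
{
    std::string hosts;
};

struct Shared
{
    std::mutex zookeeper_mutex;
    std::shared_ptr<Session> zookeeper;
};

/// Same shape as Context::reloadZooKeeperIfChanged: the shared pointer is
/// swapped under the mutex, so new callers get the new session, while code
/// that already holds a copy of the old shared_ptr keeps using the old
/// session until it drops that copy (e.g. after noticing it expired).
void reload_if_changed(Shared & shared, const std::string & new_hosts)
{
    std::lock_guard<std::mutex> lock(shared.zookeeper_mutex);
    if (!shared.zookeeper || shared.zookeeper->hosts != new_hosts)
        shared.zookeeper = std::make_shared<Session>(Session{new_hosts});
}

int main()
{
    Shared shared;
    reload_if_changed(shared, "zoo1:2181");
    auto old_session = shared.zookeeper;     // a table caches its session
    reload_if_changed(shared, "zoo2:2181");  // config changed: new session
    // old_session still points at the zoo1 session; a table holding it stays
    // on it (readonly if zoo1 is down) until it re-fetches from `shared`.
    return old_session == shared.zookeeper ? 1 : 0;
}
```

This matches the test's observable behaviour: after the config is rewritten to point at zoo2, queries recover only once the replicated table re-fetches a session from the reloaded configuration.
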