diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index 28bbb95e01d..a3034150219 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -359,7 +359,7 @@ int Keeper::main(const std::vector<std::string> & /*args*/) auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>(); /// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config. - global_context->initializeKeeperDispatcher(); + global_context->initializeKeeperDispatcher(/* start_async = */false); for (const auto & listen_host : listen_hosts) { /// TCP Keeper diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 01033570926..c2c53150daf 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -986,8 +986,9 @@ if (ThreadFuzzer::instance().isEffective()) if (config().has("keeper_server")) { #if USE_NURAFT - /// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config. - global_context->initializeKeeperDispatcher(); + bool has_connection = has_zookeeper && global_context->tryCheckZooKeeperConnection(); + /// Initialize keeper RAFT. + global_context->initializeKeeperDispatcher(has_connection); for (const auto & listen_host : listen_hosts) { /// TCP Keeper diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index 805dedab89c..00d443822e6 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -28,7 +28,7 @@ struct Settings; M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \ M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \ M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \ - M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \ + M(Milliseconds, startup_timeout, 180000, "How many time we will until RAFT to start", 0) \ M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \ M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \ M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \ diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index b4dc367ff62..8d8f98e175e 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -241,7 +241,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ return true; } -void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper) +void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async) { LOG_DEBUG(log, "Initializing storage dispatcher"); int myid = config.getInt("keeper_server.server_id"); @@ -262,8 +262,16 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf server->startup(); LOG_DEBUG(log, "Server initialized, waiting for quorum"); - server->waitInit(); - LOG_DEBUG(log, "Quorum initialized"); + if (!start_async) + { + server->waitInit(); + LOG_DEBUG(log, "Quorum initialized"); + } + else + { + LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it will be ready"); + } + } catch (...)
{ @@ -363,7 +371,7 @@ void KeeperDispatcher::sessionCleanerTask() try { /// Only leader node must check dead sessions - if (isLeader()) + if (server->checkInit() && isLeader()) { auto dead_sessions = server->getDeadSessions(); diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index f49063f8dea..8f19b081e26 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -100,7 +100,12 @@ public: /// Initialization from config. /// standalone_keeper -- we are standalone keeper application (not inside clickhouse server) - void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper); + void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async); + + bool checkInit() const + { + return server && server->checkInit(); + } /// Shutdown internal keeper parts (server, state machine, log storage, etc) void shutdown(); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index b27170c8ba1..56d28058991 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -353,6 +353,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ void KeeperServer::waitInit() { std::unique_lock lock(initialized_mutex); + int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds(); if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); })) throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization"); diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index a7e96156dc1..8e10d053471 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -80,6 +80,12 @@ public: /// Wait server initialization (see callbackFunc) void waitInit(); + /// Return true if KeeperServer initialized + bool checkInit() const + { + return initialized_flag; + } + void shutdown(); int getServerID() const { return server_id; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index e4c6de8853b..4ffd69af35b 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1749,6 +1749,20 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const return shared->zookeeper; } + +bool Context::tryCheckZooKeeperConnection() const +{ + try + { + getZooKeeper(); + return true; + } + catch (...) 
+ { + return false; + } +} + UInt32 Context::getZooKeeperSessionUptime() const { std::lock_guard lock(shared->zookeeper_mutex); @@ -1776,19 +1790,24 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded() zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log); } -void Context::initializeKeeperDispatcher() const +void Context::initializeKeeperDispatcher(bool start_async) const { #if USE_NURAFT std::lock_guard lock(shared->keeper_storage_dispatcher_mutex); + if (shared->keeper_storage_dispatcher) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times"); const auto & config = getConfigRef(); if (config.has("keeper_server")) { + bool is_standalone_app = getApplicationType() == ApplicationType::KEEPER; + if (start_async && !is_standalone_app) + LOG_INFO(shared->log, "Connected to ZooKeeper (or Keeper) before internal Keeper start, will wait for Keeper asynchronously"); + shared->keeper_storage_dispatcher = std::make_shared<KeeperDispatcher>(); - shared->keeper_storage_dispatcher->initialize(config, getApplicationType() == ApplicationType::KEEPER); + shared->keeper_storage_dispatcher->initialize(config, is_standalone_app, start_async); } #endif } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index f6cded2b1d1..07c90049bef 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -666,12 +666,17 @@ public: /// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry. std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const; + /// Try to connect to ZooKeeper using getZooKeeper. Useful for internal + /// Keeper start (check connection to some other node). Return true if + /// connected successfully (without exception). + bool tryCheckZooKeeperConnection() const; + UInt32 getZooKeeperSessionUptime() const; #if USE_NURAFT std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const; #endif - void initializeKeeperDispatcher() const; + void initializeKeeperDispatcher(bool start_async) const; void shutdownKeeperDispatcher() const; /// Set auxiliary zookeepers configuration at server starting or configuration reloading. diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 7ebbda9dfe6..88b7e68127e 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -286,7 +286,7 @@ void KeeperTCPHandler::runImpl() return; } - if (keeper_dispatcher->hasLeader()) + if (keeper_dispatcher->checkInit() && keeper_dispatcher->hasLeader()) { try { @@ -306,7 +306,8 @@ void KeeperTCPHandler::runImpl() } else { - LOG_WARNING(log, "Ignoring user request, because no alive leader exist"); + String reason = !keeper_dispatcher->checkInit() ?
"server is not initialized yet" : "no alive leader exists"; + LOG_WARNING(log, "Ignoring user request, because {}", reason); sendHandshake(false); return; } diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 51b7bfcbcb8..3854cadaba5 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -2320,6 +2320,9 @@ class ClickHouseInstance: def replace_config(self, path_to_config, replacement): self.exec_in_container(["bash", "-c", "echo '{}' > {}".format(replacement, path_to_config)]) + def replace_in_config(self, path_to_config, replace, replacement): + self.exec_in_container(["bash", "-c", f"sed -i 's/{replace}/{replacement}/g' {path_to_config}"]) + def create_dir(self, destroy_dir=True): """Create the instance directory and all the needed files there.""" diff --git a/tests/integration/test_keeper_three_nodes_start/__init__.py b/tests/integration/test_keeper_three_nodes_start/__init__.py new file mode 100644 index 00000000000..e5a0d9b4834 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_start/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_keeper_three_nodes_start/configs/enable_keeper1.xml b/tests/integration/test_keeper_three_nodes_start/configs/enable_keeper1.xml new file mode 100644 index 00000000000..bc62d817074 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_start/configs/enable_keeper1.xml @@ -0,0 +1,32 @@ + + + 9181 + 1 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 44444 + + + 2 + node2 + 44444 + + + 3 + non_existing_node + 44444 + + + + diff --git a/tests/integration/test_keeper_three_nodes_start/configs/enable_keeper2.xml b/tests/integration/test_keeper_three_nodes_start/configs/enable_keeper2.xml new file mode 100644 index 00000000000..a6c476fb449 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_start/configs/enable_keeper2.xml @@ -0,0 +1,32 @@ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 44444 + + + 2 + node2 + 44444 + + + 3 + non_existing_node + 44444 + + + + diff --git a/tests/integration/test_keeper_three_nodes_start/test.py b/tests/integration/test_keeper_three_nodes_start/test.py new file mode 100644 index 00000000000..7828f21d0d7 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_start/test.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +#!/usr/bin/env python3 +import pytest +from helpers.cluster import ClickHouseCluster +import random +import string +import os +import time +from multiprocessing.dummy import Pool +from helpers.network import PartitionManager +from helpers.test_tools import assert_eq_with_retry +from kazoo.client import KazooClient, KazooState + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True) +node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml'], stay_alive=True) + +def get_fake_zk(nodename, timeout=30.0): + _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) + _fake_zk_instance.start() + return _fake_zk_instance + +def test_smoke(): + try: + cluster.start() + + node1_zk = get_fake_zk("node1") + node1_zk.create("/test_alive", b"aaaa") + + finally: + cluster.shutdown() diff --git 
a/tests/integration/test_keeper_three_nodes_two_alive/__init__.py b/tests/integration/test_keeper_three_nodes_two_alive/__init__.py new file mode 100644 index 00000000000..e5a0d9b4834 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_two_alive/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper1.xml b/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper1.xml new file mode 100644 index 00000000000..510424715c4 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper1.xml @@ -0,0 +1,32 @@ + + + 9181 + 1 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 44444 + + + 2 + node2 + 44444 + + + 3 + node3 + 44444 + + + + diff --git a/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper2.xml b/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper2.xml new file mode 100644 index 00000000000..264601d8c98 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper2.xml @@ -0,0 +1,32 @@ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 44444 + + + 2 + node2 + 44444 + + + 3 + node3 + 44444 + + + + diff --git a/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper3.xml b/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper3.xml new file mode 100644 index 00000000000..7f9775939bb --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_two_alive/configs/enable_keeper3.xml @@ -0,0 +1,32 @@ + + + 9181 + 3 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + + + + + 1 + node1 + 44444 + + + 2 + node2 + 44444 + + + 3 + node3 + 44444 + + + + diff --git a/tests/integration/test_keeper_three_nodes_two_alive/configs/keeper_conf.xml b/tests/integration/test_keeper_three_nodes_two_alive/configs/keeper_conf.xml new file mode 100644 index 00000000000..384e984f210 --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_two_alive/configs/keeper_conf.xml @@ -0,0 +1,16 @@ + + + + node1 + 9181 + + + node2 + 9181 + + + node3 + 9181 + + + diff --git a/tests/integration/test_keeper_three_nodes_two_alive/test.py b/tests/integration/test_keeper_three_nodes_two_alive/test.py new file mode 100644 index 00000000000..2c13d3ef22b --- /dev/null +++ b/tests/integration/test_keeper_three_nodes_two_alive/test.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +import pytest +from helpers.cluster import ClickHouseCluster +import random +import string +import os +import time +from multiprocessing.dummy import Pool +from helpers.network import PartitionManager +from helpers.test_tools import assert_eq_with_retry +from kazoo.client import KazooClient, KazooState + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/keeper_conf.xml'], stay_alive=True) +node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/keeper_conf.xml'], stay_alive=True) +node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/keeper_conf.xml'], stay_alive=True) + + +def get_fake_zk(nodename, timeout=30.0): + _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) + 
_fake_zk_instance.start() + return _fake_zk_instance + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + +def start(node): + node.start_clickhouse() + + +def test_start_offline(started_cluster): + p = Pool(3) + try: + node1_zk = get_fake_zk("node1") + node1_zk.create("/test_alive", b"aaaa") + + node1.stop_clickhouse() + node2.stop_clickhouse() + node3.stop_clickhouse() + + time.sleep(3) + p.map(start, [node2, node3]) + + node2_zk = get_fake_zk("node2") + node2_zk.create("/test_dead", b"data") + finally: + p.map(start, [node1, node2, node3]) + + +def test_start_non_existing(started_cluster): + p = Pool(3) + try: + node1.stop_clickhouse() + node2.stop_clickhouse() + node3.stop_clickhouse() + + node1.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper1.xml', 'node3', 'non_existing_node') + node2.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper2.xml', 'node3', 'non_existing_node') + + time.sleep(3) + p.map(start, [node2, node1]) + + node2_zk = get_fake_zk("node2") + node2_zk.create("/test_non_exising", b"data") + finally: + node1.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper1.xml', 'non_existing_node', 'node3') + node2.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper2.xml', 'non_existing_node', 'node3') + p.map(start, [node1, node2, node3]) + +def test_restart_third_node(started_cluster): + node1_zk = get_fake_zk("node1") + node1_zk.create("/test_restart", b"aaaa") + + node3.restart_clickhouse() + + assert node3.contains_in_log("Connected to ZooKeeper (or Keeper) before internal Keeper start")
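The tests above implicitly assume that a node started on the asynchronous Keeper path (the one that logs "Connected to ZooKeeper (or Keeper) before internal Keeper start") eventually starts accepting client sessions on port 9181. A minimal sketch of such a readiness wait on top of kazoo is shown below; the helper name wait_keeper_ready and its parameters are illustrative and not part of this change.

#!/usr/bin/env python3
import time
from kazoo.client import KazooClient
from kazoo.handlers.threading import KazooTimeoutError

def wait_keeper_ready(host, port=9181, total_timeout=60.0, retry_interval=1.0):
    """Poll a Keeper endpoint until a session can be established or the timeout expires."""
    deadline = time.time() + total_timeout
    while time.time() < deadline:
        client = KazooClient(hosts=f"{host}:{port}", timeout=5.0)
        try:
            # start() raises KazooTimeoutError while Keeper has no quorum and rejects handshakes
            client.start(timeout=5.0)
            return client  # caller owns the connected client and should stop()/close() it
        except KazooTimeoutError:
            client.stop()
            client.close()
            time.sleep(retry_interval)
    raise TimeoutError(f"Keeper at {host}:{port} did not become ready in {total_timeout} seconds")

Creating a fresh client for every attempt avoids reusing a session whose handshake was already rejected while the internal Keeper was still waiting for quorum.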