diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h
index 2309c831839..51ff0f96e6d 100644
--- a/src/Databases/DatabaseReplicatedWorker.h
+++ b/src/Databases/DatabaseReplicatedWorker.h
@@ -40,7 +40,7 @@ public:
     UInt64 getCurrentInitializationDurationMs() const;
 
 private:
     bool initializeMainThread() override;
-    void initializeReplication();
+    void initializeReplication() override;
     void initializeLogPointer(const String & processed_entry_name);
 
     DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;
diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp
index 697fd0f406b..408561b7606 100644
--- a/src/Interpreters/DDLWorker.cpp
+++ b/src/Interpreters/DDLWorker.cpp
@@ -1,48 +1,47 @@
-#include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
+#include
+#include
+#include
 #include
+#include
+#include
 #include
 #include
 #include
 #include
-#include
-#include
 #include
-#include
-#include
-#include
 #include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
 #include
 #include
-#include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
+#include
+#include
+#include
+
+#include
+#include
 #include
+
 #include
 #include
 #include
-#include
-#include
-
-#include
 
 namespace fs = std::filesystem;
 
-
 namespace CurrentMetrics
 {
     extern const Metric DDLWorkerThreads;
@@ -102,6 +101,12 @@ DDLWorker::DDLWorker(
     if (queue_dir.back() == '/')
         queue_dir.resize(queue_dir.size() - 1);
 
+    // replicas_dir is at the same level as queue_dir
+    // E.g:
+    //   queue_dir:    /clickhouse/task_queue/ddl
+    //   replicas_dir: /clickhouse/task_queue/replicas
+    replicas_dir = fs::path(queue_dir).parent_path() / "replicas";
+
     if (config)
     {
         task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
@@ -1059,6 +1064,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
     String query_path_prefix = fs::path(queue_dir) / "query-";
     zookeeper->createAncestors(query_path_prefix);
 
+    NameSet host_ids;
+    for (const HostID & host : entry.hosts)
+        host_ids.emplace(host.toString());
+    createReplicaDirs(zookeeper, host_ids);
+
     String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
     if (max_pushed_entry_metric)
     {
@@ -1098,6 +1108,7 @@ bool DDLWorker::initializeMainThread()
     {
         auto zookeeper = getAndSetZooKeeper();
         zookeeper->createAncestors(fs::path(queue_dir) / "");
+        initializeReplication();
         initialized = true;
         return true;
     }
@@ -1159,6 +1170,7 @@ void DDLWorker::runMainThread()
             }
 
             cleanup_event->set();
+            markReplicasActive(reinitialized);
             scheduleTasks(reinitialized);
 
             subsequent_errors_count = 0;
@@ -1216,6 +1228,97 @@ void DDLWorker::runMainThread()
 }
 
 
+void DDLWorker::initializeReplication()
+{
+    auto zookeeper = getAndSetZooKeeper();
+
+    zookeeper->createAncestors(replicas_dir / "");
+
+    NameSet host_id_set;
+    for (const auto & it : context->getClusters())
+    {
+        auto cluster = it.second;
+        for (const auto & host_ids : cluster->getHostIDs())
+            for (const auto & host_id : host_ids)
+                host_id_set.emplace(host_id);
+    }
+
+    createReplicaDirs(zookeeper, host_id_set);
+}
+
+void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
+{
+    for (const auto & host_id : host_ids)
+        zookeeper->createAncestors(replicas_dir / host_id / "");
+}
+
+void DDLWorker::markReplicasActive(bool reinitialized)
+{
+    auto zookeeper = getAndSetZooKeeper();
+
+    if (reinitialized)
+    {
+        // Reset all active_node_holders
+        for (auto & it : active_node_holders)
+        {
+            auto & active_node_holder = it.second.second;
+            if (active_node_holder)
+                active_node_holder->setAlreadyRemoved();
+            active_node_holder.reset();
+        }
+
+        active_node_holders.clear();
+    }
+
+    const auto maybe_secure_port = context->getTCPPortSecure();
+    const auto port = context->getTCPPort();
+
+    Coordination::Stat replicas_stat;
+    Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
+    NameSet local_host_ids;
+    for (const auto & host_id : host_ids)
+    {
+        if (active_node_holders.contains(host_id))
+            continue;
+
+        try
+        {
+            HostID host = HostID::fromString(host_id);
+            /// The port is considered local if it matches the TCP or TCP secure port the server is listening on.
+            bool is_local_host = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
+
+            if (is_local_host)
+                local_host_ids.emplace(host_id);
+        }
+        catch (const Exception & e)
+        {
+            LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
+            continue;
+        }
+    }
+
+    for (const auto & host_id : local_host_ids)
+    {
+        auto it = active_node_holders.find(host_id);
+        if (it != active_node_holders.end())
+        {
+            continue;
+        }
+
+        /// Create "active" node (remove previous one if necessary)
+        String active_path = replicas_dir / host_id / "active";
+        String active_id = toString(ServerUUID::get());
+        zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id);
+
+        LOG_TRACE(log, "Trying to mark a replica active: active_path={}, active_id={}", active_path, active_id);
+
+        zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
+        auto active_node_holder_zookeeper = zookeeper;
+        auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
+        active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
+    }
+}
+
 void DDLWorker::runCleanupThread()
 {
     setThreadName("DDLWorkerClnr");
diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h
index 6d1dabda54f..fd4735b5baa 100644
--- a/src/Interpreters/DDLWorker.h
+++ b/src/Interpreters/DDLWorker.h
@@ -1,22 +1,23 @@
 #pragma once
 
-#include
+#include
+#include
+#include
 #include
+#include
 #include
 #include
 #include
-#include
-#include
-#include
+#include
 #include
-#include
-#include
+#include
 #include
 #include
-#include
 #include
 
+namespace fs = std::filesystem;
+
 namespace zkutil
 {
     class ZooKeeper;
@@ -146,6 +147,10 @@ protected:
     /// Return false if the worker was stopped (stop_flag = true)
     virtual bool initializeMainThread();
+    virtual void initializeReplication();
+
+    void createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids);
+    void markReplicasActive(bool reinitialized);
 
     void runMainThread();
     void runCleanupThread();
@@ -157,7 +162,8 @@ protected:
     std::string host_fqdn;     /// current host domain name
     std::string host_fqdn_id;  /// host_name:port
 
-    std::string queue_dir;    /// dir with queue of queries
+    std::string queue_dir;     /// dir with queue of queries
+    fs::path replicas_dir;
 
     mutable std::mutex zookeeper_mutex;
     ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
@@ -199,6 +205,8 @@ protected:
     const CurrentMetrics::Metric * max_entry_metric;
     const CurrentMetrics::Metric * max_pushed_entry_metric;
 
+
+    std::unordered_map<String, std::pair<ZooKeeperPtr, zkutil::EphemeralNodeHolderPtr>> active_node_holders;
 };
diff --git a/tests/integration/test_ddl_worker_replicas/__init__.py b/tests/integration/test_ddl_worker_replicas/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_ddl_worker_replicas/configs/remote_servers.xml b/tests/integration/test_ddl_worker_replicas/configs/remote_servers.xml
new file mode 100644
index 00000000000..c505345cf7f
--- /dev/null
+++ b/tests/integration/test_ddl_worker_replicas/configs/remote_servers.xml
@@ -0,0 +1,30 @@
+ + + + true + node1 + 9000 + + + node2 + 9000 + + + + true + + node3 + 9000 + + + node4 + 9000 + + + + + + 1 +
diff --git a/tests/integration/test_ddl_worker_replicas/test.py b/tests/integration/test_ddl_worker_replicas/test.py
new file mode 100644
index 00000000000..f9ce2575e00
--- /dev/null
+++ b/tests/integration/test_ddl_worker_replicas/test.py
@@ -0,0 +1,67 @@
+import pytest
+import time
+
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+
+node1 = cluster.add_instance(
+    "node1",
+    main_configs=["configs/remote_servers.xml"],
+    with_zookeeper=True,
+    stay_alive=True,
+)
+node2 = cluster.add_instance(
+    "node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
+)
+node3 = cluster.add_instance(
+    "node3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
+)
+node4 = cluster.add_instance(
+    "node4", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
+)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def test_ddl_worker_replicas(started_cluster):
+    replica_list = node1.query(
+        "SELECT name FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas'"
+    ).strip()
+
+    replica_list = list(replica_list.split("\n"))
+    expected_replicas = ["node1:9000", "node2:9000", "node3:9000", "node4:9000"]
+    assert sorted(expected_replicas) == sorted(replica_list)
+
+    for replica in replica_list:
+        result = node1.query(
+            f"SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{replica}'"
+        ).strip()
+
+        lines = list(result.split("\n"))
+        assert len(lines) == 1
+
+        parts = list(lines[0].split("\t"))
+        assert len(parts) == 3
+        assert parts[0] == "active"
+        assert len(parts[1]) != 0
+        assert len(parts[2]) != 0
+
+    node4.stop()
+    time.sleep(1)
+
+    result = node1.query(
+        "SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/node4:9000'"
+    ).strip()
+
+    lines = list(result.split("\n"))
+    assert len(lines) == 1
+    assert len(lines[0]) == 0
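
A note on the new test: the fixed one-second sleep after node4.stop() assumes the ephemeral "active" node is gone within that window, which depends on how quickly the server's ZooKeeper session is closed and can make the check flaky. A small polling helper along the following lines would make it deterministic; this is only a sketch and not part of the patch, and the helper name, timeout, and polling interval are illustrative:

    import time

    def wait_until_active_flag_removed(node, replica, timeout=30.0, poll_interval=0.5):
        """Poll system.zookeeper until the ephemeral 'active' node for the given replica disappears."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # List the children of the replica's directory; 'active' vanishes once the session is gone.
            names = node.query(
                f"SELECT name FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{replica}'"
            ).strip().splitlines()
            if "active" not in names:
                return
            time.sleep(poll_interval)
        raise TimeoutError(f"'active' flag for {replica} still present after {timeout}s")

    # Possible usage in test_ddl_worker_replicas, replacing the fixed sleep:
    #     node4.stop()
    #     wait_until_active_flag_removed(node1, "node4:9000")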