diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index 2309c831839..51ff0f96e6d 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -40,7 +40,7 @@ public: UInt64 getCurrentInitializationDurationMs() const; private: bool initializeMainThread() override; - void initializeReplication(); + void initializeReplication() override; void initializeLogPointer(const String & processed_entry_name); DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override; diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 697fd0f406b..408561b7606 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -1,48 +1,47 @@ -#include -#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include +#include #include +#include +#include #include #include #include #include -#include -#include #include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include -#include -#include +#include +#include +#include +#include +#include +#include #include +#include +#include +#include + +#include +#include #include + #include #include #include -#include -#include - -#include namespace fs = std::filesystem; - namespace CurrentMetrics { extern const Metric DDLWorkerThreads; @@ -102,6 +101,12 @@ DDLWorker::DDLWorker( if (queue_dir.back() == '/') queue_dir.resize(queue_dir.size() - 1); + // replicas_dir is at the same level as queue_dir + // E.g: + // queue_dir: /clickhouse/task_queue/ddl + // replicas_dir: /clickhouse/task_queue/replicas + replicas_dir = fs::path(queue_dir).parent_path() / "replicas"; + if (config) { task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast(task_max_lifetime)); @@ -1059,6 +1064,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry) String query_path_prefix = fs::path(queue_dir) / "query-"; zookeeper->createAncestors(query_path_prefix); + NameSet host_ids; + for (const HostID & host : entry.hosts) + host_ids.emplace(host.toString()); + createReplicaDirs(zookeeper, host_ids); + String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential); if (max_pushed_entry_metric) { @@ -1098,6 +1108,7 @@ bool DDLWorker::initializeMainThread() { auto zookeeper = getAndSetZooKeeper(); zookeeper->createAncestors(fs::path(queue_dir) / ""); + initializeReplication(); initialized = true; return true; } @@ -1159,6 +1170,7 @@ void DDLWorker::runMainThread() } cleanup_event->set(); + markReplicasActive(reinitialized); scheduleTasks(reinitialized); subsequent_errors_count = 0; @@ -1216,6 +1228,97 @@ void DDLWorker::runMainThread() } +void DDLWorker::initializeReplication() +{ + auto zookeeper = getAndSetZooKeeper(); + + zookeeper->createAncestors(replicas_dir / ""); + + NameSet host_id_set; + for (const auto & it : context->getClusters()) + { + auto cluster = it.second; + for (const auto & host_ids : cluster->getHostIDs()) + for (const auto & host_id : host_ids) + host_id_set.emplace(host_id); + } + + createReplicaDirs(zookeeper, host_id_set); +} + +void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids) +{ + for (const auto & host_id : host_ids) + zookeeper->createAncestors(replicas_dir / host_id / ""); +} + +void DDLWorker::markReplicasActive(bool reinitialized) +{ + auto zookeeper = getAndSetZooKeeper(); + + if (reinitialized) + { + // Reset all active_node_holders + for (auto & it : active_node_holders) + { + auto & active_node_holder = it.second.second; + if (active_node_holder) + active_node_holder->setAlreadyRemoved(); + active_node_holder.reset(); + } + + active_node_holders.clear(); + } + + const auto maybe_secure_port = context->getTCPPortSecure(); + const auto port = context->getTCPPort(); + + Coordination::Stat replicas_stat; + Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat); + NameSet local_host_ids; + for (const auto & host_id : host_ids) + { + if (active_node_holders.contains(host_id)) + continue; + + try + { + HostID host = HostID::fromString(host_id); + /// The port is considered local if it matches TCP or TCP secure port that the server is listening. + bool is_local_host = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port); + + if (is_local_host) + local_host_ids.emplace(host_id); + } + catch (const Exception & e) + { + LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText()); + continue; + } + } + + for (const auto & host_id : local_host_ids) + { + auto it = active_node_holders.find(host_id); + if (it != active_node_holders.end()) + { + continue; + } + + /// Create "active" node (remove previous one if necessary) + String active_path = replicas_dir / host_id / "active"; + String active_id = toString(ServerUUID::get()); + zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id); + + LOG_TRACE(log, "Trying to mark a replica active: active_path={}, active_id={}", active_path, active_id); + + zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral); + auto active_node_holder_zookeeper = zookeeper; + auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper); + active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder}; + } +} + void DDLWorker::runCleanupThread() { setThreadName("DDLWorkerClnr"); diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index 6d1dabda54f..fd4735b5baa 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -1,22 +1,23 @@ #pragma once -#include +#include +#include +#include #include +#include #include #include #include -#include -#include -#include +#include #include -#include -#include +#include #include #include -#include #include +namespace fs = std::filesystem; + namespace zkutil { class ZooKeeper; @@ -146,6 +147,10 @@ protected: /// Return false if the worker was stopped (stop_flag = true) virtual bool initializeMainThread(); + virtual void initializeReplication(); + + void createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids); + void markReplicasActive(bool reinitialized); void runMainThread(); void runCleanupThread(); @@ -157,7 +162,8 @@ protected: std::string host_fqdn; /// current host domain name std::string host_fqdn_id; /// host_name:port - std::string queue_dir; /// dir with queue of queries + std::string queue_dir; /// dir with queue of queries + fs::path replicas_dir; mutable std::mutex zookeeper_mutex; ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex); @@ -199,6 +205,8 @@ protected: const CurrentMetrics::Metric * max_entry_metric; const CurrentMetrics::Metric * max_pushed_entry_metric; + + std::unordered_map> active_node_holders; };