diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index 979c0b2a8b0..17fa2180e95 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -49,23 +49,11 @@ ClusterDiscovery::ClusterDiscovery( } } -Strings ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes) -{ - Strings result; - result.reserve(nodes.size()); - for (const auto & node : nodes) - { - /// TODO (vdimir@): use batch request? - if (bool ok = zk->tryGet(getReplicasListPath(zk_root) / node, result.emplace_back()); !ok) - { - result.pop_back(); - LOG_WARNING(log, "Cluster configuration was changed during update, skip nonexisting node"); - } - } - return result; -} - -Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name) +Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, + const String & zk_root, + const String & cluster_name, + int * version, + bool set_callback) { auto watch_callback = [cluster_name, queue=queue, log=log](const Coordination::WatchResponse &) { @@ -77,20 +65,48 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, const String & } }; - return zk->getChildrenWatch(getReplicasListPath(zk_root), nullptr, watch_callback); + Coordination::Stat stat; + Strings nodes = zk->getChildrenWatch(getReplicasListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{}); + if (version) + *version = stat.cversion; + return nodes; } -void ClusterDiscovery::updateCluster(const String & cluster_name, const String & zk_root) +Strings ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes) +{ + Strings result; + result.reserve(nodes.size()); + for (const auto & node : nodes) + { + bool ok = zk->tryGet(getReplicasListPath(zk_root) / node, result.emplace_back()); + if (!ok) + { + result.pop_back(); + LOG_WARNING(log, "Cluster configuration was changed during update, skip nonexisting node"); + } + } + return result; +} + +bool ClusterDiscovery::updateCluster(const String & cluster_name, const String & zk_root) { LOG_TRACE(log, "Updating cluster '{}'", cluster_name); auto zk = context->getZooKeeper(); - Strings nodes = getNodeNames(zk, zk_root, cluster_name); + int start_version; + Strings nodes = getNodeNames(zk, zk_root, cluster_name, &start_version, false); + Strings replicas = getNodes(zk, zk_root, nodes); if (replicas.empty()) - return; + return false; + + int current_version; + getNodeNames(zk, zk_root, cluster_name, ¤t_version, true); + + if (current_version != start_version) + return false; std::vector> shards = {replicas}; @@ -107,11 +123,12 @@ void ClusterDiscovery::updateCluster(const String & cluster_name, const String & secure); context->setCluster(cluster_name, cluster); + return true; } -void ClusterDiscovery::updateCluster(const String & cluster_name) +bool ClusterDiscovery::updateCluster(const String & cluster_name) { - updateCluster(cluster_name, clusters[cluster_name]); + return updateCluster(cluster_name, clusters[cluster_name]); } void ClusterDiscovery::start() @@ -143,9 +160,13 @@ void ClusterDiscovery::runMainThread() while (!stop_flag) { std::string cluster_name; - if (bool ok = queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS)) + if (queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS)) { - updateCluster(cluster_name); + bool ok = updateCluster(cluster_name); + if (!ok) + { + LOG_WARNING(log, "Error on updating cluster '{}', configuration changed during update, will retry", cluster_name); + } } } LOG_TRACE(log, "Worker thread stopped"); diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index c5545b6bcbe..4e7672fc9ee 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -27,11 +27,16 @@ public: ~ClusterDiscovery(); private: - Strings getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name); + Strings getNodeNames(zkutil::ZooKeeperPtr & zk, + const String & zk_root, + const String & cluster_name, + int * version = nullptr, + bool set_callback = true); + Strings getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes); - void updateCluster(const String & cluster_name); - void updateCluster(const String & cluster_name, const String & zk_root); + bool updateCluster(const String & cluster_name); + bool updateCluster(const String & cluster_name, const String & zk_root); void runMainThread(); void shutdown();