From 4816d1afcc90f0fe7a59be4d7099bbbd38d733ef Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 16 Nov 2021 15:01:30 +0300 Subject: [PATCH] cluster discovery: check cluster is changed, keep info --- src/Interpreters/ClusterDiscovery.cpp | 168 +++++++++++++++++++------- src/Interpreters/ClusterDiscovery.h | 38 +++++- 2 files changed, 161 insertions(+), 45 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index 17fa2180e95..3ffdfebd2f0 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -1,3 +1,7 @@ +#include +#include +#include + #include #include @@ -45,10 +49,11 @@ ClusterDiscovery::ClusterDiscovery( { String path = config.getString(config_prefix + "." + key + ".path"); trimRight(path, '/'); - clusters[key] = path; + clusters_info.emplace(key, ClusterInfo(key, path)); } } +/// List node in ZooKeeper for cluster Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name, @@ -72,43 +77,42 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, return nodes; } -Strings ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes) +/// Reads node information from specified ZooKeeper nodes +ClusterDiscovery::NodesInfo ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes) { - Strings result; - result.reserve(nodes.size()); - for (const auto & node : nodes) + NodesInfo result; + for (const auto & node_uuid : nodes) { - bool ok = zk->tryGet(getReplicasListPath(zk_root) / node, result.emplace_back()); + bool ok = zk->tryGet(getReplicasListPath(zk_root) / node_uuid, result[node_uuid]); if (!ok) { - result.pop_back(); + result.erase(node_uuid); LOG_WARNING(log, "Cluster configuration was changed during update, skip nonexisting node"); } } return result; } -bool ClusterDiscovery::updateCluster(const String & cluster_name, 
const String & zk_root) +/// Checks if cluster nodes set is changed. +/// Returns true if update required. +/// It performs only shallow check (set of nodes' uuids). +/// So, if a node's hostname is changed, then cluster won't be updated. +bool ClusterDiscovery::needUpdate(const Strings & node_uuids, const NodesInfo & nodes) { - LOG_TRACE(log, "Updating cluster '{}'", cluster_name); + bool has_difference = node_uuids.size() != nodes.size() || + std::any_of(node_uuids.begin(), node_uuids.end(), [&nodes] (auto u) { return !nodes.contains(u); }); - auto zk = context->getZooKeeper(); + return has_difference; +} - int start_version; - Strings nodes = getNodeNames(zk, zk_root, cluster_name, &start_version, false); +ClusterPtr ClusterDiscovery::getCluster(const ClusterInfo & cluster_info) +{ + Strings replica_adresses; + replica_adresses.reserve(cluster_info.nodes_info.size()); + for (const auto & node : cluster_info.nodes_info) + replica_adresses.emplace_back(node.second); - Strings replicas = getNodes(zk, zk_root, nodes); - - if (replicas.empty()) - return false; - - int current_version; - getNodeNames(zk, zk_root, cluster_name, &current_version, true); - - if (current_version != start_version) - return false; - - std::vector> shards = {replicas}; + std::vector> shards = {replica_adresses}; bool secure = false; auto maybe_secure_port = context->getTCPPortSecure(); @@ -121,14 +125,73 @@ bool ClusterDiscovery::updateCluster(const String & cluster_name, const String & false /* treat_local_as_remote */, context->getApplicationType() == Context::ApplicationType::LOCAL /* treat_local_port_as_remote */, secure); + return cluster; +} - context->setCluster(cluster_name, cluster); +/// Reads data from zookeeper and tries to update cluster. +/// Returns true on success (or no update required). 
+bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info) +{ + LOG_TRACE(log, "Updating cluster '{}'", cluster_info.name); + + auto zk = context->getZooKeeper(); + + int start_version; + Strings node_uuids = getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &start_version, false); + + if (node_uuids.empty()) + { + LOG_ERROR(log, "Can't find any node in cluster '{}', will register again", cluster_info.name); + registerInZk(zk, cluster_info); + return false; + } + + auto & nodes_info = cluster_info.nodes_info; + if (!needUpdate(node_uuids, nodes_info)) + { + LOG_TRACE(log, "No update required for cluster '{}'", cluster_info.name); + return true; + } + + nodes_info = getNodes(zk, cluster_info.zk_root, node_uuids); + + if (nodes_info.empty()) + return false; + + int current_version; + getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &current_version, true); + + if (current_version != start_version) + { + nodes_info.clear(); + return false; + } + + auto cluster = getCluster(cluster_info); + context->setCluster(cluster_info.name, cluster); return true; } bool ClusterDiscovery::updateCluster(const String & cluster_name) { - return updateCluster(cluster_name, clusters[cluster_name]); + auto cluster_info = clusters_info.find(cluster_name); + if (cluster_info == clusters_info.end()) + { + LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); + return false; + } + return updateCluster(cluster_info->second); +} + +void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info) +{ + String node_path = getReplicasListPath(info.zk_root) / node_name; + zk->createAncestors(node_path); + + String payload = getFQDNOrHostName() + ":" + toString(server_port); + + zk->createOrUpdate(node_path, payload, zkutil::CreateMode::Ephemeral); + LOG_DEBUG(log, "Current node {} registered in cluster {}", node_name, info.name); } void ClusterDiscovery::start() @@ -138,36 +201,59 @@ void ClusterDiscovery::start() LOG_TRACE(log, "Starting working thread"); 
main_thread = ThreadFromGlobalPool([this] { runMainThread(); }); - for (const auto & [cluster_name, zk_root] : clusters) + for (auto & [_, info] : clusters_info) { - String node_path = getReplicasListPath(zk_root) / node_name; - zk->createAncestors(node_path); + registerInZk(zk, info); + updateCluster(info); - String info = getFQDNOrHostName() + ":" + toString(server_port); - - zk->createOrUpdate(node_path, info, zkutil::CreateMode::Ephemeral); - LOG_DEBUG(log, "Current node {} registered in cluster {}", node_name, cluster_name); - - updateCluster(cluster_name, zk_root); + if (!updateCluster(info)) + LOG_WARNING(log, "Error on updating cluster '{}'", info.name); } } void ClusterDiscovery::runMainThread() { - // setThreadName("ClusterDiscovery"); LOG_TRACE(log, "Worker thread started"); + // setThreadName("ClusterDiscovery"); + + using namespace std::chrono_literals; + constexpr UInt64 full_update_interval = std::chrono::milliseconds(5min).count(); + + std::unordered_map last_cluster_update; + for (const auto & [cluster_name, _] : clusters_info) + last_cluster_update.emplace(cluster_name, Stopwatch()); + Stopwatch last_full_update; + + pcg64 rng(randomSeed()); while (!stop_flag) { - std::string cluster_name; - if (queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS)) { - bool ok = updateCluster(cluster_name); - if (!ok) + String cluster_name; + if (queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS)) { - LOG_WARNING(log, "Error on updating cluster '{}', configuration changed during update, will retry", cluster_name); + if (updateCluster(cluster_name)) + last_cluster_update[cluster_name].restart(); + else + LOG_WARNING(log, "Error on updating cluster '{}', configuration changed during update, will retry", cluster_name); } } + + auto jitter = std::uniform_real_distribution<>(1.0, 2.0)(rng); + if (last_full_update.elapsedMilliseconds() > UInt64(full_update_interval * jitter)) + { + for (const auto & lastupd : last_cluster_update) + { + if (lastupd.second.elapsedMilliseconds() 
> full_update_interval) + { + if (updateCluster(lastupd.first)) + last_cluster_update[lastupd.first].restart(); + else + LOG_WARNING(log, "Error on updating cluster '{}'", lastupd.first); + } + } + last_full_update.restart(); + } } LOG_TRACE(log, "Worker thread stopped"); } diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index 4e7672fc9ee..152a4cbb892 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -13,6 +14,13 @@ namespace DB { +/* + * Discover cluster nodes. + * + * Each node adds an ephemeral node into the specified path in ZooKeeper (each cluster has its own path). + * Each node also subscribes to updates for these paths, and on each child node change the cluster is updated. + * When a node goes down, its ephemeral node is destroyed, the cluster configuration is updated on the other nodes, and the gone node is removed from the cluster. + */ class ClusterDiscovery { @@ -27,30 +35,52 @@ public: ~ClusterDiscovery(); private: + // node uuid -> address ("host:port") + using NodesInfo = std::unordered_map; + + struct ClusterInfo + { + const String name; + const String zk_root; + NodesInfo nodes_info; + + explicit ClusterInfo(const String & name_, const String & zk_root_) : name(name_), zk_root(zk_root_) {} + }; + + void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info); + Strings getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name, int * version = nullptr, bool set_callback = true); - Strings getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes); + NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes); + ClusterPtr getCluster(const ClusterInfo & cluster_info); + + static bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes); bool updateCluster(const String & cluster_name); - bool updateCluster(const String & cluster_name, const 
String & zk_root); + bool updateCluster(ClusterInfo & cluster_info); void runMainThread(); void shutdown(); - /// cluster name -> path in zk - std::unordered_map clusters; + /// cluster name -> cluster info (zk root, set of nodes) + std::unordered_map clusters_info; ContextMutablePtr context; String node_name; UInt16 server_port; + /// Cluster names to update using UpdateQueue = ConcurrentBoundedQueue; + + /// shared_ptr is used because it's passed to watch callback + /// it prevents accessing to invalid queue after ClusterDiscovery is destroyed std::shared_ptr queue; + std::atomic stop_flag = false; ThreadFromGlobalPool main_thread;