mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
cluster discovery: check cluster is changed, keep info
This commit is contained in:
parent
1f460f05fe
commit
4816d1afcc
@ -1,3 +1,7 @@
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <unordered_set>
|
||||
|
||||
#include <base/getFQDNOrHostName.h>
|
||||
#include <base/logger_useful.h>
|
||||
|
||||
@ -45,10 +49,11 @@ ClusterDiscovery::ClusterDiscovery(
|
||||
{
|
||||
String path = config.getString(config_prefix + "." + key + ".path");
|
||||
trimRight(path, '/');
|
||||
clusters[key] = path;
|
||||
clusters_info.emplace(key, ClusterInfo(key, path));
|
||||
}
|
||||
}
|
||||
|
||||
/// List node in zookeper for cluster
|
||||
Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk,
|
||||
const String & zk_root,
|
||||
const String & cluster_name,
|
||||
@ -72,43 +77,42 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk,
|
||||
return nodes;
|
||||
}
|
||||
|
||||
Strings ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes)
|
||||
/// Reads node information from scpecified zookeper nodes
|
||||
ClusterDiscovery::NodesInfo ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes)
|
||||
{
|
||||
Strings result;
|
||||
result.reserve(nodes.size());
|
||||
for (const auto & node : nodes)
|
||||
NodesInfo result;
|
||||
for (const auto & node_uuid : nodes)
|
||||
{
|
||||
bool ok = zk->tryGet(getReplicasListPath(zk_root) / node, result.emplace_back());
|
||||
bool ok = zk->tryGet(getReplicasListPath(zk_root) / node_uuid, result[node_uuid]);
|
||||
if (!ok)
|
||||
{
|
||||
result.pop_back();
|
||||
result.erase(node_uuid);
|
||||
LOG_WARNING(log, "Cluster configuration was changed during update, skip nonexisting node");
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
bool ClusterDiscovery::updateCluster(const String & cluster_name, const String & zk_root)
|
||||
/// Checks if custer nodes set is changed.
|
||||
/// Returs true if update required.
|
||||
/// It performs only shallow check (set of nodes' uuids).
|
||||
/// So, if node's hostname are changed, then cluster won't be updated.
|
||||
bool ClusterDiscovery::needUpdate(const Strings & node_uuids, const NodesInfo & nodes)
|
||||
{
|
||||
LOG_TRACE(log, "Updating cluster '{}'", cluster_name);
|
||||
bool has_difference = node_uuids.size() != nodes.size() ||
|
||||
std::any_of(node_uuids.begin(), node_uuids.end(), [&nodes] (auto u) { return !nodes.contains(u); });
|
||||
|
||||
auto zk = context->getZooKeeper();
|
||||
return has_difference;
|
||||
}
|
||||
|
||||
int start_version;
|
||||
Strings nodes = getNodeNames(zk, zk_root, cluster_name, &start_version, false);
|
||||
ClusterPtr ClusterDiscovery::getCluster(const ClusterInfo & cluster_info)
|
||||
{
|
||||
Strings replica_adresses;
|
||||
replica_adresses.reserve(cluster_info.nodes_info.size());
|
||||
for (const auto & node : cluster_info.nodes_info)
|
||||
replica_adresses.emplace_back(node.second);
|
||||
|
||||
Strings replicas = getNodes(zk, zk_root, nodes);
|
||||
|
||||
if (replicas.empty())
|
||||
return false;
|
||||
|
||||
int current_version;
|
||||
getNodeNames(zk, zk_root, cluster_name, ¤t_version, true);
|
||||
|
||||
if (current_version != start_version)
|
||||
return false;
|
||||
|
||||
std::vector<std::vector<String>> shards = {replicas};
|
||||
std::vector<std::vector<String>> shards = {replica_adresses};
|
||||
|
||||
bool secure = false;
|
||||
auto maybe_secure_port = context->getTCPPortSecure();
|
||||
@ -121,14 +125,73 @@ bool ClusterDiscovery::updateCluster(const String & cluster_name, const String &
|
||||
false /* treat_local_as_remote */,
|
||||
context->getApplicationType() == Context::ApplicationType::LOCAL /* treat_local_port_as_remote */,
|
||||
secure);
|
||||
return cluster;
|
||||
}
|
||||
|
||||
context->setCluster(cluster_name, cluster);
|
||||
/// Reads data from zookeeper and tries to update cluster.
|
||||
/// Returns true on success (or no update required).
|
||||
bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info)
|
||||
{
|
||||
LOG_TRACE(log, "Updating cluster '{}'", cluster_info.name);
|
||||
|
||||
auto zk = context->getZooKeeper();
|
||||
|
||||
int start_version;
|
||||
Strings node_uuids = getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &start_version, false);
|
||||
|
||||
if (node_uuids.empty())
|
||||
{
|
||||
LOG_ERROR(log, "Can't find any node in cluster '{}', will register again", cluster_info.name);
|
||||
registerInZk(zk, cluster_info);
|
||||
return false;
|
||||
}
|
||||
|
||||
auto & nodes_info = cluster_info.nodes_info;
|
||||
if (!needUpdate(node_uuids, nodes_info))
|
||||
{
|
||||
LOG_TRACE(log, "No update required for cluster '{}'", cluster_info.name);
|
||||
return true;
|
||||
}
|
||||
|
||||
nodes_info = getNodes(zk, cluster_info.zk_root, node_uuids);
|
||||
|
||||
if (nodes_info.empty())
|
||||
return false;
|
||||
|
||||
int current_version;
|
||||
getNodeNames(zk, cluster_info.zk_root, cluster_info.name, ¤t_version, true);
|
||||
|
||||
if (current_version != start_version)
|
||||
{
|
||||
nodes_info.clear();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto cluster = getCluster(cluster_info);
|
||||
context->setCluster(cluster_info.name, cluster);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ClusterDiscovery::updateCluster(const String & cluster_name)
|
||||
{
|
||||
return updateCluster(cluster_name, clusters[cluster_name]);
|
||||
auto cluster_info = clusters_info.find(cluster_name);
|
||||
if (cluster_info == clusters_info.end())
|
||||
{
|
||||
LOG_ERROR(log, "Unknown cluster '{}'", cluster_name);
|
||||
return false;
|
||||
}
|
||||
return updateCluster(cluster_info->second);
|
||||
}
|
||||
|
||||
void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
|
||||
{
|
||||
String node_path = getReplicasListPath(info.zk_root) / node_name;
|
||||
zk->createAncestors(node_path);
|
||||
|
||||
String payload = getFQDNOrHostName() + ":" + toString(server_port);
|
||||
|
||||
zk->createOrUpdate(node_path, payload, zkutil::CreateMode::Ephemeral);
|
||||
LOG_DEBUG(log, "Current node {} registered in cluster {}", node_name, info.name);
|
||||
}
|
||||
|
||||
void ClusterDiscovery::start()
|
||||
@ -138,36 +201,59 @@ void ClusterDiscovery::start()
|
||||
LOG_TRACE(log, "Starting working thread");
|
||||
main_thread = ThreadFromGlobalPool([this] { runMainThread(); });
|
||||
|
||||
for (const auto & [cluster_name, zk_root] : clusters)
|
||||
for (auto & [_, info] : clusters_info)
|
||||
{
|
||||
String node_path = getReplicasListPath(zk_root) / node_name;
|
||||
zk->createAncestors(node_path);
|
||||
registerInZk(zk, info);
|
||||
updateCluster(info);
|
||||
|
||||
String info = getFQDNOrHostName() + ":" + toString(server_port);
|
||||
|
||||
zk->createOrUpdate(node_path, info, zkutil::CreateMode::Ephemeral);
|
||||
LOG_DEBUG(log, "Current node {} registered in cluster {}", node_name, cluster_name);
|
||||
|
||||
updateCluster(cluster_name, zk_root);
|
||||
if (!updateCluster(info))
|
||||
LOG_WARNING(log, "Error on updating cluster '{}'", info.name);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterDiscovery::runMainThread()
|
||||
{
|
||||
// setThreadName("ClusterDiscovery");
|
||||
LOG_TRACE(log, "Worker thread started");
|
||||
// setThreadName("ClusterDiscovery");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
constexpr UInt64 full_update_interval = std::chrono::milliseconds(5min).count();
|
||||
|
||||
std::unordered_map<String, Stopwatch> last_cluster_update;
|
||||
for (const auto & [cluster_name, _] : clusters_info)
|
||||
last_cluster_update.emplace(cluster_name, Stopwatch());
|
||||
Stopwatch last_full_update;
|
||||
|
||||
pcg64 rng(randomSeed());
|
||||
|
||||
while (!stop_flag)
|
||||
{
|
||||
std::string cluster_name;
|
||||
if (queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS))
|
||||
{
|
||||
bool ok = updateCluster(cluster_name);
|
||||
if (!ok)
|
||||
String cluster_name;
|
||||
if (queue->tryPop(cluster_name, QUEUE_OP_TIMEOUT_MS))
|
||||
{
|
||||
LOG_WARNING(log, "Error on updating cluster '{}', configuration changed during update, will retry", cluster_name);
|
||||
if (updateCluster(cluster_name))
|
||||
last_cluster_update[cluster_name].restart();
|
||||
else
|
||||
LOG_WARNING(log, "Error on updating cluster '{}', configuration changed during update, will retry", cluster_name);
|
||||
}
|
||||
}
|
||||
|
||||
auto jitter = std::uniform_real_distribution<>(1.0, 2.0)(rng);
|
||||
if (last_full_update.elapsedMilliseconds() > UInt64(full_update_interval * jitter))
|
||||
{
|
||||
for (const auto & lastupd : last_cluster_update)
|
||||
{
|
||||
if (lastupd.second.elapsedMilliseconds() > full_update_interval)
|
||||
{
|
||||
if (updateCluster(lastupd.first))
|
||||
last_cluster_update[lastupd.first].restart();
|
||||
else
|
||||
LOG_WARNING(log, "Error on updating cluster '{}'", lastupd.first);
|
||||
}
|
||||
}
|
||||
last_full_update.restart();
|
||||
}
|
||||
}
|
||||
LOG_TRACE(log, "Worker thread stopped");
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ZooKeeper/Common.h>
|
||||
#include <Interpreters/Cluster.h>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
|
||||
@ -13,6 +14,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/*
|
||||
* Discover cluster nodes.
|
||||
*
|
||||
* Each node adds ephemernal node into specified path in zookeeper (each cluster have own path).
|
||||
* Also node subscribed for updates for theese paths, and at each child node chanhe cluster updated.
|
||||
* When node goes down ephemernal node are destroyed, cluster configuration is updated on other node and gone node is removed from cluster.
|
||||
*/
|
||||
class ClusterDiscovery
|
||||
{
|
||||
|
||||
@ -27,30 +35,52 @@ public:
|
||||
~ClusterDiscovery();
|
||||
|
||||
private:
|
||||
// node uuid -> address ("host:port")
|
||||
using NodesInfo = std::unordered_map<String, String>;
|
||||
|
||||
struct ClusterInfo
|
||||
{
|
||||
const String name;
|
||||
const String zk_root;
|
||||
NodesInfo nodes_info;
|
||||
|
||||
explicit ClusterInfo(const String & name_, const String & zk_root_) : name(name_), zk_root(zk_root_) {}
|
||||
};
|
||||
|
||||
void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
|
||||
|
||||
Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
|
||||
const String & zk_root,
|
||||
const String & cluster_name,
|
||||
int * version = nullptr,
|
||||
bool set_callback = true);
|
||||
|
||||
Strings getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes);
|
||||
NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes);
|
||||
|
||||
ClusterPtr getCluster(const ClusterInfo & cluster_info);
|
||||
|
||||
static bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes);
|
||||
bool updateCluster(const String & cluster_name);
|
||||
bool updateCluster(const String & cluster_name, const String & zk_root);
|
||||
bool updateCluster(ClusterInfo & cluster_info);
|
||||
|
||||
void runMainThread();
|
||||
void shutdown();
|
||||
|
||||
/// cluster name -> path in zk
|
||||
std::unordered_map<String, String> clusters;
|
||||
/// cluster name -> cluster info (zk root, set of nodes)
|
||||
std::unordered_map<String, ClusterInfo> clusters_info;
|
||||
|
||||
ContextMutablePtr context;
|
||||
|
||||
String node_name;
|
||||
UInt16 server_port;
|
||||
|
||||
/// Cluster names to update
|
||||
using UpdateQueue = ConcurrentBoundedQueue<std::string>;
|
||||
|
||||
/// shared_ptr is used because it's passed to watch callback
|
||||
/// it prevents accessing to invalid queue after ClusterDiscovery is destroyed
|
||||
std::shared_ptr<UpdateQueue> queue;
|
||||
|
||||
std::atomic<bool> stop_flag = false;
|
||||
ThreadFromGlobalPool main_thread;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user