ClickHouse/src/Interpreters/ClusterDiscovery.h

133 lines
3.7 KiB
C++
Raw Normal View History

2021-11-11 09:03:53 +00:00
#pragma once
2021-11-15 14:52:52 +00:00
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
2021-11-11 09:03:53 +00:00
#include <Common/ZooKeeper/Common.h>
#include <base/getFQDNOrHostName.h>
#include <Interpreters/Cluster.h>
2021-11-11 09:03:53 +00:00
2021-11-15 14:52:52 +00:00
#include <Poco/Logger.h>
#include <base/defines.h>
#include <unordered_map>
2021-11-11 09:03:53 +00:00
namespace DB
{
/*
* Discover cluster nodes.
*
* Each node adds an ephemeral node under the specified path in ZooKeeper (each cluster has its own path).
2021-11-16 13:39:54 +00:00
* Each node also subscribes to updates for these paths, and on every child node change the cluster is updated.
* When a node goes down, its ephemeral node is destroyed, the cluster configuration is updated on the other nodes, and the gone node is removed from the cluster.
*/
2021-11-11 09:03:53 +00:00
class ClusterDiscovery
{
public:
ClusterDiscovery(
const Poco::Util::AbstractConfiguration & config,
2021-11-18 08:57:26 +00:00
ContextPtr context_,
const String & config_prefix = "remote_servers");
2021-11-11 09:03:53 +00:00
void start();
2021-11-15 14:52:52 +00:00
~ClusterDiscovery();
2021-11-11 09:03:53 +00:00
private:
2021-11-16 13:39:54 +00:00
struct NodeInfo
{
/// versioning for format of data stored in zk
static constexpr size_t data_ver = 1;
2021-11-16 13:39:54 +00:00
/// host:port
String address;
/// is secure tcp port user
bool secure = false;
/// shard number
size_t shard_id = 0;
2021-11-16 13:39:54 +00:00
NodeInfo() = default;
explicit NodeInfo(const String & address_, bool secure_, size_t shard_id_)
: address(address_)
, secure(secure_)
, shard_id(shard_id_)
{}
static bool parse(const String & data, NodeInfo & result);
String serialize() const;
2021-11-16 13:39:54 +00:00
};
// node uuid -> address ("host:port")
2021-11-16 13:39:54 +00:00
using NodesInfo = std::unordered_map<String, NodeInfo>;
struct ClusterInfo
{
const String name;
const String zk_root;
NodesInfo nodes_info;
/// Track last update time
Stopwatch watch;
NodeInfo current_node;
/// Current node may not belong to cluster, to be just an observer.
bool current_node_is_observer = false;
explicit ClusterInfo(const String & name_,
const String & zk_root_,
UInt16 port,
bool secure,
size_t shard_id,
bool observer_mode)
: name(name_)
, zk_root(zk_root_)
, current_node(getFQDNOrHostName() + ":" + toString(port), secure, shard_id)
, current_node_is_observer(observer_mode)
{
}
};
void initialUpdate();
void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
const String & zk_root,
const String & cluster_name,
int * version = nullptr,
bool set_callback = true);
NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & node_uuids);
2021-11-15 14:52:52 +00:00
ClusterPtr makeCluster(const ClusterInfo & cluster_info);
2021-11-17 13:47:40 +00:00
bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes);
bool updateCluster(ClusterInfo & cluster_info);
2021-11-11 09:03:53 +00:00
bool runMainThread(std::function<void()> up_to_date_callback);
2021-11-15 14:52:52 +00:00
void shutdown();
/// cluster name -> cluster info (zk root, set of nodes)
std::unordered_map<String, ClusterInfo> clusters_info;
2021-11-11 09:03:53 +00:00
ContextMutablePtr context;
2021-11-25 13:45:38 +00:00
String current_node_name;
2021-11-11 09:03:53 +00:00
template <typename T> class ConcurrentFlags;
using UpdateFlags = ConcurrentFlags<std::string>;
/// Cluster names to update.
/// The `shared_ptr` is used because it's passed to watch callback.
/// It prevents accessing to invalid object after ClusterDiscovery is destroyed.
std::shared_ptr<UpdateFlags> clusters_to_update;
2021-11-15 14:52:52 +00:00
ThreadFromGlobalPool main_thread;
2021-11-11 09:03:53 +00:00
Poco::Logger * log;
};
}