Merge pull request #31442 from vdimir/cluster-discovery
Commit bb6fc853e3
@@ -46,7 +46,7 @@ void ClusterCopier::init()
    reloadTaskDescription();

    task_cluster->loadTasks(*task_cluster_current_config);
-    getContext()->setClustersConfig(task_cluster_current_config, task_cluster->clusters_prefix);
+    getContext()->setClustersConfig(task_cluster_current_config, false, task_cluster->clusters_prefix);

    /// Set up shards and their priority
    task_cluster->random_engine.seed(task_cluster->random_device());
@@ -906,7 +906,7 @@ if (ThreadFuzzer::instance().isEffective())
            // in a lot of places. For now, disable updating log configuration without server restart.
            //setTextLog(global_context->getTextLog());
            updateLevels(*config, logger());
-            global_context->setClustersConfig(config);
+            global_context->setClustersConfig(config, has_zookeeper);
            global_context->setMacros(std::make_unique<Macros>(*config, "macros", log));
            global_context->setExternalAuthenticatorsConfig(*config);

@@ -1422,6 +1422,15 @@ if (ThreadFuzzer::instance().isEffective())
        LOG_INFO(log, "Ready for connections.");
    }

+    try
+    {
+        global_context->startClusterDiscovery();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, "Caught exception while starting cluster discovery");
+    }
+
    SCOPE_EXIT_SAFE({
        LOG_DEBUG(log, "Received termination signal.");
        LOG_DEBUG(log, "Waiting for current connections to close.");
@@ -320,13 +320,29 @@ void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & new_conf
    if (old_config)
    {
        for (const auto & key : deleted_keys)
-            impl.erase(key);
+        {
+            if (!automatic_clusters.contains(key))
+                impl.erase(key);
+        }
    }
    else
-        impl.clear();
+    {
+        if (!automatic_clusters.empty())
+            std::erase_if(impl, [this](const auto & e) { return automatic_clusters.contains(e.first); });
+        else
+            impl.clear();
+    }

    for (const auto & key : new_config_keys)
    {
+        if (new_config.has(config_prefix + "." + key + ".discovery"))
+        {
+            /// Handled in ClusterDiscovery
+            automatic_clusters.insert(key);
+            continue;
+        }
+
        if (key.find('.') != String::npos)
            throw Exception("Cluster names with dots are not supported: '" + key + "'", ErrorCodes::SYNTAX_ERROR);
@@ -6,6 +6,8 @@
#include <Poco/Net/SocketAddress.h>

+#include <map>
#include <string>
+#include <unordered_set>

namespace Poco
{

@@ -295,12 +297,15 @@ public:
    void updateClusters(const Poco::Util::AbstractConfiguration & new_config, const Settings & settings, const String & config_prefix, Poco::Util::AbstractConfiguration * old_config = nullptr);

public:
    using Impl = std::map<String, ClusterPtr>;

    Impl getContainer() const;

protected:
+    /// setup outside of this class, stored to prevent deleting from impl on config update
+    std::unordered_set<std::string> automatic_clusters;
+
    Impl impl;
    mutable std::mutex mutex;
};
src/Interpreters/ClusterDiscovery.cpp  (new file, 479 lines)
@@ -0,0 +1,479 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <unordered_map>
#include <unordered_set>

#include <base/getFQDNOrHostName.h>
#include <base/logger_useful.h>

#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/setThreadName.h>

#include <Core/ServerUUID.h>

#include <Interpreters/Cluster.h>
#include <Interpreters/ClusterDiscovery.h>
#include <Interpreters/Context.h>

#include <Poco/Exception.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

namespace
{

fs::path getShardsListPath(const String & zk_root)
{
    return fs::path(zk_root + "/shards");
}

}

/*
 * Holds boolean flags for a fixed set of keys.
 * Flags can be set concurrently from different threads, and a consumer can wait for them.
 */
template <typename T>
class ClusterDiscovery::ConcurrentFlags
{
public:
    template <typename It>
    ConcurrentFlags(It begin, It end)
    {
        for (auto it = begin; it != end; ++it)
            flags.emplace(*it, false);
    }

    void set(const T & key)
    {
        auto it = flags.find(key);
        if (it == flags.end())
            throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unknown value '{}'", key);
        it->second = true;
        any_need_update = true;
        cv.notify_one();
    }

    /// waits until at least one flag is set
    /// caller should handle all set flags (or set them again manually)
    /// note: keys of the returned map should not be changed!
    /// @param finished - output parameter indicates that stop() was called
    std::unordered_map<T, std::atomic_bool> & wait(std::chrono::milliseconds timeout, bool & finished)
    {
        std::unique_lock<std::mutex> lk(mu);
        cv.wait_for(lk, timeout, [this]() -> bool { return any_need_update || stop_flag; });
        finished = stop_flag;

        /// all set flags expected to be handled by caller
        any_need_update = false;
        return flags;
    }

    void stop()
    {
        std::unique_lock<std::mutex> lk(mu);
        stop_flag = true;
        cv.notify_one();
    }

private:
    std::condition_variable cv;
    std::mutex mu;

    /// flag indicates that update is required
    std::unordered_map<T, std::atomic_bool> flags;
    std::atomic_bool any_need_update = true;
    bool stop_flag = false;
};

ClusterDiscovery::ClusterDiscovery(
    const Poco::Util::AbstractConfiguration & config,
    ContextPtr context_,
    const String & config_prefix)
    : context(Context::createCopy(context_))
    , current_node_name(toString(ServerUUID::get()))
    , log(&Poco::Logger::get("ClusterDiscovery"))
{
    LOG_DEBUG(log, "Cluster discovery is enabled");

    Poco::Util::AbstractConfiguration::Keys config_keys;
    config.keys(config_prefix, config_keys);

    for (const auto & key : config_keys)
    {
        String prefix = config_prefix + "." + key + ".discovery";
        if (!config.has(prefix))
            continue;

        clusters_info.emplace(
            key,
            ClusterInfo(
                /* name_= */ key,
                /* zk_root_= */ config.getString(prefix + ".path"),
                /* port= */ context->getTCPPort(),
                /* secure= */ config.getBool(prefix + ".secure", false),
                /* shard_id= */ config.getUInt(prefix + ".shard", 0)
            )
        );
    }
    clusters_to_update = std::make_shared<UpdateFlags>(config_keys.begin(), config_keys.end());
}

/// Lists nodes in zookeeper for the cluster
Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk,
                                       const String & zk_root,
                                       const String & cluster_name,
                                       int * version,
                                       bool set_callback)
{
    auto watch_callback = [cluster_name, clusters_to_update=clusters_to_update](auto) { clusters_to_update->set(cluster_name); };

    Coordination::Stat stat;
    Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{});
    if (version)
        *version = stat.cversion;
    return nodes;
}

/// Reads node information from specified zookeeper nodes
/// On error returns empty result
ClusterDiscovery::NodesInfo ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & node_uuids)
{
    NodesInfo result;
    for (const auto & node_uuid : node_uuids)
    {
        String payload;
        bool ok = zk->tryGet(getShardsListPath(zk_root) / node_uuid, payload) &&
                  NodeInfo::parse(payload, result[node_uuid]);
        if (!ok)
        {
            LOG_WARNING(log, "Can't get data from node '{}' in '{}'", node_uuid, zk_root);
            return {};
        }
    }
    return result;
}

/// Checks if the cluster's set of nodes has changed.
/// Returns true if an update is required.
/// It performs only a shallow check (the set of node uuids).
/// So, if a node's hostname changes, the cluster won't be updated.
bool ClusterDiscovery::needUpdate(const Strings & node_uuids, const NodesInfo & nodes)
{
    bool has_difference = node_uuids.size() != nodes.size() ||
                          std::any_of(node_uuids.begin(), node_uuids.end(), [&nodes] (auto u) { return !nodes.contains(u); });
    {
        /// Just to log updated nodes, suboptimal, but should be ok for expected update sizes
        std::set<String> new_names(node_uuids.begin(), node_uuids.end());
        std::set<String> old_names;
        for (const auto & [name, _] : nodes)
            old_names.emplace(name);

        auto format_cluster_update = [](const std::set<String> & s1, const std::set<String> & s2)
        {
            std::vector<String> diff;
            std::set_difference(s1.begin(), s1.end(), s2.begin(), s2.end(), std::back_inserter(diff));

            constexpr size_t max_to_show = 3;
            size_t sz = diff.size();
            bool need_crop = sz > max_to_show;
            if (need_crop)
                diff.resize(max_to_show);

            if (sz == 0)
                return fmt::format("{} nodes", sz);
            return fmt::format("{} node{} [{}{}]", sz, sz != 1 ? "s" : "", fmt::join(diff, ", "), need_crop ? ",..." : "");
        };

        LOG_DEBUG(log, "Cluster update: added {}, removed {}",
                  format_cluster_update(new_names, old_names),
                  format_cluster_update(old_names, new_names));
    }
    return has_difference;
}

ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
{
    std::vector<std::vector<String>> shards;
    {
        std::map<size_t, Strings> replica_adresses;

        for (const auto & [_, node] : cluster_info.nodes_info)
        {
            if (cluster_info.current_node.secure != node.secure)
            {
                LOG_WARNING(log, "Node '{}' in cluster '{}' has different 'secure' value, skipping it", node.address, cluster_info.name);
                continue;
            }
            replica_adresses[node.shard_id].emplace_back(node.address);
        }

        shards.reserve(replica_adresses.size());
        for (auto & [_, replicas] : replica_adresses)
            shards.emplace_back(std::move(replicas));
    }

    bool secure = cluster_info.current_node.secure;
    auto cluster = std::make_shared<Cluster>(
        context->getSettings(),
        shards,
        /* username= */ context->getUserName(),
        /* password= */ "",
        /* clickhouse_port= */ secure ? context->getTCPPortSecure().value_or(DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort(),
        /* treat_local_as_remote= */ false,
        /* treat_local_port_as_remote= */ context->getApplicationType() == Context::ApplicationType::LOCAL,
        /* secure= */ secure);
    return cluster;
}

/// Reads data from zookeeper and tries to update the cluster.
/// Returns true on success (or if no update is required).
bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info)
{
    LOG_DEBUG(log, "Updating cluster '{}'", cluster_info.name);

    auto zk = context->getZooKeeper();

    int start_version;
    Strings node_uuids = getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &start_version, false);
    auto & nodes_info = cluster_info.nodes_info;

    if (std::find(node_uuids.begin(), node_uuids.end(), current_node_name) == node_uuids.end())
    {
        LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
        registerInZk(zk, cluster_info);
        nodes_info.clear();
        return false;
    }

    if (!needUpdate(node_uuids, nodes_info))
    {
        LOG_DEBUG(log, "No update required for cluster '{}'", cluster_info.name);
        return true;
    }

    nodes_info = getNodes(zk, cluster_info.zk_root, node_uuids);
    if (nodes_info.empty())
    {
        LOG_WARNING(log, "Can't get nodes info for '{}'", cluster_info.name);
        return false;
    }

    int current_version;
    getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &current_version, true);

    if (current_version != start_version)
    {
        LOG_DEBUG(log, "Cluster '{}' configuration changed during update", cluster_info.name);
        nodes_info.clear();
        return false;
    }

    LOG_DEBUG(log, "Updating system.clusters record for '{}' with {} nodes", cluster_info.name, cluster_info.nodes_info.size());

    auto cluster = makeCluster(cluster_info);
    context->setCluster(cluster_info.name, cluster);
    return true;
}

void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
{
    LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);

    String node_path = getShardsListPath(info.zk_root) / current_node_name;
    zk->createAncestors(node_path);

    zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
    LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
}

void ClusterDiscovery::initialUpdate()
{
    auto zk = context->getZooKeeper();
    for (auto & [_, info] : clusters_info)
    {
        registerInZk(zk, info);
        if (!updateCluster(info))
        {
            LOG_WARNING(log, "Error on initial cluster '{}' update, will retry in background", info.name);
            clusters_to_update->set(info.name);
        }
    }
}

void ClusterDiscovery::start()
{
    if (clusters_info.empty())
    {
        LOG_DEBUG(log, "No defined clusters for discovery");
        return;
    }

    try
    {
        initialUpdate();
    }
    catch (...)
    {
        tryLogCurrentException(log, "Caught exception in cluster discovery initialization");
    }

    using namespace std::chrono_literals;
    constexpr static std::chrono::milliseconds DEFAULT_BACKOFF_TIMEOUT = 10ms;

    LOG_DEBUG(log, "Starting working thread");
    main_thread = ThreadFromGlobalPool([this]
    {
        std::chrono::milliseconds backoff_timeout = DEFAULT_BACKOFF_TIMEOUT;

        bool finish = false;
        while (!finish)
        {
            try
            {
                finish = runMainThread([&backoff_timeout] { backoff_timeout = DEFAULT_BACKOFF_TIMEOUT; });
            }
            catch (...)
            {
                /*
                 * it can be a zk error (a new session will be taken) or another retriable error,
                 * it should not stop discovery forever
                 */
                tryLogCurrentException(log, "Caught exception in cluster discovery runMainThread");
            }
            std::this_thread::sleep_for(backoff_timeout);
            backoff_timeout = std::min(backoff_timeout * 2, std::chrono::milliseconds(3min));
        }
    });
}

/// Returns `true` on graceful shutdown (no restart required)
bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
{
    setThreadName("ClusterDiscover");
    LOG_DEBUG(log, "Worker thread started");

    using namespace std::chrono_literals;

    constexpr auto force_update_interval = 2min;
    bool finished = false;
    while (!finished)
    {
        bool all_up_to_date = true;
        auto & clusters = clusters_to_update->wait(5s, finished);
        for (auto & [cluster_name, need_update] : clusters)
        {
            auto cluster_info_it = clusters_info.find(cluster_name);
            if (cluster_info_it == clusters_info.end())
            {
                LOG_ERROR(log, "Unknown cluster '{}'", cluster_name);
                continue;
            }
            auto & cluster_info = cluster_info_it->second;

            if (!need_update.exchange(false))
            {
                /// force updating periodically
                bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count();
                if (!force_update)
                    continue;
            }

            if (updateCluster(cluster_info))
            {
                cluster_info.watch.restart();
                LOG_DEBUG(log, "Cluster '{}' updated successfully", cluster_name);
            }
            else
            {
                all_up_to_date = false;
                /// no need to trigger the condvar, will retry after timeout in `wait`
                need_update = true;
                LOG_WARNING(log, "Cluster '{}' wasn't updated, will retry", cluster_name);
            }
        }

        if (all_up_to_date)
        {
            up_to_date_callback();
        }
    }
    LOG_DEBUG(log, "Worker thread stopped");
    return finished;
}

void ClusterDiscovery::shutdown()
{
    LOG_DEBUG(log, "Shutting down");
    clusters_to_update->stop();

    if (main_thread.joinable())
        main_thread.join();
}

ClusterDiscovery::~ClusterDiscovery()
{
    ClusterDiscovery::shutdown();
}

bool ClusterDiscovery::NodeInfo::parse(const String & data, NodeInfo & result)
{
    try
    {
        Poco::JSON::Parser parser;
        auto json = parser.parse(data).extract<Poco::JSON::Object::Ptr>();

        size_t ver = json->optValue<size_t>("version", data_ver);
        if (ver == data_ver)
        {
            result.address = json->getValue<std::string>("address");
            result.secure = json->optValue<bool>("secure", false);
            result.shard_id = json->optValue<size_t>("shard_id", 0);
        }
        else
        {
            LOG_ERROR(
                &Poco::Logger::get("ClusterDiscovery"),
                "Unsupported version '{}' of data in zk node '{}'",
                ver, data.size() < 1024 ? data : "[data too long]");
        }
    }
    catch (Poco::Exception & e)
    {
        LOG_WARNING(
            &Poco::Logger::get("ClusterDiscovery"),
            "Can't parse '{}' from node: {}",
            data.size() < 1024 ? data : "[data too long]", e.displayText());
        return false;
    }
    return true;
}

String ClusterDiscovery::NodeInfo::serialize() const
{
    Poco::JSON::Object json;
    json.set("version", data_ver);
    json.set("address", address);
    json.set("shard_id", shard_id);

    std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    oss.exceptions(std::ios::failbit);
    Poco::JSON::Stringifier::stringify(json, oss);
    return oss.str();
}

}
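Illustration (not part of the commit): ConcurrentFlags above is a small "set a flag and wake the worker" primitive built from a mutex, a condition variable and a map of std::atomic_bool whose keys are fixed at construction. A minimal standalone sketch of the same pattern, with hypothetical names and without the ClickHouse helpers or the stop flag, might look like this:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <initializer_list>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>

/// Hypothetical, simplified stand-in for ClusterDiscovery::ConcurrentFlags.
/// The set of keys is fixed at construction; producers flip a flag and wake the
/// consumer, the consumer waits with a timeout and handles whatever is set.
class FlagSet
{
public:
    FlagSet(std::initializer_list<std::string> keys)
    {
        for (const auto & key : keys)
            flags.emplace(key, false);
    }

    void set(const std::string & key)
    {
        flags.at(key) = true;   /// the flag itself is atomic, no lock needed
        any_need_update = true;
        cv.notify_one();        /// wake the waiting consumer
    }

    /// Returns the whole map; the caller resets the flags it has handled.
    /// The timeout also covers notifications that race with the wait.
    std::unordered_map<std::string, std::atomic_bool> & wait(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mu);
        cv.wait_for(lock, timeout, [this] { return any_need_update.load(); });
        any_need_update = false;
        return flags;
    }

private:
    std::condition_variable cv;
    std::mutex mu;
    std::unordered_map<std::string, std::atomic_bool> flags;
    std::atomic_bool any_need_update{false};
};

int main()
{
    FlagSet flags_to_update{"cluster_a", "cluster_b"};

    std::thread producer([&] { flags_to_update.set("cluster_b"); });

    auto & flags = flags_to_update.wait(std::chrono::seconds(1));
    for (auto & [name, need_update] : flags)
        if (need_update.exchange(false))   /// consume the flag, as runMainThread() does
            std::cout << name << " needs an update\n";

    producer.join();
    return 0;
}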
src/Interpreters/ClusterDiscovery.h  (new file, 124 lines)
@@ -0,0 +1,124 @@
#pragma once

#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/Common.h>
#include <base/getFQDNOrHostName.h>
#include <Interpreters/Cluster.h>

#include <Poco/Logger.h>

#include <base/defines.h>

#include <unordered_map>

namespace DB
{

/*
 * Discovers cluster nodes.
 *
 * Each node adds an ephemeral node into the specified path in zookeeper (each cluster has its own path).
 * Each node also subscribes for updates of these paths, and the cluster is updated on every child node change.
 * When a node goes down, its ephemeral node is removed, the cluster configuration is updated on the other nodes, and the gone node is removed from the cluster.
 */
class ClusterDiscovery
{

public:
    ClusterDiscovery(
        const Poco::Util::AbstractConfiguration & config,
        ContextPtr context_,
        const String & config_prefix = "remote_servers");

    void start();

    ~ClusterDiscovery();

private:
    struct NodeInfo
    {
        /// versioning for format of data stored in zk
        static constexpr size_t data_ver = 1;

        /// host:port
        String address;
        /// is secure tcp port used
        bool secure = false;
        /// shard number
        size_t shard_id = 0;

        NodeInfo() = default;
        explicit NodeInfo(const String & address_, bool secure_, size_t shard_id_)
            : address(address_)
            , secure(secure_)
            , shard_id(shard_id_)
        {}

        static bool parse(const String & data, NodeInfo & result);
        String serialize() const;
    };

    // node uuid -> address ("host:port")
    using NodesInfo = std::unordered_map<String, NodeInfo>;

    struct ClusterInfo
    {
        const String name;
        const String zk_root;
        NodesInfo nodes_info;

        /// Track last update time
        Stopwatch watch;

        NodeInfo current_node;

        explicit ClusterInfo(const String & name_, const String & zk_root_, UInt16 port, bool secure, size_t shard_id)
            : name(name_)
            , zk_root(zk_root_)
            , current_node(getFQDNOrHostName() + ":" + toString(port), secure, shard_id)
        {
        }
    };

    void initialUpdate();

    void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);

    Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
                         const String & zk_root,
                         const String & cluster_name,
                         int * version = nullptr,
                         bool set_callback = true);

    NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & node_uuids);

    ClusterPtr makeCluster(const ClusterInfo & cluster_info);

    bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes);
    bool updateCluster(ClusterInfo & cluster_info);

    bool runMainThread(std::function<void()> up_to_date_callback);
    void shutdown();

    /// cluster name -> cluster info (zk root, set of nodes)
    std::unordered_map<String, ClusterInfo> clusters_info;

    ContextMutablePtr context;

    String current_node_name;

    template <typename T> class ConcurrentFlags;
    using UpdateFlags = ConcurrentFlags<std::string>;

    /// Cluster names to update.
    /// The `shared_ptr` is used because it's passed to the watch callback.
    /// It prevents accessing an invalid object after ClusterDiscovery is destroyed.
    std::shared_ptr<UpdateFlags> clusters_to_update;

    ThreadFromGlobalPool main_thread;

    Poco::Logger * log;
};

}
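Illustration (not part of the commit): combining getShardsListPath(), registerInZk() and NodeInfo::serialize() above, each server creates an ephemeral znode named after its ServerUUID under <discovery path>/shards and keeps a child watch on that list. For the test configuration later in this diff, the ZooKeeper layout would look roughly like this (uuids and values are illustrative):

/clickhouse/discovery/test_auto_cluster/shards/
    <server-uuid-of-node1>   ->   {"version":1,"address":"node1:9000","shard_id":1}
    <server-uuid-of-node2>   ->   {"version":1,"address":"node2:9000","shard_id":0}

Note that serialize() writes only version, address and shard_id, while parse() also reads a "secure" field and defaults it to false.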
@@ -86,6 +86,7 @@
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
+#include <Interpreters/ClusterDiscovery.h>
#include <filesystem>

@@ -254,6 +255,7 @@ struct ContextSharedPart
    std::shared_ptr<Clusters> clusters;
    ConfigurationPtr clusters_config;    /// Stores updated configs
    mutable std::mutex clusters_mutex;   /// Guards clusters and clusters_config
+    std::unique_ptr<ClusterDiscovery> cluster_discovery;

    std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
    std::map<String, UInt16> server_ports;

@@ -2195,11 +2197,22 @@ std::shared_ptr<Clusters> Context::getClusters() const
    return shared->clusters;
}

+void Context::startClusterDiscovery()
+{
+    if (!shared->cluster_discovery)
+        return;
+    shared->cluster_discovery->start();
+}
+
+
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
-void Context::setClustersConfig(const ConfigurationPtr & config, const String & config_name)
+void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_discovery, const String & config_name)
{
    std::lock_guard lock(shared->clusters_mutex);
+    if (config->getBool("allow_experimental_cluster_discovery", false) && enable_discovery && !shared->cluster_discovery)
+    {
+        shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext());
+    }

    /// Do not update clusters if this part of config wasn't changed.
    if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))

@@ -2209,7 +2222,7 @@ void Context::setClustersConfig(const ConfigurationPtr & config, const String &
    shared->clusters_config = config;

    if (!shared->clusters)
-        shared->clusters = std::make_unique<Clusters>(*shared->clusters_config, settings, config_name);
+        shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, config_name);
    else
        shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);
}
@@ -750,7 +750,10 @@ public:
    std::shared_ptr<Clusters> getClusters() const;
    std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
    std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
-    void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");
+    void setClustersConfig(const ConfigurationPtr & config, bool enable_discovery = false, const String & config_name = "remote_servers");
+
+    void startClusterDiscovery();

    /// Sets custom cluster, but doesn't update configuration
    void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
    void reloadClusterConfig() const;
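Illustration (not part of the commit): together with the Server.cpp hunks near the top of this diff, the wiring is that setClustersConfig() creates the ClusterDiscovery object while the configuration is loaded (only when enable_discovery is passed, which Server.cpp sets to has_zookeeper, and allow_experimental_cluster_discovery is enabled in the config), and startClusterDiscovery() is called once after "Ready for connections." to register the node and start the background update thread. A condensed paraphrase of that call sequence, not literal Server.cpp code:

    /// during config load / reload: may create shared->cluster_discovery
    global_context->setClustersConfig(config, /* enable_discovery = */ has_zookeeper);

    /// after startup: register this node in ZooKeeper for every <discovery> cluster
    /// and start the worker thread; a no-op if discovery was never enabled
    global_context->startClusterDiscovery();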
@@ -469,7 +469,7 @@ void InterpreterSystemQuery::restoreReplica()
{
    getContext()->checkAccess(AccessType::SYSTEM_RESTORE_REPLICA, table_id);

-    const zkutil::ZooKeeperPtr& zookeeper = getContext()->getZooKeeper();
+    const zkutil::ZooKeeperPtr & zookeeper = getContext()->getZooKeeper();

    if (zookeeper->expired())
        throw Exception(ErrorCodes::NO_ZOOKEEPER,
@@ -45,7 +45,8 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co
            // get an error when trying to get the info about DB from ZK.
            // Just ignore these inaccessible databases. A good example of a
            // failing test is `01526_client_start_and_exit`.
-            try {
+            try
+            {
                writeCluster(res_columns, {name_and_database.first, replicated->getCluster()});
            }
            catch (...)
tests/integration/test_cluster_discovery/config/config.xml  (new file, 23 lines)
@@ -0,0 +1,23 @@
<clickhouse>
    <allow_experimental_cluster_discovery>1</allow_experimental_cluster_discovery>
    <remote_servers>
        <test_auto_cluster>
            <discovery>
                <path>/clickhouse/discovery/test_auto_cluster</path>
            </discovery>
        </test_auto_cluster>
        <two_shards>
            <!-- just to check that there's no conflict between automatic and manual clusters -->
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </two_shards>
    </remote_servers>
</clickhouse>
tests/integration/test_cluster_discovery/config/config_shard1.xml  (new file, 24 lines; filename inferred from test.py)
@@ -0,0 +1,24 @@
<clickhouse>
    <allow_experimental_cluster_discovery>1</allow_experimental_cluster_discovery>
    <remote_servers>
        <test_auto_cluster>
            <discovery>
                <path>/clickhouse/discovery/test_auto_cluster</path>
                <shard>1</shard>
            </discovery>
        </test_auto_cluster>
        <two_shards>
            <!-- just to check that there's no conflict between automatic and manual clusters -->
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </two_shards>
    </remote_servers>
</clickhouse>
tests/integration/test_cluster_discovery/config/config_shard3.xml  (new file, 24 lines; filename inferred from test.py)
@@ -0,0 +1,24 @@
<clickhouse>
    <allow_experimental_cluster_discovery>1</allow_experimental_cluster_discovery>
    <remote_servers>
        <test_auto_cluster>
            <discovery>
                <path>/clickhouse/discovery/test_auto_cluster</path>
                <shard>3</shard>
            </discovery>
        </test_auto_cluster>
        <two_shards>
            <!-- just to check that there's no conflict between automatic and manual clusters -->
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </two_shards>
    </remote_servers>
</clickhouse>
tests/integration/test_cluster_discovery/test.py  (new file, 81 lines)
@@ -0,0 +1,81 @@
import pytest

import functools
import time

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

shard_configs = {
    i: f'config/config_shard{i}.xml'
    for i in [1, 3]
}

nodes = [
    cluster.add_instance(
        f'node{i}',
        main_configs=[shard_configs.get(i, 'config/config.xml')],
        stay_alive=True,
        with_zookeeper=True
    ) for i in range(5)
]


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def check_on_cluster(nodes, expected, *, what, cluster_name='test_auto_cluster', msg=None, retries=5):
    """
    Select data from `system.clusters` on the specified nodes and check the result
    """
    assert 1 <= retries <= 6

    for retry in range(1, retries + 1):
        nodes_res = {
            node.name: int(node.query(f"SELECT {what} FROM system.clusters WHERE cluster = '{cluster_name}'"))
            for node in nodes
        }
        if all(actual == expected for actual in nodes_res.values()):
            break

        if retry != retries:
            time.sleep(2 ** retry)
    else:
        msg = msg or f"Wrong '{what}' result"
        raise Exception(f'{msg}: {nodes_res}, expected: {expected} (after {retries} retries)')


def test_cluster_discovery_startup_and_stop(start_cluster):
    """
    Start the cluster, check the node count in system.clusters,
    then stop/start some nodes and check that they (dis)appear in the cluster.
    """

    check_nodes_count = functools.partial(check_on_cluster, what='count()', msg='Wrong nodes count in cluster')
    check_shard_num = functools.partial(check_on_cluster, what='count(DISTINCT shard_num)', msg='Wrong shard_num count in cluster')

    total_shards = len(shard_configs) + 1
    check_nodes_count([nodes[0], nodes[2]], len(nodes))
    check_shard_num([nodes[0], nodes[2]], total_shards)

    nodes[1].stop_clickhouse(kill=True)
    check_nodes_count([nodes[0], nodes[2]], len(nodes) - 1)
    check_shard_num([nodes[0], nodes[2]], total_shards - 1)

    nodes[3].stop_clickhouse()
    check_nodes_count([nodes[0], nodes[2]], len(nodes) - 2)

    nodes[1].start_clickhouse()
    check_nodes_count([nodes[0], nodes[2]], len(nodes) - 1)

    nodes[3].start_clickhouse()
    check_nodes_count([nodes[0], nodes[2]], len(nodes))

    check_nodes_count([nodes[1], nodes[2]], 2, cluster_name='two_shards', retries=1)