cluster discovery wip

This commit is contained in:
vdimir 2021-11-11 12:03:53 +03:00
parent 11ad2b0fca
commit 94bb7cba62
No known key found for this signature in database
GPG Key ID: 9B404D301C0CC7EB
8 changed files with 166 additions and 2 deletions

View File

@ -1555,6 +1555,8 @@ if (ThreadFuzzer::instance().isEffective())
server.start();
LOG_INFO(log, "Ready for connections.");
global_context->registerNodeForClusterDiscovery();
SCOPE_EXIT_SAFE({
LOG_DEBUG(log, "Received termination signal.");
LOG_DEBUG(log, "Waiting for current connections to close.");

View File

@ -489,6 +489,10 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
internal_replication
});
}
else if (startsWith(key, "discovery"))
{
continue;
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

View File

@ -295,7 +295,6 @@ public:
void updateClusters(const Poco::Util::AbstractConfiguration & new_config, const Settings & settings, const String & config_prefix, Poco::Util::AbstractConfiguration * old_config = nullptr);
public:
using Impl = std::map<String, ClusterPtr>;
Impl getContainer() const;

View File

@ -0,0 +1,103 @@
#include <Common/DNSResolver.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/Types.h>
#include "base/logger_useful.h"
#include <Core/ServerUUID.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/ClusterDiscovery.h>
#include <Interpreters/Context.h>
#include <fmt/format.h>
#include <cstddef>
#include <vector>
namespace DB
{
ClusterDiscovery::ClusterDiscovery(
const Poco::Util::AbstractConfiguration & config,
ContextMutablePtr context_,
const String & config_prefix)
: context(context_)
, node_name(toString(ServerUUID::get()))
, server_port(context->getTCPPort())
, log(&Poco::Logger::get("ClusterDiscovery"))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_prefix, config_keys);
for (const auto & key : config_keys)
{
String path = config.getString(config_prefix + "." + key + ".path");
trimRight(path, '/');
clusters[key] = path;
}
}
Strings ClusterDiscovery::getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes)
{
Strings result;
result.reserve(nodes.size());
for (const auto & node : nodes)
{
/// TODO (vdimir@): use batch request?
if (bool ok = zk->tryGet(zk_root + "/" + node, result.emplace_back()); !ok)
{
result.pop_back();
LOG_WARNING(log, "Cluster configuration was changed during update, skip nonexisting node");
}
}
return result;
}
void ClusterDiscovery::updateCluster(const String & cluster_name, const String & zk_root)
{
auto zk = context->getZooKeeper();
auto watch_callback = [this, cluster_name, zk_root](const Coordination::WatchResponse &)
{
this->updateCluster(cluster_name, zk_root);
};
const auto nodes = zk->getChildrenWatch(zk_root, nullptr, watch_callback);
Strings replicas = getNodes(zk, zk_root, nodes);
if (replicas.empty())
return;
std::vector<std::vector<String>> shards = {replicas};
bool secure = false;
auto maybe_secure_port = context->getTCPPortSecure();
auto cluster = std::make_shared<Cluster>(
context->getSettings(),
shards,
context->getUserName(),
"",
(secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort()),
false /* treat_local_as_remote */,
context->getApplicationType() == Context::ApplicationType::LOCAL /* treat_local_port_as_remote */,
secure);
context->setCluster(cluster_name, cluster);
}
void ClusterDiscovery::start()
{
auto zk = context->getZooKeeper();
for (const auto & [cluster_name, zk_root] : clusters)
{
String node_path = zk_root + "/" + node_name;
zk->createAncestors(node_path);
String info = DNSResolver::instance().getHostName() + ":" + toString(server_port);
zk->createOrUpdate(node_path, info, zkutil::CreateMode::Ephemeral);
LOG_DEBUG(log, "Current node {} registered in cluster {}", node_name, cluster_name);
this->updateCluster(cluster_name, zk_root);
}
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <unordered_map>
#include <base/defines.h>
#include <Poco/Logger.h>
#include <Common/ZooKeeper/Common.h>
namespace DB
{
class ClusterDiscovery
{
public:
ClusterDiscovery(
const Poco::Util::AbstractConfiguration & config,
ContextMutablePtr context_,
const String & config_prefix = "remote_servers_discovery");
void start();
private:
Strings getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & nodes);
void updateCluster(const String & cluster_name, const String & zk_root);
/// cluster name -> path in zk
std::unordered_map<String, String> clusters;
ContextMutablePtr context;
String node_name;
UInt16 server_port;
Poco::Logger * log;
};
}

View File

@ -86,6 +86,7 @@
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
#include <filesystem>
@ -254,6 +255,7 @@ struct ContextSharedPart
std::shared_ptr<Clusters> clusters;
ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
std::unique_ptr<ClusterDiscovery> cluster_discovery;
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
std::map<String, UInt16> server_ports;
@ -2195,12 +2197,24 @@ std::shared_ptr<Clusters> Context::getClusters() const
return shared->clusters;
}
void Context::registerNodeForClusterDiscovery()
{
if (!shared->cluster_discovery)
return;
shared->cluster_discovery->start();
}
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
void Context::setClustersConfig(const ConfigurationPtr & config, const String & config_name)
{
std::lock_guard lock(shared->clusters_mutex);
if (!shared->cluster_discovery)
{
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, shared_from_this());
}
/// Do not update clusters if this part of config wasn't changed.
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
return;

View File

@ -751,6 +751,9 @@ public:
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");
void registerNodeForClusterDiscovery();
/// Sets custom cluster, but doesn't update configuration
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
void reloadClusterConfig() const;

View File

@ -469,7 +469,7 @@ void InterpreterSystemQuery::restoreReplica()
{
getContext()->checkAccess(AccessType::SYSTEM_RESTORE_REPLICA, table_id);
const zkutil::ZooKeeperPtr& zookeeper = getContext()->getZooKeeper();
const zkutil::ZooKeeperPtr & zookeeper = getContext()->getZooKeeper();
if (zookeeper->expired())
throw Exception(ErrorCodes::NO_ZOOKEEPER,