Merge pull request #21685 from Avogar/reload-cluster-config

Update clusters only if their configs were updated
This commit is contained in:
Anton Popov 2021-03-15 20:09:58 +03:00 committed by GitHub
commit 840417c957
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 314 additions and 8 deletions

View File

@ -26,6 +26,10 @@ bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const P
return isSameConfiguration(left, String(), right, String());
}
/// Compares the subtree located at `key` in `left` with the subtree located at the same `key` in `right`
/// by forwarding to the overload that accepts a separate key for each side.
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const Poco::Util::AbstractConfiguration & right, const String & key)
{
    /// Both sides are inspected under one and the same key.
    const String & shared_key = key;
    return isSameConfiguration(left, shared_key, right, shared_key);
}
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const String & left_key,
const Poco::Util::AbstractConfiguration & right, const String & right_key)

View File

@ -13,6 +13,11 @@ namespace DB
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left,
const Poco::Util::AbstractConfiguration & right);
/// Returns true if the specified subview of the two configurations contains the same keys and values.
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left,
const Poco::Util::AbstractConfiguration & right,
const String & key);
/// Returns true if the specified subviews of the two configurations contain the same keys and values.
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const String & left_key,
const Poco::Util::AbstractConfiguration & right, const String & right_key);

View File

@ -5,6 +5,7 @@
#include <Common/isLocalAddress.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/parseAddress.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Core/Settings.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -265,20 +266,45 @@ void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Clu
}
/// Refreshes the cluster map from the keys listed under config_prefix in the new configuration.
/// If old_config is passed, clusters whose keys are missing from the new configuration are erased and
/// a cluster is re-created only when its subtree (config_prefix + "." + key) differs between the two
/// configurations; without old_config the map is cleared and every cluster is created again.
/// Throws SYNTAX_ERROR for a cluster name that contains a dot.
void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix)
void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & new_config, const Settings & settings, const String & config_prefix, Poco::Util::AbstractConfiguration * old_config)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_prefix, config_keys);
Poco::Util::AbstractConfiguration::Keys new_config_keys;
new_config.keys(config_prefix, new_config_keys);
/// If old config is set, we will update only clusters with updated config.
/// In this case, we first need to find clusters that were deleted from config.
Poco::Util::AbstractConfiguration::Keys deleted_keys;
if (old_config)
{
/// std::set_difference requires both input ranges to be sorted.
std::sort(new_config_keys.begin(), new_config_keys.end());
Poco::Util::AbstractConfiguration::Keys old_config_keys;
old_config->keys(config_prefix, old_config_keys);
std::sort(old_config_keys.begin(), old_config_keys.end());
std::set_difference(
old_config_keys.begin(), old_config_keys.end(), new_config_keys.begin(), new_config_keys.end(), std::back_inserter(deleted_keys));
}
/// The key lists are computed above; the mutex is taken only for the modification of impl.
std::lock_guard lock(mutex);
impl.clear();
for (const auto & key : config_keys)
/// If old config is set, remove deleted clusters from impl, otherwise just clear it.
if (old_config)
{
for (const auto & key : deleted_keys)
impl.erase(key);
}
else
impl.clear();
for (const auto & key : new_config_keys)
{
if (key.find('.') != String::npos)
throw Exception("Cluster names with dots are not supported: '" + key + "'", ErrorCodes::SYNTAX_ERROR);
impl.emplace(key, std::make_shared<Cluster>(config, settings, config_prefix, key));
/// If old config is set and cluster config wasn't changed, don't update this cluster.
/// Assignment through operator[] replaces an existing entry for this key or inserts a new one.
if (!old_config || !isSameConfiguration(new_config, *old_config, config_prefix + "." + key))
impl[key] = std::make_shared<Cluster>(new_config, settings, config_prefix, key);
}
}

View File

@ -276,7 +276,7 @@ public:
ClusterPtr getCluster(const std::string & cluster_name) const;
void setCluster(const String & cluster_name, const ClusterPtr & cluster);
void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix);
void updateClusters(const Poco::Util::AbstractConfiguration & new_config, const Settings & settings, const String & config_prefix, Poco::Util::AbstractConfiguration * old_config = nullptr);
public:
using Impl = std::map<String, ClusterPtr>;

View File

@ -60,6 +60,7 @@
#include <Parsers/parseQuery.h>
#include <Common/StackTrace.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <Common/TraceCollector.h>
@ -1833,12 +1834,17 @@ void Context::setClustersConfig(const ConfigurationPtr & config, const String &
{
std::lock_guard lock(shared->clusters_mutex);
/// Do not update clusters if this part of config wasn't changed.
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
return;
auto old_clusters_config = shared->clusters_config;
shared->clusters_config = config;
if (!shared->clusters)
shared->clusters = std::make_unique<Clusters>(*shared->clusters_config, settings, config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name);
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);
}

View File

@ -0,0 +1,30 @@
<!-- Defines two clusters, test_cluster and test_cluster2. Each has a single shard with
     internal_replication enabled and two replicas: node_1 and node_2, both on port 9000. -->
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster2>
</remote_servers>
</yandex>

View File

@ -0,0 +1,235 @@
import os
import sys
import time
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
# `node` is the instance that the tests query and whose remote_servers config they replace;
# it is started with configs/remote_servers.xml as a main config.
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', with_zookeeper=True, main_configs=['configs/remote_servers.xml'])
# node_1 and node_2 carry the host names used as replicas in the remote_servers configs of this test.
node_1 = cluster.add_instance('node_1', with_zookeeper=True)
node_2 = cluster.add_instance('node_2', with_zookeeper=True)
# Module-scoped fixture: starts the cluster, creates one Distributed table on `node` for each of
# test_cluster and test_cluster2, pauses the node_1 and node_2 containers and yields the cluster.
# The cluster is shut down in the `finally` branch, so shutdown also runs when setup raises.
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node.query('''CREATE TABLE distributed (id UInt32) ENGINE =
Distributed('test_cluster', 'default', 'replicated')''')
node.query('''CREATE TABLE distributed2 (id UInt32) ENGINE =
Distributed('test_cluster2', 'default', 'replicated')''')
# NOTE(review): presumably the replicas are paused so that queries through the Distributed
# tables fail and raise errors_count in system.clusters - confirm.
cluster.pause_container('node_1')
cluster.pause_container('node_2')
yield cluster
finally:
cluster.shutdown()
# Baseline layout: test_cluster and test_cluster2, each with one shard holding replicas node_1 and
# node_2. The tests that modify the config set this one again as their last step.
base_config = '''
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster2>
</remote_servers>
</yandex>
'''
# Differs from base_config only in test_cluster: its shard lists the single replica node_1
# (node_2 is dropped). test_cluster2 is identical to the baseline.
test_config1 = '''
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster2>
</remote_servers>
</yandex>
'''
# Differs from base_config by omitting test_cluster2 entirely; test_cluster is identical to the baseline.
test_config2 = '''
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>
'''
# Keeps test_cluster and test_cluster2 identical to base_config and adds test_cluster3,
# which has one shard with the single replica node_1.
test_config3 = '''
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster2>
<test_cluster3>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster3>
</remote_servers>
</yandex>
'''
def send_repeated_query(table, count=5):
    """Send the same count() query against `table` through node.query_and_get_error, `count` times."""
    statement = "SELECT count() FROM {} SETTINGS receive_timeout=1".format(table)
    for _ in range(count):
        node.query_and_get_error(statement)
def get_errors_count(cluster, host_name="node_1"):
    """Read errors_count from system.clusters on `node` for the given cluster and host and return it as int."""
    statement = "SELECT errors_count FROM system.clusters WHERE cluster='{}' and host_name='{}'".format(cluster, host_name)
    raw_value = node.query(statement)
    return int(raw_value)
def set_config(config):
    """Replace the remote_servers config file on `node` with `config`, then issue SYSTEM RELOAD CONFIG."""
    remote_servers_path = "/etc/clickhouse-server/config.d/remote_servers.xml"
    node.replace_config(remote_servers_path, config)
    node.query("SYSTEM RELOAD CONFIG")
def test_simple_reload(started_cluster):
    """errors_count of test_cluster must stay positive after a reload that follows no config replacement."""
    send_repeated_query("distributed")
    errors_before_reload = get_errors_count("test_cluster")
    assert errors_before_reload > 0

    node.query("SYSTEM RELOAD CONFIG")

    errors_after_reload = get_errors_count("test_cluster")
    assert errors_after_reload > 0
def test_update_one_cluster(started_cluster):
    """After switching to test_config1, errors_count must be 0 for test_cluster and still positive for test_cluster2."""
    send_repeated_query("distributed")
    send_repeated_query("distributed2")

    assert get_errors_count("test_cluster") > 0
    assert get_errors_count("test_cluster2") > 0

    try:
        set_config(test_config1)

        assert get_errors_count("test_cluster") == 0
        assert get_errors_count("test_cluster2") > 0
    finally:
        # Restore the baseline even when an assertion above fails, so that the remaining
        # tests of this module do not start from a modified config.
        set_config(base_config)
def test_delete_cluster(started_cluster):
    """After switching to test_config2, test_cluster keeps a positive errors_count and test_cluster2 has no rows in system.clusters."""
    send_repeated_query("distributed")
    send_repeated_query("distributed2")

    assert get_errors_count("test_cluster") > 0
    assert get_errors_count("test_cluster2") > 0

    try:
        set_config(test_config2)

        assert get_errors_count("test_cluster") > 0

        result = node.query("SELECT * FROM system.clusters WHERE cluster='test_cluster2'")
        assert result == ''
    finally:
        # Restore the baseline even when an assertion above fails, so that the remaining
        # tests of this module do not start from a modified config.
        set_config(base_config)
def test_add_cluster(started_cluster):
    """After switching to test_config3, both existing clusters keep a positive errors_count and test_cluster3 has rows in system.clusters."""
    send_repeated_query("distributed")
    send_repeated_query("distributed2")

    assert get_errors_count("test_cluster") > 0
    assert get_errors_count("test_cluster2") > 0

    try:
        set_config(test_config3)

        assert get_errors_count("test_cluster") > 0
        assert get_errors_count("test_cluster2") > 0

        result = node.query("SELECT * FROM system.clusters WHERE cluster='test_cluster3'")
        assert result != ''
    finally:
        # Restore the baseline even when an assertion above fails, so that the remaining
        # tests of this module do not start from a modified config.
        set_config(base_config)