Handle cluster macro in s3Cluster table function

This commit is contained in:
Vadim Volodin 2022-04-28 02:32:49 +03:00
parent dd2e085ae5
commit 233f0b4b52
8 changed files with 65 additions and 10 deletions

View File

@ -286,8 +286,9 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string)
/// Implementation of Clusters class /// Implementation of Clusters class
Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix) Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, MultiVersion<Macros>::Version macros, const String & config_prefix)
{ {
this->macros_ = macros;
updateClusters(config, settings, config_prefix); updateClusters(config, settings, config_prefix);
} }
@ -296,7 +297,8 @@ ClusterPtr Clusters::getCluster(const std::string & cluster_name) const
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = impl.find(cluster_name); auto expanded_cluster_name = macros_->expand(cluster_name);
auto it = impl.find(expanded_cluster_name);
return (it != impl.end()) ? it->second : nullptr; return (it != impl.end()) ? it->second : nullptr;
} }

View File

@ -2,6 +2,8 @@
#include <Client/ConnectionPool.h> #include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h> #include <Client/ConnectionPoolWithFailover.h>
#include <Common/Macros.h>
#include <Common/MultiVersion.h>
#include <Poco/Net/SocketAddress.h> #include <Poco/Net/SocketAddress.h>
@ -287,7 +289,7 @@ using ClusterPtr = std::shared_ptr<Cluster>;
class Clusters class Clusters
{ {
public: public:
Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix = "remote_servers"); Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, MultiVersion<Macros>::Version macros, const String & config_prefix = "remote_servers");
Clusters(const Clusters &) = delete; Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete; Clusters & operator=(const Clusters &) = delete;
@ -306,6 +308,8 @@ protected:
/// setup outside of this class, stored to prevent deleting from impl on config update /// setup outside of this class, stored to prevent deleting from impl on config update
std::unordered_set<std::string> automatic_clusters; std::unordered_set<std::string> automatic_clusters;
MultiVersion<Macros>::Version macros_;
Impl impl; Impl impl;
mutable std::mutex mutex; mutable std::mutex mutex;
}; };

View File

@ -2265,7 +2265,7 @@ void Context::reloadClusterConfig() const
} }
const auto & config = cluster_config ? *cluster_config : getConfigRef(); const auto & config = cluster_config ? *cluster_config : getConfigRef();
auto new_clusters = std::make_shared<Clusters>(config, settings); auto new_clusters = std::make_shared<Clusters>(config, settings, getMacros());
{ {
std::lock_guard lock(shared->clusters_mutex); std::lock_guard lock(shared->clusters_mutex);
@ -2287,7 +2287,7 @@ std::shared_ptr<Clusters> Context::getClusters() const
if (!shared->clusters) if (!shared->clusters)
{ {
const auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef(); const auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
shared->clusters = std::make_shared<Clusters>(config, settings); shared->clusters = std::make_shared<Clusters>(config, settings, getMacros());
} }
return shared->clusters; return shared->clusters;
@ -2318,7 +2318,7 @@ void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_dis
shared->clusters_config = config; shared->clusters_config = config;
if (!shared->clusters) if (!shared->clusters)
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, config_name); shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, getMacros(), config_name);
else else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config); shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);
} }

View File

@ -201,11 +201,10 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
if (!cluster_name.empty()) if (!cluster_name.empty())
{ {
/// Use an existing cluster from the main config /// Use an existing cluster from the main config
String cluster_name_expanded = context->getMacros()->expand(cluster_name);
if (name != "clusterAllReplicas") if (name != "clusterAllReplicas")
cluster = context->getCluster(cluster_name_expanded); cluster = context->getCluster(cluster_name);
else else
cluster = context->getCluster(cluster_name_expanded)->getClusterWithReplicasAsShards(context->getSettingsRef()); cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
} }
else else
{ {

View File

@ -21,4 +21,7 @@
</cluster_simple> </cluster_simple>
</remote_servers> </remote_servers>
<macros>
<default_cluster_macro>cluster_simple</default_cluster_macro>
</macros>
</clickhouse> </clickhouse>

View File

@ -95,6 +95,29 @@ def test_count(started_cluster):
assert TSV(pure_s3) == TSV(s3_distibuted) assert TSV(pure_s3) == TSV(s3_distibuted)
def test_count_macro(started_cluster):
node = started_cluster.instances["s0_0_0"]
s3_macro = node.query(
"""
SELECT count(*) from s3Cluster(
'{default_cluster_macro}', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
# print(s3_distibuted)
s3_distibuted = node.query(
"""
SELECT count(*) from s3Cluster(
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
# print(s3_distibuted)
assert TSV(s3_macro) == TSV(s3_distibuted)
def test_union_all(started_cluster): def test_union_all(started_cluster):
node = started_cluster.instances["s0_0_0"] node = started_cluster.instances["s0_0_0"]
pure_s3 = node.query( pure_s3 = node.query(

View File

@ -0,0 +1,5 @@
<clickhouse>
<macros>
<default_cluster_macro>test_cluster_two_shards</default_cluster_macro>
</macros>
</clickhouse>

View File

@ -2,10 +2,13 @@ import os
import pytest import pytest
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from pyhdfs import HdfsClient from pyhdfs import HdfsClient
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_hdfs=True) node1 = cluster.add_instance(
"node1", main_configs=["configs/macro.xml"], with_hdfs=True
)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@ -565,6 +568,22 @@ def test_cluster_join(started_cluster):
assert "AMBIGUOUS_COLUMN_NAME" not in result assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_cluster_macro(started_cluster):
with_macro = node1.query(
"""
SELECT id FROM hdfsCluster('{default_cluster_macro}', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32')
"""
)
no_macro = node1.query(
"""
SELECT id FROM hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32')
"""
)
assert TSV(with_macro) == TSV(no_macro)
def test_virtual_columns_2(started_cluster): def test_virtual_columns_2(started_cluster):
hdfs_api = started_cluster.hdfs_api hdfs_api = started_cluster.hdfs_api