From f51412a2a2fcef4cc479ca827304e38ffb33fb01 Mon Sep 17 00:00:00 2001 From: Kiran Date: Wed, 1 Jan 2020 13:09:10 +0530 Subject: [PATCH 01/12] added table function for clusterAll replicas --- .../TableFunctions/TableFunctionRemote.cpp | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/dbms/src/TableFunctions/TableFunctionRemote.cpp b/dbms/src/TableFunctions/TableFunctionRemote.cpp index 87c8989cbe2..0242942e618 100644 --- a/dbms/src/TableFunctions/TableFunctionRemote.cpp +++ b/dbms/src/TableFunctions/TableFunctionRemote.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include "registerTableFunctions.h" @@ -137,11 +138,30 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C setIdentifierSpecial(ast); ClusterPtr cluster; - if (!cluster_name.empty()) + if (!cluster_name.empty() && name == "cluster") { /// Use an existing cluster from the main config cluster = context.getCluster(cluster_name); } + else if(!cluster_name.empty() && name == "clusterAll") { + std::vector> clusterNodes; + cluster = context.getCluster(cluster_name); + // creating a new topology for clusterAll + const auto & addresses_with_failovers = cluster->getShardsAddresses(); + const auto & shards_info = cluster->getShardsInfo(); + auto maybe_secure_port = context.getTCPPortSecure(); + + for (size_t shard_index : ext::range(0, shards_info.size())) + { + const auto & replicas = addresses_with_failovers[shard_index]; + for (size_t replica_index : ext::range(0, replicas.size())) + { + std::vector newNode={replicas[replica_index].host_name}; + clusterNodes.push_back(newNode); + } + } + cluster = std::make_shared(context.getSettings(), clusterNodes, username, password, (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), false, secure); + } else { /// Create new cluster from the scratch @@ -198,7 +218,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_) : name{name_}, secure{secure_} { - is_cluster_function = name == "cluster"; + is_cluster_function = (name == "cluster" || name == "clusterAll"); std::stringstream ss; ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters" @@ -213,6 +233,7 @@ void registerTableFunctionRemote(TableFunctionFactory & factory) factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared("remote"); }); factory.registerFunction("remoteSecure", [] () -> TableFunctionPtr { return std::make_shared("remote", /* secure = */ true); }); factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared("cluster"); }); + factory.registerFunction("clusterAll", [] () -> TableFunctionPtr { return std::make_shared("clusterAll"); }); } } From 8274964b7d5f4735b0c859d00a81bbf87ac3ab22 Mon Sep 17 00:00:00 2001 From: Kiran Date: Wed, 1 Jan 2020 13:48:30 +0530 Subject: [PATCH 02/12] formatted code with clang and updated tableFunction name --- .../TableFunctions/TableFunctionRemote.cpp | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/dbms/src/TableFunctions/TableFunctionRemote.cpp b/dbms/src/TableFunctions/TableFunctionRemote.cpp index 0242942e618..e78fcf44880 100644 --- a/dbms/src/TableFunctions/TableFunctionRemote.cpp +++ b/dbms/src/TableFunctions/TableFunctionRemote.cpp @@ -143,10 +143,11 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C /// Use an existing cluster from the main config cluster = context.getCluster(cluster_name); } - else if(!cluster_name.empty() && name == "clusterAll") { + else if (!cluster_name.empty() && name == "clusterAllReplicas") + { std::vector> clusterNodes; cluster = context.getCluster(cluster_name); - // creating a new topology for clusterAll + // creating a new topology for clusterAllReplicas const auto & addresses_with_failovers = cluster->getShardsAddresses(); const auto & shards_info = cluster->getShardsInfo(); auto maybe_secure_port = context.getTCPPortSecure(); @@ -156,11 +157,18 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C const auto & replicas = addresses_with_failovers[shard_index]; for (size_t replica_index : ext::range(0, replicas.size())) { - std::vector newNode={replicas[replica_index].host_name}; + std::vector newNode = {replicas[replica_index].host_name}; clusterNodes.push_back(newNode); } } - cluster = std::make_shared(context.getSettings(), clusterNodes, username, password, (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), false, secure); + cluster = std::make_shared( + context.getSettings(), + clusterNodes, + username, + password, + (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), + false, + secure); } else { @@ -218,7 +226,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_) : name{name_}, secure{secure_} { - is_cluster_function = (name == "cluster" || name == "clusterAll"); + is_cluster_function = (name == "cluster" || name == "clusterAllReplicas"); std::stringstream ss; ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters" @@ -233,7 +241,7 @@ void registerTableFunctionRemote(TableFunctionFactory & factory) factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared("remote"); }); factory.registerFunction("remoteSecure", [] () -> TableFunctionPtr { return std::make_shared("remote", /* secure = */ true); }); factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared("cluster"); }); - factory.registerFunction("clusterAll", [] () -> TableFunctionPtr { return std::make_shared("clusterAll"); }); + factory.registerFunction("clusterAllReplicas", [] () -> TableFunctionPtr { return std::make_shared("clusterAllReplicas"); }); } } From ebb1864522bf39d9455485be9242f1abfd2a6d83 Mon Sep 17 00:00:00 2001 From: Kiran Date: Thu, 2 Jan 2020 19:38:54 +0530 Subject: [PATCH 03/12] corrected control flow for clusterAllReplicas-TableFunction --- dbms/src/TableFunctions/TableFunctionRemote.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/TableFunctions/TableFunctionRemote.cpp b/dbms/src/TableFunctions/TableFunctionRemote.cpp index e78fcf44880..d98da781f0e 100644 --- a/dbms/src/TableFunctions/TableFunctionRemote.cpp +++ b/dbms/src/TableFunctions/TableFunctionRemote.cpp @@ -138,7 +138,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C setIdentifierSpecial(ast); ClusterPtr cluster; - if (!cluster_name.empty() && name == "cluster") + if (!cluster_name.empty() && name != "clusterAllReplicas") { /// Use an existing cluster from the main config cluster = context.getCluster(cluster_name); From ce55b0ffb5055f5381d28efab4d9f3391f8564f3 Mon Sep 17 00:00:00 2001 From: Kiran Date: Tue, 7 Jan 2020 15:56:16 +0530 Subject: [PATCH 04/12] addded new constructor in cluster class for creating a new cluster from replicas --- dbms/src/Interpreters/Cluster.cpp | 46 +++++++++++++++++++++++++++++++ dbms/src/Interpreters/Cluster.h | 8 +++++- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index 2c75bd821fe..e645a8553d4 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB { @@ -449,6 +450,10 @@ void Cluster::initMisc() } } +std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings) const +{ + return std::unique_ptr{ new Cluster(settings, *this)}; +} std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const { @@ -460,6 +465,47 @@ std::unique_ptr Cluster::getClusterWithMultipleShards(const std::vector return std::unique_ptr{ new Cluster(*this, indices) }; } +Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{}, addresses_with_failover{} +{ + if (!from.addresses_with_failover.empty()) + { + for (size_t shard_index : ext::range(0, from.shards_info.size())) + { + const auto & replicas = from.addresses_with_failover[shard_index]; + for (size_t replica_index : ext::range(0, replicas.size())) + { + ShardInfo info; + Address address; + address = replicas[replica_index]; + if (address.is_local) + info.local_addresses.push_back(replicas[replica_index]); + + ConnectionPoolPtr pool = std::make_shared( + settings.distributed_connections_pool_size, + address.host_name, + address.port, + address.default_database, + address.user, + address.password, + "server", + address.compression, + address.secure); + + info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); + info.per_replica_pools = {std::move(pool)}; + std ::vector newAddress = {address}; + addresses_with_failover.emplace_back(newAddress); + shards_info.emplace_back(std::move(info)); + } + } + } + else + { + throw Exception("There must be either 'node' or 'shard' elements in the cluster", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + } + initMisc(); +} + Cluster::Cluster(const Cluster & from, const std::vector & indices) : shards_info{} { diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h index e778c9bcf6f..335f3475580 100644 --- a/dbms/src/Interpreters/Cluster.h +++ b/dbms/src/Interpreters/Cluster.h @@ -26,9 +26,12 @@ public: const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false); - Cluster(const Cluster &) = delete; + Cluster(const Settings & settings, const Cluster &); + + Cluster(const Cluster &)= delete; Cluster & operator=(const Cluster &) = delete; + /// is used to set a limit on the size of the timeout static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit); @@ -148,6 +151,9 @@ public: /// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster. std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const; + /// Get a new Cluster From the existing cluster + std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings) const; + private: using SlotToShard = std::vector; SlotToShard slot_to_shard; From 1ac7ed5abc42c2779a2b1a5b70d7c09356e92aab Mon Sep 17 00:00:00 2001 From: Kiran Date: Tue, 7 Jan 2020 15:56:53 +0530 Subject: [PATCH 05/12] altered flow for clusterAllreplicas table function --- .../TableFunctions/TableFunctionRemote.cpp | 47 +++++++------------ 1 file changed, 16 insertions(+), 31 deletions(-) diff --git a/dbms/src/TableFunctions/TableFunctionRemote.cpp b/dbms/src/TableFunctions/TableFunctionRemote.cpp index d98da781f0e..033839009bb 100644 --- a/dbms/src/TableFunctions/TableFunctionRemote.cpp +++ b/dbms/src/TableFunctions/TableFunctionRemote.cpp @@ -138,37 +138,13 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C setIdentifierSpecial(ast); ClusterPtr cluster; - if (!cluster_name.empty() && name != "clusterAllReplicas") + if (!cluster_name.empty()) { /// Use an existing cluster from the main config - cluster = context.getCluster(cluster_name); - } - else if (!cluster_name.empty() && name == "clusterAllReplicas") - { - std::vector> clusterNodes; - cluster = context.getCluster(cluster_name); - // creating a new topology for clusterAllReplicas - const auto & addresses_with_failovers = cluster->getShardsAddresses(); - const auto & shards_info = cluster->getShardsInfo(); - auto maybe_secure_port = context.getTCPPortSecure(); - - for (size_t shard_index : ext::range(0, shards_info.size())) - { - const auto & replicas = addresses_with_failovers[shard_index]; - for (size_t replica_index : ext::range(0, replicas.size())) - { - std::vector newNode = {replicas[replica_index].host_name}; - clusterNodes.push_back(newNode); - } - } - cluster = std::make_shared( - context.getSettings(), - clusterNodes, - username, - password, - (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), - false, - secure); + if (name != "clusterAllReplicas") + cluster = context.getCluster(cluster_name); + else + cluster = context.getCluster(cluster_name)->getClusterWithReplicasAsShards(context.getSettings()); } else { @@ -192,13 +168,22 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C { size_t colon = host.find(':'); if (colon == String::npos) - context.getRemoteHostFilter().checkHostAndPort(host, toString((secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()))); + context.getRemoteHostFilter().checkHostAndPort( + host, + toString((secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()))); else context.getRemoteHostFilter().checkHostAndPort(host.substr(0, colon), host.substr(colon + 1)); } } - cluster = std::make_shared(context.getSettings(), names, username, password, (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), false, secure); + cluster = std::make_shared( + context.getSettings(), + names, + username, + password, + (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), + false, + secure); } auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr); From c87f74c6374ee40df695ed4fa3c035afe6809ee8 Mon Sep 17 00:00:00 2001 From: Kiran Date: Tue, 7 Jan 2020 15:57:24 +0530 Subject: [PATCH 06/12] added new integration test for clusterAllreplicas tabe function --- .../test_cluster_all_replicas/__init__.py | 0 .../configs/remote_servers.xml | 16 ++++++++++++++ .../test_cluster_all_replicas/test.py | 21 +++++++++++++++++++ 3 files changed, 37 insertions(+) create mode 100644 dbms/tests/integration/test_cluster_all_replicas/__init__.py create mode 100644 dbms/tests/integration/test_cluster_all_replicas/configs/remote_servers.xml create mode 100644 dbms/tests/integration/test_cluster_all_replicas/test.py diff --git a/dbms/tests/integration/test_cluster_all_replicas/__init__.py b/dbms/tests/integration/test_cluster_all_replicas/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_cluster_all_replicas/configs/remote_servers.xml b/dbms/tests/integration/test_cluster_all_replicas/configs/remote_servers.xml new file mode 100644 index 00000000000..68dcfcc1460 --- /dev/null +++ b/dbms/tests/integration/test_cluster_all_replicas/configs/remote_servers.xml @@ -0,0 +1,16 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + + + diff --git a/dbms/tests/integration/test_cluster_all_replicas/test.py b/dbms/tests/integration/test_cluster_all_replicas/test.py new file mode 100644 index 00000000000..0af5693fc75 --- /dev/null +++ b/dbms/tests/integration/test_cluster_all_replicas/test.py @@ -0,0 +1,21 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True) +node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True) + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_remote(start_cluster): + assert node1.query('''SELECT hostName() FROM clusterAllReplicas("two_shards", system.one)''') == 'node1\nnode2\n' + assert node1.query('''SELECT hostName() FROM cluster("two_shards", system.one)''') == 'node1\n' From 9c3e80a77481bccc6dd270ddc53a1490f1a9157c Mon Sep 17 00:00:00 2001 From: Kiran Date: Tue, 7 Jan 2020 23:49:39 +0530 Subject: [PATCH 07/12] removed copy constructor and moved to private methid --- dbms/src/Interpreters/Cluster.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h index 335f3475580..8b95b1ce986 100644 --- a/dbms/src/Interpreters/Cluster.h +++ b/dbms/src/Interpreters/Cluster.h @@ -26,12 +26,9 @@ public: const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false); - Cluster(const Settings & settings, const Cluster &); - Cluster(const Cluster &)= delete; Cluster & operator=(const Cluster &) = delete; - /// is used to set a limit on the size of the timeout static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit); @@ -167,6 +164,9 @@ private: /// For getClusterWithMultipleShards implementation. Cluster(const Cluster & from, const std::vector & indices); + /// For getClusterWithReplicasAsShards implementation + Cluster(const Settings & settings, const Cluster &); + String hash_of_addresses; /// Description of the cluster shards. ShardsInfo shards_info; From 42cb2ed81b3a807db5ec9b34dc8901cf8123308c Mon Sep 17 00:00:00 2001 From: Kiran Date: Tue, 7 Jan 2020 23:50:12 +0530 Subject: [PATCH 08/12] handled flattening nodes in circular topology --- dbms/src/Interpreters/Cluster.cpp | 50 ++++++++++++++++++------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index e645a8553d4..bc5f453e02b 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -467,6 +467,7 @@ std::unique_ptr Cluster::getClusterWithMultipleShards(const std::vector Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{}, addresses_with_failover{} { + std::set> hosts; if (!from.addresses_with_failover.empty()) { for (size_t shard_index : ext::range(0, from.shards_info.size())) @@ -475,27 +476,36 @@ Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{ for (size_t replica_index : ext::range(0, replicas.size())) { ShardInfo info; - Address address; - address = replicas[replica_index]; - if (address.is_local) - info.local_addresses.push_back(replicas[replica_index]); + Address address = replicas[replica_index]; + auto position = find_if(hosts.begin(), hosts.end(), [=](auto item) { + return std::get<0>(item) == address.host_name && std::get<1>(item) == address.port; + }); + if (position == hosts.end()) + { + if (address.is_local) + info.local_addresses.push_back(replicas[replica_index]); + hosts.insert(std::tuple (address.host_name, address.port)); + ConnectionPoolPtr pool = std::make_shared( + settings.distributed_connections_pool_size, + address.host_name, + address.port, + address.default_database, + address.user, + address.password, + "server", + address.compression, + address.secure); - ConnectionPoolPtr pool = std::make_shared( - settings.distributed_connections_pool_size, - address.host_name, - address.port, - address.default_database, - address.user, - address.password, - "server", - address.compression, - address.secure); - - info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); - info.per_replica_pools = {std::move(pool)}; - std ::vector newAddress = {address}; - addresses_with_failover.emplace_back(newAddress); - shards_info.emplace_back(std::move(info)); + info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); + info.per_replica_pools = {std::move(pool)}; + std ::vector newAddress = {address}; + addresses_with_failover.emplace_back(newAddress); + shards_info.emplace_back(std::move(info)); + } + else + { + continue; + } } } } From 4945d9f7974c808bbd53b7144ee941c4f7096011 Mon Sep 17 00:00:00 2001 From: Kiran Date: Fri, 10 Jan 2020 08:11:58 +0530 Subject: [PATCH 09/12] Formatted code with clang --- dbms/src/Interpreters/Cluster.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index bc5f453e02b..5089da23217 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -484,7 +484,7 @@ Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{ { if (address.is_local) info.local_addresses.push_back(replicas[replica_index]); - hosts.insert(std::tuple (address.host_name, address.port)); + hosts.insert(std::tuple(address.host_name, address.port)); ConnectionPoolPtr pool = std::make_shared( settings.distributed_connections_pool_size, address.host_name, From f0f870b27fe43a9a987e4c32e563f4947a02bed4 Mon Sep 17 00:00:00 2001 From: Kiran Date: Fri, 10 Jan 2020 10:38:54 +0530 Subject: [PATCH 10/12] proper style format added in cluster.cpp --- dbms/src/Interpreters/Cluster.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index 5089da23217..f0a025dcb4c 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -477,9 +477,11 @@ Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{ { ShardInfo info; Address address = replicas[replica_index]; - auto position = find_if(hosts.begin(), hosts.end(), [=](auto item) { - return std::get<0>(item) == address.host_name && std::get<1>(item) == address.port; - }); + auto position = find_if(hosts.begin(), hosts.end(), + [=](auto item) { + return std::get<0>(item) == address.host_name && + std::get<1>(item) == address.port; + }); if (position == hosts.end()) { if (address.is_local) From 2e433b6459bd31853523515e77d1a366aa585fb7 Mon Sep 17 00:00:00 2001 From: Kiran Date: Fri, 10 Jan 2020 12:58:16 +0530 Subject: [PATCH 11/12] cluster.cpp file formatted --- dbms/src/Interpreters/Cluster.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index f0a025dcb4c..aaa6b31ab36 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -477,11 +477,7 @@ Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{ { ShardInfo info; Address address = replicas[replica_index]; - auto position = find_if(hosts.begin(), hosts.end(), - [=](auto item) { - return std::get<0>(item) == address.host_name && - std::get<1>(item) == address.port; - }); + auto position = find_if(hosts.begin(), hosts.end(), [=](auto item) {return std::get<0>(item) == address.host_name && std::get<1>(item) == address.port;}); if (position == hosts.end()) { if (address.is_local) From b4ccddbb9669f124d48075a653f6cfcc7c22a798 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 10 Jan 2020 20:44:34 +0300 Subject: [PATCH 12/12] Make the code less wrong #8493 --- dbms/src/Interpreters/Cluster.cpp | 85 ++++++++++++++----------------- dbms/src/Interpreters/Cluster.h | 8 +-- 2 files changed, 44 insertions(+), 49 deletions(-) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index aaa6b31ab36..71bd89b2b6f 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -452,69 +452,62 @@ void Cluster::initMisc() std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings) const { - return std::unique_ptr{ new Cluster(settings, *this)}; + return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings)}; } std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const { - return std::unique_ptr{ new Cluster(*this, {index}) }; + return std::unique_ptr{ new Cluster(SubclusterTag{}, *this, {index}) }; } std::unique_ptr Cluster::getClusterWithMultipleShards(const std::vector & indices) const { - return std::unique_ptr{ new Cluster(*this, indices) }; + return std::unique_ptr{ new Cluster(SubclusterTag{}, *this, indices) }; } -Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{}, addresses_with_failover{} +Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings) + : shards_info{}, addresses_with_failover{} { - std::set> hosts; - if (!from.addresses_with_failover.empty()) - { - for (size_t shard_index : ext::range(0, from.shards_info.size())) - { - const auto & replicas = from.addresses_with_failover[shard_index]; - for (size_t replica_index : ext::range(0, replicas.size())) - { - ShardInfo info; - Address address = replicas[replica_index]; - auto position = find_if(hosts.begin(), hosts.end(), [=](auto item) {return std::get<0>(item) == address.host_name && std::get<1>(item) == address.port;}); - if (position == hosts.end()) - { - if (address.is_local) - info.local_addresses.push_back(replicas[replica_index]); - hosts.insert(std::tuple(address.host_name, address.port)); - ConnectionPoolPtr pool = std::make_shared( - settings.distributed_connections_pool_size, - address.host_name, - address.port, - address.default_database, - address.user, - address.password, - "server", - address.compression, - address.secure); + if (from.addresses_with_failover.empty()) + throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR); - info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); - info.per_replica_pools = {std::move(pool)}; - std ::vector newAddress = {address}; - addresses_with_failover.emplace_back(newAddress); - shards_info.emplace_back(std::move(info)); - } - else - { - continue; - } - } + std::set> unique_hosts; + for (size_t shard_index : ext::range(0, from.shards_info.size())) + { + const auto & replicas = from.addresses_with_failover[shard_index]; + for (const auto & address : replicas) + { + if (!unique_hosts.emplace(address.host_name, address.port).second) + continue; /// Duplicate host, skip. + + ShardInfo info; + if (address.is_local) + info.local_addresses.push_back(address); + + ConnectionPoolPtr pool = std::make_shared( + settings.distributed_connections_pool_size, + address.host_name, + address.port, + address.default_database, + address.user, + address.password, + "server", + address.compression, + address.secure); + + info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); + info.per_replica_pools = {std::move(pool)}; + + addresses_with_failover.emplace_back(Addresses{address}); + shards_info.emplace_back(std::move(info)); } } - else - { - throw Exception("There must be either 'node' or 'shard' elements in the cluster", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - } + initMisc(); } -Cluster::Cluster(const Cluster & from, const std::vector & indices) + +Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector & indices) : shards_info{} { for (size_t index : indices) diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h index 8b95b1ce986..ef12f9fe78f 100644 --- a/dbms/src/Interpreters/Cluster.h +++ b/dbms/src/Interpreters/Cluster.h @@ -148,7 +148,7 @@ public: /// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster. std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const; - /// Get a new Cluster From the existing cluster + /// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards. std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings) const; private: @@ -162,10 +162,12 @@ private: void initMisc(); /// For getClusterWithMultipleShards implementation. - Cluster(const Cluster & from, const std::vector & indices); + struct SubclusterTag {}; + Cluster(SubclusterTag, const Cluster & from, const std::vector & indices); /// For getClusterWithReplicasAsShards implementation - Cluster(const Settings & settings, const Cluster &); + struct ReplicasAsShardsTag {}; + Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings); String hash_of_addresses; /// Description of the cluster shards.