diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index 2c75bd821fe..e645a8553d4 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB { @@ -449,6 +450,10 @@ void Cluster::initMisc() } } +std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings) const +{ + return std::unique_ptr{ new Cluster(settings, *this)}; +} std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const { @@ -460,6 +465,47 @@ std::unique_ptr Cluster::getClusterWithMultipleShards(const std::vector return std::unique_ptr{ new Cluster(*this, indices) }; } +Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{}, addresses_with_failover{} +{ + if (!from.addresses_with_failover.empty()) + { + for (size_t shard_index : ext::range(0, from.shards_info.size())) + { + const auto & replicas = from.addresses_with_failover[shard_index]; + for (size_t replica_index : ext::range(0, replicas.size())) + { + ShardInfo info; + Address address; + address = replicas[replica_index]; + if (address.is_local) + info.local_addresses.push_back(replicas[replica_index]); + + ConnectionPoolPtr pool = std::make_shared( + settings.distributed_connections_pool_size, + address.host_name, + address.port, + address.default_database, + address.user, + address.password, + "server", + address.compression, + address.secure); + + info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings.load_balancing); + info.per_replica_pools = {std::move(pool)}; + std ::vector newAddress = {address}; + addresses_with_failover.emplace_back(newAddress); + shards_info.emplace_back(std::move(info)); + } + } + } + else + { + throw Exception("There must be either 'node' or 'shard' elements in the cluster", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + } + initMisc(); +} + Cluster::Cluster(const Cluster & from, const std::vector & indices) : shards_info{} { diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h index e778c9bcf6f..335f3475580 100644 --- a/dbms/src/Interpreters/Cluster.h +++ b/dbms/src/Interpreters/Cluster.h @@ -26,9 +26,12 @@ public: const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false); - Cluster(const Cluster &) = delete; + Cluster(const Settings & settings, const Cluster &); + + Cluster(const Cluster &)= delete; Cluster & operator=(const Cluster &) = delete; + /// is used to set a limit on the size of the timeout static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit); @@ -148,6 +151,9 @@ public: /// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster. std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const; + /// Get a new Cluster From the existing cluster + std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings) const; + private: using SlotToShard = std::vector; SlotToShard slot_to_shard;