addded new constructor in cluster class for creating a new cluster from replicas

This commit is contained in:
Kiran 2020-01-07 15:56:16 +05:30
parent ebb1864522
commit ce55b0ffb5
2 changed files with 53 additions and 1 deletions

View File

@ -10,6 +10,7 @@
#include <IO/ReadHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <ext/range.h>
namespace DB
{
@ -449,6 +450,10 @@ void Cluster::initMisc()
}
}
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings) const
{
return std::unique_ptr<Cluster>{ new Cluster(settings, *this)};
}
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
{
@ -460,6 +465,47 @@ std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector
return std::unique_ptr<Cluster>{ new Cluster(*this, indices) };
}
Cluster::Cluster(const Settings & settings, const Cluster & from) : shards_info{}, addresses_with_failover{}
{
if (!from.addresses_with_failover.empty())
{
for (size_t shard_index : ext::range(0, from.shards_info.size()))
{
const auto & replicas = from.addresses_with_failover[shard_index];
for (size_t replica_index : ext::range(0, replicas.size()))
{
ShardInfo info;
Address address;
address = replicas[replica_index];
if (address.is_local)
info.local_addresses.push_back(replicas[replica_index]);
ConnectionPoolPtr pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
address.host_name,
address.port,
address.default_database,
address.user,
address.password,
"server",
address.compression,
address.secure);
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
std ::vector<Cluster::Address> newAddress = {address};
addresses_with_failover.emplace_back(newAddress);
shards_info.emplace_back(std::move(info));
}
}
}
else
{
throw Exception("There must be either 'node' or 'shard' elements in the cluster", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
initMisc();
}
Cluster::Cluster(const Cluster & from, const std::vector<size_t> & indices)
: shards_info{}
{

View File

@ -26,9 +26,12 @@ public:
const String & username, const String & password,
UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false);
Cluster(const Cluster &) = delete;
Cluster(const Settings & settings, const Cluster &);
Cluster(const Cluster &)= delete;
Cluster & operator=(const Cluster &) = delete;
/// is used to set a limit on the size of the timeout
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
@ -148,6 +151,9 @@ public:
/// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster.
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
/// Get a new Cluster From the existing cluster
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings) const;
private:
using SlotToShard = std::vector<UInt64>;
SlotToShard slot_to_shard;