Add replcia shuffling

This commit is contained in:
Antonio Andelic 2023-01-19 12:04:07 +00:00
parent ddfb913f99
commit 53b53a1ec9

View File

@ -15,6 +15,7 @@
#include <base/sort.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <span>
namespace DB
{
@ -677,8 +678,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
std::set<std::pair<String, int>> unique_hosts;
for (size_t shard_index : collections::range(0, from.shards_info.size()))
{
const auto & replicas = from.addresses_with_failover[shard_index];
size_t replicas_used = 0;
auto create_shards_from_replicas = [&](std::span<const Address> replicas)
{
for (const auto & address : replicas)
{
if (!unique_hosts.emplace(address.host_name, address.port).second)
@ -686,7 +687,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
ShardInfo info;
info.shard_num = ++shard_num;
++replicas_used;
if (address.is_local)
info.local_addresses.push_back(address);
@ -713,9 +713,39 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
addresses_with_failover.emplace_back(Addresses{address});
shards_info.emplace_back(std::move(info));
}
};
if (max_replicas_from_shard && replicas_used == max_replicas_from_shard)
break;
const auto & replicas = from.addresses_with_failover[shard_index];
if (!max_replicas_from_shard || replicas.size() <= max_replicas_from_shard)
{
create_shards_from_replicas(replicas);
}
else
{
std::random_device rd;
std::mt19937 gen{rd()};
auto shuffled_replicas = replicas;
if (settings.prefer_localhost_replica)
{
auto local_replica = std::find_if(shuffled_replicas.begin(), shuffled_replicas.end(), [](const auto & replica) { return replica.is_local; });
if (local_replica != shuffled_replicas.end())
{
std::swap(*shuffled_replicas.begin(), *local_replica);
std::shuffle(shuffled_replicas.begin() + 1, shuffled_replicas.end(), gen);
}
else
{
std::shuffle(shuffled_replicas.begin(), shuffled_replicas.end(), gen);
}
}
else
{
std::shuffle(shuffled_replicas.begin(), shuffled_replicas.end(), gen);
}
create_shards_from_replicas(std::span{shuffled_replicas.begin(), shuffled_replicas.begin() + max_replicas_from_shard});
}
}