Consistent replica id assignment

This commit is contained in:
Igor Nikonov 2024-06-14 12:02:47 +00:00
parent 048cbb17a6
commit 245476b34b
2 changed files with 30 additions and 7 deletions

View File

@ -280,8 +280,7 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); };
const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1;
const auto number_of_local_replica = 0;
auto read_from_merge_tree_parallel_replicas
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb, number_of_local_replica);
node->step = std::move(read_from_merge_tree_parallel_replicas);

View File

@ -434,14 +434,11 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shard.getAllNodeCount());
all_replicas_count = shard.getAllNodeCount();
}
if (exclude_local_replica)
--all_replicas_count;
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (all_replicas_count < shard.getAllNodeCount())
{
shuffled_pool = shard.pool->getShuffledPools(current_settings);
shuffled_pool.resize(all_replicas_count);
}
else
{
@ -451,10 +448,37 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
}
for (size_t i=0; i < all_replicas_count; ++i)
std::vector<ConnectionPoolPtr> pools_to_use;
if (exclude_local_replica)
{
IConnections::ReplicaInfo replica_info
std::vector<size_t> local_addr_possitions;
for (auto & pool : shuffled_pool)
{
const auto & hostname = pool.pool->getHost();
auto it = std::find_if(
begin(shard.local_addresses),
end(shard.local_addresses),
[&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; });
if (it != shard.local_addresses.end())
pool.pool.reset();
}
}
for (const auto & pool : shuffled_pool)
{
if (pool.pool)
pools_to_use.push_back(pool.pool);
}
if (pools_to_use.size() > all_replicas_count)
pools_to_use.resize(all_replicas_count);
else
all_replicas_count = pools_to_use.size();
/// the local replica always has number 0, so remote replicas are numbered starting from 1
size_t offset = (exclude_local_replica ? 1 : 0);
for (size_t i = 0 + offset; i < all_replicas_count + offset; ++i)
{
IConnections::ReplicaInfo replica_info{
.all_replicas_count = all_replicas_count,
/// We should use this number specifically, because the efficiency of data distribution by consistent hashing depends on it.
.number_of_current_replica = i,