Merge pull request #66880 from zoomxi/unavaliable_replicas

Fix: throw if can't connect to any participating replicas for parallel replicas query execution
This commit is contained in:
Igor Nikonov 2024-07-25 19:56:06 +00:00 committed by GitHub
commit e32109174e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 9 additions and 10 deletions

View File

@ -24,7 +24,6 @@
#include <Processors/Sources/NullSource.h> #include <Processors/Sources/NullSource.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
#include <Storages/Distributed/DistributedSettings.h> #include <Storages/Distributed/DistributedSettings.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/SelectQueryInfo.h> #include <Storages/SelectQueryInfo.h>
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageSnapshot.h> #include <Storages/StorageSnapshot.h>
@ -517,14 +516,11 @@ void executeQueryWithParallelReplicas(
"`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard"); "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
} }
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
auto external_tables = new_context->getExternalTables(); auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>( auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast, query_ast,
new_cluster, new_cluster,
storage_id, storage_id,
std::move(coordinator),
header, header,
processed_stage, processed_stage,
new_context, new_context,

View File

@ -21,7 +21,7 @@
#include <Client/ConnectionPoolWithFailover.h> #include <Client/ConnectionPoolWithFailover.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <boost/algorithm/string/join.hpp> #include <boost/algorithm/string/join.hpp>
namespace DB namespace DB
@ -362,7 +362,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
ASTPtr query_ast_, ASTPtr query_ast_,
ClusterPtr cluster_, ClusterPtr cluster_,
const StorageID & storage_id_, const StorageID & storage_id_,
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_, Block header_,
QueryProcessingStage::Enum stage_, QueryProcessingStage::Enum stage_,
ContextMutablePtr context_, ContextMutablePtr context_,
@ -375,7 +374,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, cluster(cluster_) , cluster(cluster_)
, query_ast(query_ast_) , query_ast(query_ast_)
, storage_id(storage_id_) , storage_id(storage_id_)
, coordinator(std::move(coordinator_))
, stage(std::move(stage_)) , stage(std::move(stage_))
, context(context_) , context(context_)
, throttler(throttler_) , throttler(throttler_)
@ -438,6 +436,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func); shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
} }
coordinator
= std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, current_settings.parallel_replicas_mark_segment_size);
for (size_t i=0; i < max_replicas_to_use; ++i) for (size_t i=0; i < max_replicas_to_use; ++i)
{ {
IConnections::ReplicaInfo replica_info IConnections::ReplicaInfo replica_info

View File

@ -70,7 +70,6 @@ public:
ASTPtr query_ast_, ASTPtr query_ast_,
ClusterPtr cluster_, ClusterPtr cluster_,
const StorageID & storage_id_, const StorageID & storage_id_,
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_, Block header_,
QueryProcessingStage::Enum stage_, QueryProcessingStage::Enum stage_,
ContextMutablePtr context_, ContextMutablePtr context_,

View File

@ -33,7 +33,10 @@ def create_tables(cluster, table_name):
@pytest.mark.parametrize("skip_unavailable_shards", [1, 0]) @pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
def test_skip_all_replicas(start_cluster, skip_unavailable_shards): @pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
def test_skip_all_replicas(
start_cluster, skip_unavailable_shards, max_parallel_replicas
):
cluster_name = "test_1_shard_3_unavaliable_replicas" cluster_name = "test_1_shard_3_unavaliable_replicas"
table_name = "tt" table_name = "tt"
create_tables(cluster_name, table_name) create_tables(cluster_name, table_name)
@ -43,7 +46,7 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key", f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
settings={ settings={
"allow_experimental_parallel_reading_from_replicas": 2, "allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3, "max_parallel_replicas": max_parallel_replicas,
"cluster_for_parallel_replicas": cluster_name, "cluster_for_parallel_replicas": cluster_name,
"skip_unavailable_shards": skip_unavailable_shards, "skip_unavailable_shards": skip_unavailable_shards,
}, },