From 8246614f5e5cdec99941f048f51cb28b9eee03a5 Mon Sep 17 00:00:00 2001
From: zoomxi <419486879@qq.com>
Date: Mon, 22 Jul 2024 17:57:30 +0800
Subject: [PATCH 1/5] throw if can't connect to any participating replicas

---
 src/Processors/QueryPlan/ReadFromRemote.cpp |  3 +-
 .../ParallelReplicasReadingCoordinator.cpp  |  4 +--
 .../ParallelReplicasReadingCoordinator.h    |  4 +++
 .../test.py                                 | 35 +++++++++++++++++++
 4 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp
index e27515a62a4..29e12c1e613 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.cpp
+++ b/src/Processors/QueryPlan/ReadFromRemote.cpp
@@ -21,7 +21,7 @@
 #include
 #include
 #include
-
+#include
 #include
 namespace DB
 {
@@ -429,6 +429,7 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
     {
         shuffled_pool = shard.pool->getShuffledPools(current_settings);
         shuffled_pool.resize(max_replicas_to_use);
+        coordinator->adjustParticipatingReplicasCount(max_replicas_to_use);
     }
     else
     {
diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
index f46b4de10b7..2ba66256116 100644
--- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
+++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
@@ -1031,7 +1031,7 @@ void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica
     if (!pimpl)
     {
         unavailable_nodes_registered_before_initialization.push_back(replica_number);
-        if (unavailable_nodes_registered_before_initialization.size() == replicas_count)
+        if (unavailable_nodes_registered_before_initialization.size() == participating_replicas_count)
             throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Can't connect to any replica chosen for query execution");
     }
     else
@@ -1061,7 +1061,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
 }
 
 ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_)
-    : replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
+    : replicas_count(replicas_count_), participating_replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
 {
 }
 
diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
index 8b463fda395..c06ef6ef01a 100644
--- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
+++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
@@ -30,11 +30,15 @@ public:
     /// needed to report total rows to read
     void setProgressCallback(ProgressCallback callback);
 
+    /// Participating replicas count may be less than replicas count in a shard.
+    void adjustParticipatingReplicasCount(size_t count) { participating_replicas_count = count; }
+
 private:
     void initialize(CoordinationMode mode);
 
     std::mutex mutex;
     const size_t replicas_count{0};
+    size_t participating_replicas_count{0};
     size_t mark_segment_size{0};
     std::unique_ptr pimpl;
     ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
diff --git a/tests/integration/test_parallel_replicas_no_replicas/test.py b/tests/integration/test_parallel_replicas_no_replicas/test.py
index 9f716459643..b77da338554 100644
--- a/tests/integration/test_parallel_replicas_no_replicas/test.py
+++ b/tests/integration/test_parallel_replicas_no_replicas/test.py
@@ -48,3 +48,38 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
             "skip_unavailable_shards": skip_unavailable_shards,
         },
     )
+
+@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
+def test_skip_all_participating_replicas1(start_cluster, skip_unavailable_shards):
+    cluster_name = "test_1_shard_3_unavaliable_replicas"
+    table_name = "tt1"
+    create_tables(cluster_name, table_name)
+
+    with pytest.raises(QueryRuntimeException):
+        initiator.query(
+            f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
+            settings={
+                "allow_experimental_parallel_reading_from_replicas": 2,
+                "max_parallel_replicas": 3,
+                "cluster_for_parallel_replicas": cluster_name,
+                "skip_unavailable_shards": skip_unavailable_shards,
+                "parallel_replicas_min_number_of_rows_per_replica": 500,
+            },
+        )
+
+@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
+def test_skip_all_participating_replicas2(start_cluster, skip_unavailable_shards):
+    cluster_name = "test_1_shard_3_unavaliable_replicas"
+    table_name = "tt2"
+    create_tables(cluster_name, table_name)
+
+    with pytest.raises(QueryRuntimeException):
+        initiator.query(
+            f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
+            settings={
+                "allow_experimental_parallel_reading_from_replicas": 2,
+                "max_parallel_replicas": 2,
+                "cluster_for_parallel_replicas": cluster_name,
+                "skip_unavailable_shards": skip_unavailable_shards,
+            },
+        )

From dee5790b22d06e3916f4030937eb9384247a6baa Mon Sep 17 00:00:00 2001
From: zoomxi <419486879@qq.com>
Date: Tue, 23 Jul 2024 10:49:37 +0800
Subject: [PATCH 2/5] According to the suggestions from CR, modify test code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../test.py | 26 +++----------------
 1 file changed, 4 insertions(+), 22 deletions(-)

diff --git a/tests/integration/test_parallel_replicas_no_replicas/test.py b/tests/integration/test_parallel_replicas_no_replicas/test.py
index b77da338554..e05f72316d0 100644
--- a/tests/integration/test_parallel_replicas_no_replicas/test.py
+++ b/tests/integration/test_parallel_replicas_no_replicas/test.py
@@ -49,8 +49,8 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
         },
     )
 
-@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
-def test_skip_all_participating_replicas1(start_cluster, skip_unavailable_shards):
+@pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
+def test_skip_all_participating_replicas(start_cluster, max_parallel_replicas):
     cluster_name = "test_1_shard_3_unavaliable_replicas"
     table_name = "tt1"
     create_tables(cluster_name, table_name)
 
     with pytest.raises(QueryRuntimeException):
         initiator.query(
             f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
             settings={
                 "allow_experimental_parallel_reading_from_replicas": 2,
-                "max_parallel_replicas": 3,
+                "max_parallel_replicas": max_parallel_replicas,
                 "cluster_for_parallel_replicas": cluster_name,
-                "skip_unavailable_shards": skip_unavailable_shards,
-                "parallel_replicas_min_number_of_rows_per_replica": 500,
-            },
-        )
-
-@pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
-def test_skip_all_participating_replicas2(start_cluster, skip_unavailable_shards):
-    cluster_name = "test_1_shard_3_unavaliable_replicas"
-    table_name = "tt2"
-    create_tables(cluster_name, table_name)
-
-    with pytest.raises(QueryRuntimeException):
-        initiator.query(
-            f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
-            settings={
-                "allow_experimental_parallel_reading_from_replicas": 2,
-                "max_parallel_replicas": 2,
-                "cluster_for_parallel_replicas": cluster_name,
-                "skip_unavailable_shards": skip_unavailable_shards,
+                "skip_unavailable_shards": 1,
             },
         )

From d460fa9f3618cdba64315bce579a93329463b7ad Mon Sep 17 00:00:00 2001
From: zoomxi <419486879@qq.com>
Date: Wed, 24 Jul 2024 09:57:00 +0800
Subject: [PATCH 3/5] create coordinator after the number of replicas to use
 for the query is determined

---
 .../ClusterProxy/executeQuery.cpp           |  3 ---
 src/Processors/QueryPlan/ReadFromRemote.cpp |  6 ++---
 src/Processors/QueryPlan/ReadFromRemote.h   |  1 -
 .../ParallelReplicasReadingCoordinator.cpp  |  4 ++--
 .../ParallelReplicasReadingCoordinator.h    |  4 ----
 .../test.py                                 | 22 +++----------------
 6 files changed, 8 insertions(+), 32 deletions(-)

diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index 5d56ef09127..59f095f7487 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -517,14 +517,11 @@ void executeQueryWithParallelReplicas(
             "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
     }
 
-    auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
-        new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
     auto external_tables = new_context->getExternalTables();
     auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
         query_ast,
         new_cluster,
         storage_id,
-        std::move(coordinator),
         header,
         processed_stage,
         new_context,
diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp
index 29e12c1e613..cf11052cd59 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.cpp
+++ b/src/Processors/QueryPlan/ReadFromRemote.cpp
@@ -362,7 +362,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
     ASTPtr query_ast_,
     ClusterPtr cluster_,
     const StorageID & storage_id_,
-    ParallelReplicasReadingCoordinatorPtr coordinator_,
     Block header_,
     QueryProcessingStage::Enum stage_,
     ContextMutablePtr context_,
@@ -375,7 +374,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
     , cluster(cluster_)
     , query_ast(query_ast_)
     , storage_id(storage_id_)
-    , coordinator(std::move(coordinator_))
     , stage(std::move(stage_))
     , context(context_)
     , throttler(throttler_)
@@ -429,7 +427,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
     {
         shuffled_pool = shard.pool->getShuffledPools(current_settings);
         shuffled_pool.resize(max_replicas_to_use);
-        coordinator->adjustParticipatingReplicasCount(max_replicas_to_use);
     }
     else
     {
@@ -439,6 +436,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
         shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
     }
 
+    coordinator
+        = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, current_settings.parallel_replicas_mark_segment_size);
+
     for (size_t i=0; i < max_replicas_to_use; ++i)
     {
         IConnections::ReplicaInfo replica_info
diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h
index eb15269155a..1adb26b2915 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.h
+++ b/src/Processors/QueryPlan/ReadFromRemote.h
@@ -70,7 +70,6 @@ public:
         ASTPtr query_ast_,
         ClusterPtr cluster_,
         const StorageID & storage_id_,
-        ParallelReplicasReadingCoordinatorPtr coordinator_,
         Block header_,
         QueryProcessingStage::Enum stage_,
         ContextMutablePtr context_,
diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
index 2ba66256116..f46b4de10b7 100644
--- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
+++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
@@ -1031,7 +1031,7 @@ void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica
     if (!pimpl)
     {
         unavailable_nodes_registered_before_initialization.push_back(replica_number);
-        if (unavailable_nodes_registered_before_initialization.size() == participating_replicas_count)
+        if (unavailable_nodes_registered_before_initialization.size() == replicas_count)
             throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Can't connect to any replica chosen for query execution");
     }
     else
@@ -1061,7 +1061,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
 }
 
 ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_)
-    : replicas_count(replicas_count_), participating_replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
+    : replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
 {
 }
 
diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
index c06ef6ef01a..8b463fda395 100644
--- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
+++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
@@ -30,15 +30,11 @@ public:
     /// needed to report total rows to read
     void setProgressCallback(ProgressCallback callback);
 
-    /// Participating replicas count may be less than replicas count in a shard.
-    void adjustParticipatingReplicasCount(size_t count) { participating_replicas_count = count; }
-
 private:
     void initialize(CoordinationMode mode);
 
     std::mutex mutex;
     const size_t replicas_count{0};
-    size_t participating_replicas_count{0};
     size_t mark_segment_size{0};
     std::unique_ptr pimpl;
     ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
diff --git a/tests/integration/test_parallel_replicas_no_replicas/test.py b/tests/integration/test_parallel_replicas_no_replicas/test.py
index e05f72316d0..04e3a54e581 100644
--- a/tests/integration/test_parallel_replicas_no_replicas/test.py
+++ b/tests/integration/test_parallel_replicas_no_replicas/test.py
@@ -33,7 +33,8 @@ def create_tables(cluster, table_name):
 
 
 @pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
-def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
+@pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
+def test_skip_all_replicas(start_cluster, skip_unavailable_shards, max_parallel_replicas):
     cluster_name = "test_1_shard_3_unavaliable_replicas"
     table_name = "tt"
     create_tables(cluster_name, table_name)
@@ -43,25 +44,8 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
             f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
             settings={
                 "allow_experimental_parallel_reading_from_replicas": 2,
-                "max_parallel_replicas": 3,
+                "max_parallel_replicas": max_parallel_replicas,
                 "cluster_for_parallel_replicas": cluster_name,
                 "skip_unavailable_shards": skip_unavailable_shards,
             },
         )
-
-@pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
-def test_skip_all_participating_replicas(start_cluster, max_parallel_replicas):
-    cluster_name = "test_1_shard_3_unavaliable_replicas"
-    table_name = "tt1"
-    create_tables(cluster_name, table_name)
-
-    with pytest.raises(QueryRuntimeException):
-        initiator.query(
-            f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
-            settings={
-                "allow_experimental_parallel_reading_from_replicas": 2,
-                "max_parallel_replicas": max_parallel_replicas,
-                "cluster_for_parallel_replicas": cluster_name,
-                "skip_unavailable_shards": 1,
-            },
-        )

From 62d956f8a882f74a930340cce1650babc13bf7a1 Mon Sep 17 00:00:00 2001
From: zoomxi <419486879@qq.com>
Date: Wed, 24 Jul 2024 22:39:30 +0800
Subject: [PATCH 4/5] remove unused include of ParallelReplicasReadingCoordinator.h

---
 src/Interpreters/ClusterProxy/executeQuery.cpp | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index 59f095f7487..d04a73e384e 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -24,7 +24,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include

From 82959ce5b3e3709f12cb5cf8d50f8ca81858c7ed Mon Sep 17 00:00:00 2001
From: zoomxi <419486879@qq.com>
Date: Thu, 25 Jul 2024 09:55:23 +0800
Subject: [PATCH 5/5] format test.py

---
 tests/integration/test_parallel_replicas_no_replicas/test.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/integration/test_parallel_replicas_no_replicas/test.py b/tests/integration/test_parallel_replicas_no_replicas/test.py
index 04e3a54e581..62d4b005d94 100644
--- a/tests/integration/test_parallel_replicas_no_replicas/test.py
+++ b/tests/integration/test_parallel_replicas_no_replicas/test.py
@@ -34,7 +34,9 @@ def create_tables(cluster, table_name):
 
 @pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
 @pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
-def test_skip_all_replicas(start_cluster, skip_unavailable_shards, max_parallel_replicas):
+def test_skip_all_replicas(
+    start_cluster, skip_unavailable_shards, max_parallel_replicas
+):
     cluster_name = "test_1_shard_3_unavaliable_replicas"
     table_name = "tt"
     create_tables(cluster_name, table_name)