From 860b18c251a3492682d410fbcc54576e87f4ca25 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Mon, 15 Jul 2024 20:40:55 +0000 Subject: [PATCH] Update test for in-(reverse)-order coordinator --- .../ParallelReplicasReadingCoordinator.cpp | 2 +- src/Storages/MergeTree/RequestResponse.cpp | 4 +--- ...02950_parallel_replicas_used_count.reference | 10 ++++++++-- .../02950_parallel_replicas_used_count.sql | 17 ++++++++++++----- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index ff309f17893..e1c0d87837a 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -918,7 +918,7 @@ ParallelReadResponse InOrderCoordinator::handleRequest(ParallelReadRequest "Replica {} decided to read in {} mode, not in {}. This is a bug", request.replica_num, magic_enum::enum_name(request.mode), magic_enum::enum_name(mode)); - LOG_TRACE(log, "Got request from replica {}, data {}", request.replica_num, request.describe()); + LOG_TRACE(log, "Got read request: {}", request.describe()); ParallelReadResponse response; response.description = request.description; diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index 2ce0e20dcd2..33cd935c88b 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -44,9 +44,7 @@ void ParallelReadRequest::serialize(WriteBuffer & out) const String ParallelReadRequest::describe() const { - String result; - result += fmt::format("replica_num: {} \n", replica_num); - result += fmt::format("min_num_of_marks: {} \n", min_number_of_marks); + String result = fmt::format("replica_num {}, min_num_of_marks {}, ", replica_num, min_number_of_marks); result += description.describe(); return result; } diff --git a/tests/queries/0_stateless/02950_parallel_replicas_used_count.reference b/tests/queries/0_stateless/02950_parallel_replicas_used_count.reference index 21b7b527b7a..cc8284bfeac 100644 --- a/tests/queries/0_stateless/02950_parallel_replicas_used_count.reference +++ b/tests/queries/0_stateless/02950_parallel_replicas_used_count.reference @@ -1,8 +1,14 @@ -100 4950 +10000 49995000 1 89 90 91 92 93 -1 +3 +93 +92 +91 +90 +89 +3 diff --git a/tests/queries/0_stateless/02950_parallel_replicas_used_count.sql b/tests/queries/0_stateless/02950_parallel_replicas_used_count.sql index 22f55acd365..69f1dc47ded 100644 --- a/tests/queries/0_stateless/02950_parallel_replicas_used_count.sql +++ b/tests/queries/0_stateless/02950_parallel_replicas_used_count.sql @@ -2,11 +2,12 @@ DROP TABLE IF EXISTS test; CREATE TABLE test (k UInt64, v String) ENGINE = MergeTree -ORDER BY k; +ORDER BY k +SETTINGS index_granularity=1; -INSERT INTO test SELECT number, toString(number) FROM numbers(100); +INSERT INTO test SELECT number, toString(number) FROM numbers(10_000); -SET allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost'; +SET allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost'; -- default coordinator SELECT count(), sum(k) @@ -14,12 +15,18 @@ FROM test SETTINGS log_comment = '02950_parallel_replicas_used_replicas_count'; SYSTEM FLUSH LOGS; -SELECT ProfileEvents['ParallelReplicasUsedCount'] FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0; +SELECT ProfileEvents['ParallelReplicasUsedCount'] > 0 FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0; -- In order coordinator -SELECT k FROM test order by k limit 5 offset 89 SETTINGS optimize_read_in_order=1, log_comment='02950_parallel_replicas_used_replicas_count_2'; +SELECT k FROM test order by k limit 5 offset 89 SETTINGS optimize_read_in_order=1, log_comment='02950_parallel_replicas_used_replicas_count_2', merge_tree_min_rows_for_concurrent_read=1, max_threads=1; SYSTEM FLUSH LOGS; SELECT ProfileEvents['ParallelReplicasUsedCount'] FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count_2' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0; +-- In reverse order coordinator +SELECT k FROM test order by k desc limit 5 offset 9906 SETTINGS optimize_read_in_order=1, log_comment='02950_parallel_replicas_used_replicas_count_3', merge_tree_min_rows_for_concurrent_read=1, max_threads=1; + +SYSTEM FLUSH LOGS; +SELECT ProfileEvents['ParallelReplicasUsedCount'] FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count_3' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0; + DROP TABLE test;