From 372b948d34fe24257e096a6b7d6c9ea03ba0023a Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Thu, 27 Jun 2024 20:30:05 +0000 Subject: [PATCH] Fix: progress bar, reading in order --- src/Interpreters/ClusterProxy/executeQuery.cpp | 16 ++++++++-------- src/Processors/QueryPlan/ReadFromMergeTree.cpp | 10 ++++++---- ...2898_parallel_replicas_progress_bar.reference | 6 ++++++ .../02898_parallel_replicas_progress_bar.sql | 10 +++++----- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index da88f16b504..9302baf57ca 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -500,6 +500,14 @@ void executeQueryWithParallelReplicas( /// do not build local plan for distributed queries for now (address it later) if (settings.allow_experimental_analyzer && settings.parallel_replicas_local_plan && !shard_num) { + auto local_plan = createLocalPlanForParallelReplicas( + query_ast, + header, + new_context, + processed_stage, + coordinator, + std::move(analyzed_read_from_merge_tree)); + auto read_from_remote = std::make_unique( query_ast, new_cluster, @@ -518,14 +526,6 @@ void executeQueryWithParallelReplicas( auto remote_plan = std::make_unique(); remote_plan->addStep(std::move(read_from_remote)); - auto local_plan = createLocalPlanForParallelReplicas( - query_ast, - header, - new_context, - processed_stage, - coordinator, - std::move(analyzed_read_from_merge_tree)); - DataStreams input_streams; input_streams.reserve(2); input_streams.emplace_back(local_plan->getCurrentDataStream()); diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 7cf574ddf5c..e4e251b694e 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -586,12 +586,14 @@ Pipe ReadFromMergeTree::readInOrder( context); } - /// Actually it means that parallel reading from replicas enabled - /// and we have to collaborate with initiator. + /// Actually it means that parallel reading from replicas enabled and read snapshot is not local - + /// we can't rely on local snapshot /// In this case we won't set approximate rows, because it will be accounted multiple times. /// Also do not count amount of read rows if we read in order of sorting key, /// because we don't know actual amount of read rows in case when limit is set. - bool set_rows_approx = !is_parallel_reading_from_replicas && !reader_settings.read_in_order; + const UInt64 in_order_limit = query_info.input_order_info ? query_info.input_order_info->limit : 0; + const bool set_total_rows_approx + = !(is_parallel_reading_from_replicas && context->canUseParallelReplicasOnFollower()) && !in_order_limit; Pipes pipes; for (size_t i = 0; i < parts_with_ranges.size(); ++i) @@ -621,7 +623,7 @@ Pipe ReadFromMergeTree::readInOrder( processor->addPartLevelToChunk(isQueryWithFinal()); auto source = std::make_shared(std::move(processor)); - if (set_rows_approx) + if (set_total_rows_approx) source->addTotalRowsApprox(total_rows); pipes.emplace_back(std::move(source)); diff --git a/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.reference b/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.reference index 380aac4dbe8..c66597436f3 100644 --- a/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.reference +++ b/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.reference @@ -1,2 +1,8 @@ 3000 1000 3999 2499.5 1 +1998 2944475297004403859 +1999 254596732598015005 +2000 6863370867519437063 +2001 17844331710293705251 +2002 1587587338113897332 +1 diff --git a/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql b/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql index 42f8091db08..d3bf228e0fb 100644 --- a/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql +++ b/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql @@ -26,12 +26,12 @@ WHERE query_id in (select query_id from system.query_log where current_database AND message LIKE '%Total rows to read: 3000%' SETTINGS allow_experimental_parallel_reading_from_replicas=0; -- reading in order coordinator --- SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b'; +SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, parallel_replicas_local_plan=0, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b'; --- SYSTEM FLUSH LOGS; --- SELECT count() > 0 FROM system.text_log --- WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b') --- AND message LIKE '%Updated total rows to read: added % rows, total 3000 rows%' SETTINGS allow_experimental_parallel_reading_from_replicas=0; +SYSTEM FLUSH LOGS; +SELECT count() > 0 FROM system.text_log +WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b') + AND message LIKE '%Updated total rows to read: added % rows, total 3000 rows%' SETTINGS allow_experimental_parallel_reading_from_replicas=0; DROP TABLE t1 SYNC; DROP TABLE t2 SYNC;