Fix: progress bar, reading in order

This commit is contained in:
Igor Nikonov 2024-06-27 20:30:05 +00:00
parent 60153d428d
commit 372b948d34
4 changed files with 25 additions and 17 deletions

View File

@ -500,6 +500,14 @@ void executeQueryWithParallelReplicas(
/// do not build local plan for distributed queries for now (address it later)
if (settings.allow_experimental_analyzer && settings.parallel_replicas_local_plan && !shard_num)
{
auto local_plan = createLocalPlanForParallelReplicas(
query_ast,
header,
new_context,
processed_stage,
coordinator,
std::move(analyzed_read_from_merge_tree));
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,
new_cluster,
@ -518,14 +526,6 @@ void executeQueryWithParallelReplicas(
auto remote_plan = std::make_unique<QueryPlan>();
remote_plan->addStep(std::move(read_from_remote));
auto local_plan = createLocalPlanForParallelReplicas(
query_ast,
header,
new_context,
processed_stage,
coordinator,
std::move(analyzed_read_from_merge_tree));
DataStreams input_streams;
input_streams.reserve(2);
input_streams.emplace_back(local_plan->getCurrentDataStream());

View File

@ -586,12 +586,14 @@ Pipe ReadFromMergeTree::readInOrder(
context);
}
/// Actually it means that parallel reading from replicas enabled
/// and we have to collaborate with initiator.
/// Actually it means that parallel reading from replicas enabled and read snapshot is not local -
/// we can't rely on local snapshot
/// In this case we won't set approximate rows, because it will be accounted multiple times.
/// Also do not count amount of read rows if we read in order of sorting key,
/// because we don't know actual amount of read rows in case when limit is set.
bool set_rows_approx = !is_parallel_reading_from_replicas && !reader_settings.read_in_order;
const UInt64 in_order_limit = query_info.input_order_info ? query_info.input_order_info->limit : 0;
const bool set_total_rows_approx
= !(is_parallel_reading_from_replicas && context->canUseParallelReplicasOnFollower()) && !in_order_limit;
Pipes pipes;
for (size_t i = 0; i < parts_with_ranges.size(); ++i)
@ -621,7 +623,7 @@ Pipe ReadFromMergeTree::readInOrder(
processor->addPartLevelToChunk(isQueryWithFinal());
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
if (set_rows_approx)
if (set_total_rows_approx)
source->addTotalRowsApprox(total_rows);
pipes.emplace_back(std::move(source));

View File

@ -1,2 +1,8 @@
3000 1000 3999 2499.5
1
1998 2944475297004403859
1999 254596732598015005
2000 6863370867519437063
2001 17844331710293705251
2002 1587587338113897332
1

View File

@ -26,12 +26,12 @@ WHERE query_id in (select query_id from system.query_log where current_database
AND message LIKE '%Total rows to read: 3000%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
-- reading in order coordinator
-- SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b';
SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, parallel_replicas_local_plan=0, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b';
-- SYSTEM FLUSH LOGS;
-- SELECT count() > 0 FROM system.text_log
-- WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b')
-- AND message LIKE '%Updated total rows to read: added % rows, total 3000 rows%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
SYSTEM FLUSH LOGS;
SELECT count() > 0 FROM system.text_log
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b')
AND message LIKE '%Updated total rows to read: added % rows, total 3000 rows%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
DROP TABLE t1 SYNC;
DROP TABLE t2 SYNC;