From cf762347d5ca89e7c8f9fc94793c6d12e09e5c70 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 3 Feb 2023 12:01:35 +0000 Subject: [PATCH] Fix read in order optimization for DESC sorting with FINAL --- .../QueryPlan/ReadFromMergeTree.cpp | 21 +++++++++++++------ src/Processors/QueryPlan/ReadFromMergeTree.h | 1 + .../25336_read_in_order_final_desc.reference | 5 +++++ .../25336_read_in_order_final_desc.sql | 21 +++++++++++++++++++ 4 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 tests/queries/0_stateless/25336_read_in_order_final_desc.reference create mode 100644 tests/queries/0_stateless/25336_read_in_order_final_desc.sql diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index cca8e5297ee..748a182111f 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -820,6 +820,7 @@ static void addMergingFinal( Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts_with_ranges, const Names & column_names, + const InputOrderInfoPtr & input_order_info, ActionsDAGPtr & out_projection) { const auto & settings = context->getSettingsRef(); @@ -894,15 +895,19 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( if (new_parts.empty()) continue; + auto read_type = ReadFromMergeTree::ReadType::InOrder; + if (input_order_info && input_order_info->direction == -1) + read_type = ReadType::InReverseOrder; + if (num_streams > 1 && metadata_for_reading->hasPrimaryKey()) { // Let's split parts into layers to ensure data parallelism of FINAL. - auto reading_step_getter = [this, &column_names, &info](auto parts) + auto reading_step_getter = [this, &column_names, &info, read_type](auto parts) { return this->read( std::move(parts), column_names, - ReadFromMergeTree::ReadType::InOrder, + read_type, 1 /* num_streams */, 0 /* min_marks_for_concurrent_read */, info.use_uncompressed_cache); @@ -913,7 +918,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( else { pipes.emplace_back(read( - std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder, num_streams, 0, info.use_uncompressed_cache)); + std::move(new_parts), column_names, read_type, num_streams, 0, info.use_uncompressed_cache)); } /// Drop temporary columns, added by 'sorting_key_expr' @@ -928,11 +933,14 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( pipe.addSimpleTransform([sorting_expr](const Block & header) { return std::make_shared(header, sorting_expr); }); - /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition - /// with level > 0 then we won't postprocess this part + /// If do_not_merge_across_partitions_select_final is true + /// and there is only one part in partition with level > 0 + /// then we won't postprocess this part + bool should_read_in_order = !input_order_info || input_order_info->direction == 0; if (settings.do_not_merge_across_partitions_select_final && std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && - parts_to_merge_ranges[range_index]->data_part->info.level > 0) + parts_to_merge_ranges[range_index]->data_part->info.level > 0 && + !should_read_in_order) { partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes))); continue; @@ -1397,6 +1405,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons pipe = spreadMarkRangesAmongStreamsFinal( std::move(result.parts_with_ranges), column_names_to_read, + input_order_info, result_projection); } else if (input_order_info) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 8b2eca5e08e..ba430fbadef 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -237,6 +237,7 @@ private: Pipe spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, const Names & column_names, + const InputOrderInfoPtr & input_order_info, ActionsDAGPtr & out_projection); MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const; diff --git a/tests/queries/0_stateless/25336_read_in_order_final_desc.reference b/tests/queries/0_stateless/25336_read_in_order_final_desc.reference new file mode 100644 index 00000000000..83c7162f120 --- /dev/null +++ b/tests/queries/0_stateless/25336_read_in_order_final_desc.reference @@ -0,0 +1,5 @@ +1900000050000 1 +1900000040000 0.05 +1900000030000 0 +1900000020000 -0.0002 +1900000010000 -1 diff --git a/tests/queries/0_stateless/25336_read_in_order_final_desc.sql b/tests/queries/0_stateless/25336_read_in_order_final_desc.sql new file mode 100644 index 00000000000..ee59badf094 --- /dev/null +++ b/tests/queries/0_stateless/25336_read_in_order_final_desc.sql @@ -0,0 +1,21 @@ +SET optimize_read_in_order = 1; +DROP TABLE IF EXISTS mytable; + +CREATE TABLE mytable +( + timestamp UInt64, + insert_timestamp UInt64, + key UInt64, + value Float64 +) ENGINE = ReplacingMergeTree(insert_timestamp) + PRIMARY KEY (key, timestamp) + ORDER BY (key, timestamp); + +INSERT INTO mytable (timestamp, insert_timestamp, key, value) VALUES (1900000010000, 1675159000000, 5, 555), (1900000010000, 1675159770000, 5, -1), (1900000020000, 1675159770000, 5, -0.0002), (1900000030000, 1675159770000, 5, 0), (1900000020000, 1675159700000, 5, 555), (1900000040000, 1675159770000, 5, 0.05), (1900000050000, 1675159770000, 5, 1); + +SELECT timestamp, value +FROM mytable FINAL +WHERE key = 5 +ORDER BY timestamp DESC; + +DROP TABLE IF EXISTS mytable;