do not implicitly read pk and version columns in lonely parts if nnot necessary

This commit is contained in:
Duc Canh Le 2023-08-29 09:41:18 +00:00
parent ea7f583708
commit e9b0d3e4a2
4 changed files with 60 additions and 8 deletions

View File

@ -979,7 +979,7 @@ static void addMergingFinal(
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & column_names, ActionsDAGPtr & out_projection) RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection)
{ {
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
const auto data_settings = data.getSettings(); const auto data_settings = data.getSettings();
@ -1141,17 +1141,16 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts) if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts)
num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size()); num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size());
auto pipe = read(std::move(lonely_parts), column_names, ReadFromMergeTree::ReadType::Default, auto pipe = read(std::move(lonely_parts), partition_pipes.empty() ? origin_column_names : column_names, ReadFromMergeTree::ReadType::Default,
num_streams_for_lonely_parts, min_marks_for_concurrent_read, info.use_uncompressed_cache); num_streams_for_lonely_parts, min_marks_for_concurrent_read, info.use_uncompressed_cache);
/// Drop temporary columns, added by 'sorting_key_expr' /// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection) if (!out_projection)
out_projection = createProjection(pipe.getHeader()); out_projection = createProjection(pipe.getHeader());
pipe.addSimpleTransform([sorting_expr](const Block & header) if (!partition_pipes.empty())
{ pipe.addSimpleTransform([sorting_expr](const Block & header)
return std::make_shared<ExpressionTransform>(header, sorting_expr); { return std::make_shared<ExpressionTransform>(header, sorting_expr); });
});
partition_pipes.emplace_back(std::move(pipe)); partition_pipes.emplace_back(std::move(pipe));
} }
@ -1742,7 +1741,7 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
::sort(column_names_to_read.begin(), column_names_to_read.end()); ::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
return spreadMarkRangesAmongStreamsFinal(std::move(parts_with_ranges), num_streams, column_names_to_read, result_projection); return spreadMarkRangesAmongStreamsFinal(std::move(parts_with_ranges), num_streams, result.column_names_to_read, column_names_to_read, result_projection);
} }
else if (input_order_info) else if (input_order_info)
{ {

View File

@ -314,7 +314,7 @@ private:
const InputOrderInfoPtr & input_order_info); const InputOrderInfoPtr & input_order_info);
Pipe spreadMarkRangesAmongStreamsFinal( Pipe spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts, size_t num_streams, const Names & column_names, ActionsDAGPtr & out_projection); RangesInDataParts && parts, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection);
ReadFromMergeTree::AnalysisResult getAnalysisResult() const; ReadFromMergeTree::AnalysisResult getAnalysisResult() const;
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr; MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr;

View File

@ -0,0 +1,21 @@
(Expression)
ExpressionTransform × 16
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 4 → 16
Header × 16 : max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 4
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 4 → 4
Header × 4 : val UInt64: val UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
(ReadFromMergeTree)
ExpressionTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
MergeTreeThread × 4 0 → 1
Header: val UInt64: val UInt64 UInt64(size = 0)

View File

@ -0,0 +1,32 @@
DROP TABLE IF EXISTS all_lonely;
CREATE TABLE all_lonely
(
`id` UInt64,
`dt` Date,
`val` UInt64,
`version` UInt64
)
ENGINE = ReplacingMergeTree(version)
PARTITION BY dt
ORDER BY (id);
INSERT INTO all_lonely SELECT number, '2022-10-28', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-28', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-28' FINAL;
INSERT INTO all_lonely SELECT number, '2022-10-29', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-29', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-29' FINAL;
INSERT INTO all_lonely SELECT number, '2022-10-30', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-30', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-30' FINAL;
INSERT INTO all_lonely SELECT number, '2022-10-31', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-31', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-31' FINAL;
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM all_lonely FINAL SETTINGS do_not_merge_across_partitions_select_final = 1, max_threads = 16;