Do not implicitly read pk + version columns or compute the sort expression for lonely parts

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
Duc Canh Le 2023-08-30 05:05:57 +00:00
parent e9b0d3e4a2
commit 11c94246c7
5 changed files with 321 additions and 98 deletions
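
Context for the change: when do_not_merge_across_partitions_select_final is enabled, a partition that contains exactly one part with level > 0 is already fully merged, so FINAL has nothing left to collapse there. Such "lonely" parts can be read like ordinary parts. Before this commit they were nevertheless read together with the primary key and version columns and run through the sorting-key expression; afterwards they are read with only the originally requested columns. A minimal sketch of the scenario, condensed from the with_lonely test at the bottom of this commit (the table name lonely_demo is illustrative):

    -- Illustrative table, mirroring the committed with_lonely test.
    -- One OPTIMIZE ... FINAL per partition leaves a single part with level > 0: a "lonely" part.
    CREATE TABLE lonely_demo (id UInt64, dt Date, val UInt64, version UInt64)
    ENGINE = ReplacingMergeTree(version) PARTITION BY dt ORDER BY (id);
    INSERT INTO lonely_demo SELECT number, '2022-10-28', number * 10, 0 FROM numbers(10000);
    INSERT INTO lonely_demo SELECT number + 500000, '2022-10-28', number * 10, 1 FROM numbers(10000);
    OPTIMIZE TABLE lonely_demo PARTITION '2022-10-28' FINAL;

    -- Reading with FINAL now touches only `val` for the lonely part; it no longer
    -- implicitly pulls in `id` (the primary key) and `version`.
    SELECT max(val), count(*) FROM lonely_demo FINAL
    SETTINGS do_not_merge_across_partitions_select_final = 1;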

View File

@@ -982,7 +982,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
    RangesInDataParts && parts_with_ranges, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection)
{
    const auto & settings = context->getSettingsRef();
-   const auto data_settings = data.getSettings();
+   const auto & data_settings = data.getSettings();

    PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);
@@ -1017,7 +1017,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
        parts_to_merge_ranges.push_back(parts_with_ranges.end());
    }

-   Pipes partition_pipes;
+   Pipes merging_pipes;
+   Pipes no_merging_pipes;

    /// If do_not_merge_across_partitions_select_final is true and num_streams > 1
    /// we will store lonely parts with level > 0 to use parallel select on them.
@@ -1028,20 +1029,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
    for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
    {
+       /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
+       /// with level > 0 then we won't post-process this part, and if num_streams > 1 we
+       /// can use parallel select on such parts.
+       bool no_merging_final = settings.do_not_merge_across_partitions_select_final &&
+           std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
+           parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
+           data.merging_params.is_deleted_column.empty();
+
        Pipes pipes;
        {
            RangesInDataParts new_parts;

-           /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
-           /// with level > 0 then we won't postprocess this part and if num_streams > 1 we
-           /// can use parallel select on such parts. We save such parts in one vector and then use
-           /// MergeTreeReadPool and MergeTreeThreadSelectProcessor for parallel select.
-           if (num_streams > 1 && settings.do_not_merge_across_partitions_select_final &&
-               std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
-               parts_to_merge_ranges[range_index]->data_part->info.level > 0
-               && data.merging_params.is_deleted_column.empty())
+           if (no_merging_final)
            {
-               sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount();
+               if (num_streams > 1)
+                   sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount();
                lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index]));
                continue;
            }
@@ -1091,16 +1093,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
                out_projection = createProjection(pipes.front().getHeader());
        }

-       /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
-       /// with level > 0 then we won't postprocess this part
-       if (settings.do_not_merge_across_partitions_select_final &&
-           std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
-           parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
-           data.merging_params.is_deleted_column.empty())
-       {
-           partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
-           continue;
-       }

        Names sort_columns = metadata_for_reading->getSortingKeyColumns();
        SortDescription sort_description;
@@ -1123,39 +1115,72 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
            partition_key_columns,
            max_block_size);

-       partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
+       merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
    }

    if (!lonely_parts.empty())
    {
-       size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();
+       Pipe pipe;
+       if (num_streams > 1)
+       {
+           size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();

-       const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
-           settings.merge_tree_min_rows_for_concurrent_read,
-           settings.merge_tree_min_bytes_for_concurrent_read,
-           data_settings->index_granularity,
-           info.index_granularity_bytes,
-           sum_marks_in_lonely_parts);
+           const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
+               settings.merge_tree_min_rows_for_concurrent_read,
+               settings.merge_tree_min_bytes_for_concurrent_read,
+               data_settings->index_granularity,
+               info.index_granularity_bytes,
+               sum_marks_in_lonely_parts);

-       /// Reduce the number of num_streams_for_lonely_parts if the data is small.
-       if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts)
-           num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size());
+           /// Reduce the number of num_streams_for_lonely_parts if the data is small.
+           if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read
+               && lonely_parts.size() < num_streams_for_lonely_parts)
+               num_streams_for_lonely_parts = std::max(
+                   (sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read,
+                   lonely_parts.size());

-       auto pipe = read(std::move(lonely_parts), partition_pipes.empty() ? origin_column_names : column_names, ReadFromMergeTree::ReadType::Default,
-           num_streams_for_lonely_parts, min_marks_for_concurrent_read, info.use_uncompressed_cache);
-
-       /// Drop temporary columns, added by 'sorting_key_expr'
-       if (!out_projection)
-           out_projection = createProjection(pipe.getHeader());
-
-       if (!partition_pipes.empty())
-           pipe.addSimpleTransform([sorting_expr](const Block & header)
-                                   { return std::make_shared<ExpressionTransform>(header, sorting_expr); });
-
-       partition_pipes.emplace_back(std::move(pipe));
+           pipe = read(
+               std::move(lonely_parts),
+               origin_column_names,
+               ReadFromMergeTree::ReadType::Default,
+               num_streams_for_lonely_parts,
+               min_marks_for_concurrent_read,
+               info.use_uncompressed_cache);
+       }
+       else
+       {
+           pipe = read(
+               std::move(lonely_parts),
+               origin_column_names,
+               ReadFromMergeTree::ReadType::InOrder,
+               num_streams,
+               0,
+               info.use_uncompressed_cache);
+       }
+
+       no_merging_pipes.emplace_back(std::move(pipe));
    }

-   return Pipe::unitePipes(std::move(partition_pipes));
+   if (!merging_pipes.empty() && !no_merging_pipes.empty())
+   {
+       out_projection = nullptr; /// We do projection here
+       Pipes pipes;
+       pipes.resize(2);
+       pipes[0] = Pipe::unitePipes(std::move(merging_pipes));
+       pipes[1] = Pipe::unitePipes(std::move(no_merging_pipes));
+       auto conversion_action = ActionsDAG::makeConvertingActions(
+           pipes[0].getHeader().getColumnsWithTypeAndName(),
+           pipes[1].getHeader().getColumnsWithTypeAndName(),
+           ActionsDAG::MatchColumnsMode::Name);
+       pipes[0].addSimpleTransform(
+           [conversion_action](const Block & header)
+           {
+               auto converting_expr = std::make_shared<ExpressionActions>(conversion_action);
+               return std::make_shared<ExpressionTransform>(header, converting_expr);
+           });
+       return Pipe::unitePipes(std::move(pipes));
+   }
+   else
+       return merging_pipes.empty() ? Pipe::unitePipes(std::move(no_merging_pipes)) : Pipe::unitePipes(std::move(merging_pipes));
}
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
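
A worked example of the stream-count reduction above, with illustrative numbers that are not taken from the commit: suppose num_streams = 16 and lonely_parts.size() = 3, so num_streams_for_lonely_parts starts at 16 × 3 = 48. If the lonely parts together hold sum_marks_in_lonely_parts = 90 marks and min_marks_for_concurrent_read = 24, then 90 < 48 × 24 and 3 < 48, so the stream count drops to max((90 + 24 - 1) / 24, 3) = max(4, 3) = 4: the rounded-up number of min-marks-sized chunks, but never fewer than one stream per lonely part.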

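Two behavioral notes on the rewritten tail of the function. First, the old code read lonely parts with column_names (per the commit title, the requested columns implicitly extended with the primary key and version columns) whenever other partitions were present, and then applied sorting_expr to them; the new code always reads origin_column_names and applies no sorting expression. Second, Pipe::unitePipes requires a common header, and the merging and no-merging pipes may now be shaped differently, so when both kinds exist, ActionsDAG::makeConvertingActions with MatchColumnsMode::Name builds an expression that reshapes the merging pipe's header to match the no-merging pipe's by column name; out_projection is reset to nullptr because this conversion already plays the projection's role, as the /// We do projection here comment says. If only one kind of pipe exists, it is united and returned as before.
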
View File

@@ -1,21 +0,0 @@
(Expression)
ExpressionTransform × 16
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 4 → 16
Header × 16 : max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 4
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 4 → 4
Header × 4 : val UInt64: val UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
(ReadFromMergeTree)
ExpressionTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
MergeTreeThread × 4 0 → 1
Header: val UInt64: val UInt64 UInt64(size = 0)

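Note what disappears from this deleted reference file: the ExpressionTransform × 4 inside (ReadFromMergeTree), directly above MergeTreeThread × 4, is the sorting-key expression that the old code applied even though every part here is lonely. In the new reference file further down, MergeTreeThread feeds the rest of the pipeline directly, with no extra expression step inside the read.
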
View File

@@ -1,32 +0,0 @@
DROP TABLE IF EXISTS all_lonely;
CREATE TABLE all_lonely
(
`id` UInt64,
`dt` Date,
`val` UInt64,
`version` UInt64
)
ENGINE = ReplacingMergeTree(version)
PARTITION BY dt
ORDER BY (id);
INSERT INTO all_lonely SELECT number, '2022-10-28', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-28', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-28' FINAL;
INSERT INTO all_lonely SELECT number, '2022-10-29', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-29', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-29' FINAL;
INSERT INTO all_lonely SELECT number, '2022-10-30', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-30', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-30' FINAL;
INSERT INTO all_lonely SELECT number, '2022-10-31', number*10, 0 FROM numbers(10000);
INSERT INTO all_lonely SELECT number+500000, '2022-10-31', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE all_lonely PARTITION '2022-10-31' FINAL;
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM all_lonely FINAL SETTINGS do_not_merge_across_partitions_select_final = 1, max_threads = 16;
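
The all_lonely test above, in which every partition is reduced to a single lonely part, is superseded by the with_lonely test added at the end of this commit, which additionally covers a partition that still needs merging and the max_threads = 1 in-order path.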

View File

@@ -0,0 +1,202 @@
(Expression)
ExpressionTransform × 16
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 6 → 16
Header × 16 : max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 6
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 6 → 6
Header × 6 : val UInt64: val UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 6
Header: val UInt64: val UInt64 UInt64(size = 0)
(ReadFromMergeTree)
MergeTreeThread × 4 0 → 1
Header: val UInt64: val UInt64 UInt64(size = 0)
ExpressionTransform × 2
Header: val UInt64: val UInt64 UInt64(size = 0)
ReplacingSorted 2 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
FilterSortedStreamByRange × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
Description: filter values in [(50000), +inf)
ExpressionTransform × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
MergeTreeInOrder × 2 0 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
ReplacingSorted 2 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
FilterSortedStreamByRange × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
Description: filter values in [-inf, (50000))
ExpressionTransform × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
MergeTreeInOrder × 2 0 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 16
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 4 → 16
Header × 16 : max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 4
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 4 → 4
Header × 4 : val UInt64: val UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
(Filter)
FilterTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
(ReadFromMergeTree)
MergeTreeThread × 4 0 → 1
Header: dt Date: dt Date UInt16(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 4 → 1
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 4
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 4 → 4
Header × 4 : val UInt64: val UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
(Filter)
FilterTransform × 4
Header: val UInt64: val UInt64 UInt64(size = 0)
(ReadFromMergeTree)
MergeTreeInOrder × 4 0 → 1
Header: dt Date: dt Date UInt16(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 16
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 6 → 16
Header × 16 : max(val_0) UInt64: max(val_0) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 6
Header: max(val_0) UInt64: max(val_0) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 6 → 6
Header × 6 : val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 6
Header: val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(ReadFromMergeTree)
MergeTreeThread × 4 0 → 1
Header: val UInt64: val UInt64 UInt64(size = 0)
ExpressionTransform × 2
Header: val UInt64: val UInt64 UInt64(size = 0)
ReplacingSorted 2 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
FilterSortedStreamByRange × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
Description: filter values in [(50000), +inf)
ExpressionTransform × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
MergeTreeInOrder × 2 0 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
ReplacingSorted 2 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
FilterSortedStreamByRange × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
Description: filter values in [-inf, (50000))
ExpressionTransform × 2
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
MergeTreeInOrder × 2 0 → 1
Header: id UInt64: id UInt64 UInt64(size = 0)
val UInt64: val UInt64 UInt64(size = 0)
version UInt64: version UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 16
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 4 → 16
Header × 16 : max(val_0) UInt64: max(val_0) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 4
Header: max(val_0) UInt64: max(val_0) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 4 → 4
Header × 4 : val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 4
Header: val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(Filter)
FilterTransform × 4
Header: val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(ReadFromMergeTree)
MergeTreeThread × 4 0 → 1
Header: val UInt64: val UInt64 UInt64(size = 0)
dt Date: dt Date UInt16(size = 0)
(Expression)
ExpressionTransform
Header: max(val) UInt64: max(val) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
(Aggregating)
Resize 4 → 1
Header: max(val_0) UInt64: max(val_0) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
AggregatingTransform × 4
Header: max(val_0) UInt64: max(val_0) UInt64 UInt64(size = 0)
count() UInt64: count() UInt64 UInt64(size = 0)
StrictResize 4 → 4
Header × 4 : val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(Expression)
ExpressionTransform × 4
Header: val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(Filter)
FilterTransform × 4
Header: val_0 UInt64: val_0 UInt64 UInt64(size = 0)
(ReadFromMergeTree)
MergeTreeInOrder × 4 0 → 1
Header: val UInt64: val UInt64 UInt64(size = 0)
dt Date: dt Date UInt16(size = 0)

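The six pipelines in this new reference file correspond to the six EXPLAIN PIPELINE statements in the test below: the mixed lonely/non-lonely case, the lonely-only parallel case, and the lonely-only max_threads = 1 case (note MergeTreeInOrder replacing MergeTreeThread, and Resize 4 → 1), each once with the analyzer off and once with it on; the val_0 headers appear only in the analyzer variants.
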
View File

@@ -0,0 +1,49 @@
DROP TABLE IF EXISTS with_lonely;
CREATE TABLE with_lonely
(
`id` UInt64,
`dt` Date,
`val` UInt64,
`version` UInt64
)
ENGINE = ReplacingMergeTree(version)
PARTITION BY dt
ORDER BY (id);
INSERT INTO with_lonely SELECT number, '2022-10-28', number*10, 0 FROM numbers(10000);
INSERT INTO with_lonely SELECT number+500000, '2022-10-28', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE with_lonely PARTITION '2022-10-28' FINAL;
INSERT INTO with_lonely SELECT number, '2022-10-29', number*10, 0 FROM numbers(10000);
INSERT INTO with_lonely SELECT number+500000, '2022-10-29', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE with_lonely PARTITION '2022-10-29' FINAL;
INSERT INTO with_lonely SELECT number, '2022-10-30', number*10, 0 FROM numbers(10000);
INSERT INTO with_lonely SELECT number+500000, '2022-10-30', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE with_lonely PARTITION '2022-10-30' FINAL;
INSERT INTO with_lonely SELECT number, '2022-10-31', number*10, 0 FROM numbers(10000);
INSERT INTO with_lonely SELECT number+500000, '2022-10-31', number*10, 1 FROM numbers(10000);
OPTIMIZE TABLE with_lonely PARTITION '2022-10-31' FINAL;
SYSTEM STOP MERGES with_lonely;
INSERT INTO with_lonely SELECT number, '2022-11-01', number*10, 0 FROM numbers(1000);
INSERT INTO with_lonely SELECT number+50000, '2022-11-01', number*10, 1 FROM numbers(1000);
INSERT INTO with_lonely SELECT number+60000, '2022-11-01', number*10, 2 FROM numbers(1000);
SET do_not_merge_across_partitions_select_final = 1, max_threads = 16;
-- mix lonely parts and non-lonely parts
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM with_lonely FINAL SETTINGS allow_experimental_analyzer = 0;
-- only lonely parts
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM with_lonely FINAL WHERE dt < '2022-11-01' SETTINGS allow_experimental_analyzer = 0;
-- only lonely parts, but max_threads = 1, so lonely parts are read in order
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM with_lonely FINAL WHERE dt < '2022-11-01' SETTINGS max_threads = 1, allow_experimental_analyzer = 0;
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM with_lonely FINAL SETTINGS allow_experimental_analyzer = 1;
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM with_lonely FINAL WHERE dt < '2022-11-01' SETTINGS allow_experimental_analyzer = 1;
EXPLAIN PIPELINE header=1 SELECT max(val), count(*) FROM with_lonely FINAL WHERE dt < '2022-11-01' SETTINGS max_threads = 1, allow_experimental_analyzer = 1;
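
The EXPLAIN statements pin down the pipeline shape but not the query results. A hypothetical sanity check, not part of the committed test: with the setting above, FINAL deduplicates by the ORDER BY key within each partition, so the two counts below should agree (83000 rows for the inserts above).

    -- Hypothetical follow-up queries, not in the committed test.
    SELECT count() FROM with_lonely FINAL;
    SELECT count() FROM (SELECT dt, id FROM with_lonely GROUP BY dt, id);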