Support aggregation in order when each partition is aggregated independently

This commit is contained in:
Nikita Taranov 2023-01-11 21:10:51 +00:00
parent a2c9aeb7c9
commit 1d45cce03c
6 changed files with 267 additions and 61 deletions

View File

@ -369,6 +369,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
many_data, counter++);
});
if (skip_merging)
{
pipeline.addSimpleTransform([&](const Block & header)
{ return std::make_shared<FinalizeAggregatedTransform>(header, transform_params); });
pipeline.resize(merge_threads);
aggregating_in_order = collector.detachProcessors(0);
return;
}
aggregating_in_order = collector.detachProcessors(0);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(

View File

@ -353,12 +353,7 @@ Pipe ReadFromMergeTree::read(
size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
{
if (read_type == ReadType::Default && max_streams > 1)
{
Pipe pipe = readFromPool(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
if (output_each_partition_through_separate_port)
pipe.resize(1);
return pipe;
}
return readFromPool(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0);
@ -468,9 +463,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsImpl(
info.use_uncompressed_cache);
}
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,
const Names & column_names)
template <typename ReadFunc>
static Pipe runForEachPartition(
RangesInDataParts && parts_with_ranges, size_t requested_num_streams, bool output_each_partition_through_separate_port, ReadFunc read)
{
if (parts_with_ranges.empty())
return {};
@ -479,7 +474,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
if (!output_each_partition_through_separate_port)
{
return spreadMarkRangesAmongStreamsImpl(std::move(parts_with_ranges), column_names, num_streams);
return read(std::move(parts_with_ranges), num_streams);
}
else
{
@ -487,7 +482,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
LOG_DEBUG(
&Poco::Logger::get("debug"),
"spreadMarkRangesAmongStreams {} {} {}",
"{} {} {} {}",
demangle(typeid(ReadFunc).name()).substr(0, 60),
parts_with_ranges.size(),
requested_num_streams,
countPartitions(parts_with_ranges));
@ -500,12 +496,19 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
parts_with_ranges.end(),
[&begin](auto & part) { return begin->data_part->info.partition_id != part.data_part->info.partition_id; });
LOG_DEBUG(&Poco::Logger::get("debug"), "spreadMarkRangesAmongStreams {} {}", begin->data_part->info.partition_id, end - begin);
LOG_DEBUG(
&Poco::Logger::get("debug"),
"{} {} {}",
demangle(typeid(ReadFunc).name()).substr(0, 60),
begin->data_part->info.partition_id,
end - begin);
RangesInDataParts partition_parts;
partition_parts.insert(partition_parts.end(), std::make_move_iterator(begin), std::make_move_iterator(end));
pipes.emplace_back(spreadMarkRangesAmongStreamsImpl(std::move(partition_parts), column_names, num_streams));
pipes.emplace_back(read(std::move(partition_parts), num_streams));
if (!pipes.back().empty())
pipes.back().resize(1);
begin = end;
}
@ -514,6 +517,15 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
}
}
/// Builds the reading pipe for the given parts by handing the work to runForEachPartition().
/// The supplied callback forwards each batch of parts, together with the stream count chosen
/// by runForEachPartition(), to spreadMarkRangesAmongStreamsImpl().
/// `requested_num_streams` and `output_each_partition_through_separate_port` are passed through
/// unchanged; the latter selects whether the callback is invoked once for all parts or once per partition.
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_with_ranges, const Names & column_names)
{
    /// `column_names` is captured by reference: the callback is only invoked inside the
    /// runForEachPartition() call below, while the caller's argument is still alive.
    auto read_parts = [this, &column_names](RangesInDataParts && parts_batch, size_t streams_for_batch)
    {
        return spreadMarkRangesAmongStreamsImpl(std::move(parts_batch), column_names, streams_for_batch);
    };

    return runForEachPartition(
        std::move(parts_with_ranges),
        requested_num_streams,
        output_each_partition_through_separate_port,
        read_parts);
}
static ActionsDAGPtr createProjection(const Block & header)
{
auto projection = std::make_shared<ActionsDAG>(header.getNamesAndTypesList());
@ -522,41 +534,21 @@ static ActionsDAGPtr createProjection(const Block & header)
return projection;
}
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrderImpl(
RangesInDataParts && parts_with_ranges,
const Names & column_names,
ActionsDAGPtr & out_projection,
const InputOrderInfoPtr & input_order_info)
const InputOrderInfoPtr & input_order_info,
size_t num_streams,
bool need_preliminary_merge)
{
const auto & settings = context->getSettingsRef();
const auto data_settings = data.getSettings();
PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);
Pipes res;
if (info.sum_marks == 0)
return {};
/// PREWHERE actions can remove some input columns (which are needed only for prewhere condition).
/// In case of read-in-order, PREWHERE is executed before sorting. But removed columns could be needed for sorting key.
/// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting.
/// See 02354_read_in_order_prewhere.sql as an example.
bool have_input_columns_removed_after_prewhere = false;
if (prewhere_info && prewhere_info->prewhere_actions)
{
auto & outputs = prewhere_info->prewhere_actions->getOutputs();
std::unordered_set<const ActionsDAG::Node *> outputs_set(outputs.begin(), outputs.end());
for (const auto * input : prewhere_info->prewhere_actions->getInputs())
{
if (!outputs_set.contains(input))
{
outputs.push_back(input);
have_input_columns_removed_after_prewhere = true;
}
}
}
/// Let's split ranges to avoid reading much data.
auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size = max_block_size]
(const auto & ranges, int direction)
@ -603,8 +595,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
return new_ranges;
};
const size_t min_marks_per_stream = (info.sum_marks - 1) / requested_num_streams + 1;
bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold);
const size_t min_marks_per_stream = (info.sum_marks - 1) / num_streams + 1;
Pipes pipes;
@ -686,7 +677,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (!pipes.empty())
pipe_header = pipes.front().getHeader();
if (need_preliminary_merge)
if (need_preliminary_merge || output_each_partition_through_separate_port)
{
size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size;
auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone();
@ -705,32 +696,79 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
auto sorting_key_expr = std::make_shared<ExpressionActions>(sorting_key_prefix_expr);
for (auto & pipe : pipes)
auto add_merge = [&](Pipe & pipe)
{
pipe.addSimpleTransform([sorting_key_expr](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, sorting_key_expr);
});
{ return std::make_shared<ExpressionTransform>(header, sorting_key_expr); });
if (pipe.numOutputPorts() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipe.getHeader(),
pipe.numOutputPorts(),
sort_description,
max_block_size,
SortingQueueStrategy::Batch);
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size, SortingQueueStrategy::Batch);
pipe.addTransform(std::move(transform));
}
};
if (need_preliminary_merge)
for (auto & pipe : pipes)
add_merge(pipe);
Pipe pipe = Pipe::unitePipes(std::move(pipes));
if (output_each_partition_through_separate_port)
add_merge(pipe);
return pipe;
}
return Pipe::unitePipes(std::move(pipes));
}
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts_with_ranges,
const Names & column_names,
ActionsDAGPtr & out_projection,
const InputOrderInfoPtr & input_order_info)
{
if (parts_with_ranges.empty())
return {};
/// PREWHERE actions can remove some input columns (which are needed only for prewhere condition).
/// In case of read-in-order, PREWHERE is executed before sorting. But removed columns could be needed for sorting key.
/// To fix this, we prohibit removing any input in prewhere actions. Instead, projection actions will be added after sorting.
/// See 02354_read_in_order_prewhere.sql as an example.
bool have_input_columns_removed_after_prewhere = false;
if (prewhere_info && prewhere_info->prewhere_actions)
{
auto & outputs = prewhere_info->prewhere_actions->getOutputs();
std::unordered_set<const ActionsDAG::Node *> outputs_set(outputs.begin(), outputs.end());
for (const auto * input : prewhere_info->prewhere_actions->getInputs())
{
if (!outputs_set.contains(input))
{
outputs.push_back(input);
have_input_columns_removed_after_prewhere = true;
}
}
}
if (!pipes.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere))
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
out_projection = createProjection(pipe_header);
const auto & settings = context->getSettingsRef();
bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold);
return Pipe::unitePipes(std::move(pipes));
auto read
= [this, &column_names, &input_order_info, need_preliminary_merge](RangesInDataParts && parts_with_ranges_, size_t num_streams_)
{
return spreadMarkRangesAmongStreamsWithOrderImpl(
std::move(parts_with_ranges_), column_names, input_order_info, num_streams_, need_preliminary_merge);
};
Pipe pipe = runForEachPartition(std::move(parts_with_ranges), requested_num_streams, output_each_partition_through_separate_port, read);
if (!pipe.empty() && (need_preliminary_merge || have_input_columns_removed_after_prewhere))
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
out_projection = createProjection(pipe.getHeader());
return pipe;
}
static void addMergingFinal(
@ -1269,14 +1307,11 @@ void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
output_stream->sort_description = std::move(sort_description);
output_stream->sort_scope = DataStream::SortScope::Stream;
}
/// Not supported currently. Disable optimisation.
output_each_partition_through_separate_port = false;
}
bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
{
if (isQueryWithFinal() || query_info.getInputOrderInfo())
if (isQueryWithFinal())
return false;
output_each_partition_through_separate_port = true;
@ -1377,9 +1412,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
}
else if (input_order_info)
{
if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used when reading in order is used");
pipe = spreadMarkRangesAmongStreamsWithOrder(
std::move(result.parts_with_ranges),
column_names_to_read,

View File

@ -238,6 +238,13 @@ private:
ActionsDAGPtr & out_projection,
const InputOrderInfoPtr & input_order_info);
Pipe spreadMarkRangesAmongStreamsWithOrderImpl(
RangesInDataParts && parts_with_ranges,
const Names & column_names,
const InputOrderInfoPtr & input_order_info,
size_t num_streams,
bool need_preliminary_merge);
Pipe spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
const Names & column_names,

View File

@ -3,11 +3,13 @@
<!-- <allow_aggregate_each_partition_independently>1</allow_aggregate_each_partition_independently> -->
</settings>
<create_query>create table t(a UInt32) engine=MergeTree order by tuple() partition by a % 16</create_query>
<create_query>create table t(a UInt32) engine=MergeTree order by a partition by a % 16</create_query>
<fill_query>insert into t select * from numbers_mt(5e7)</fill_query>
<query>select a from t group by a format Null</query>
<query>select a from t group by a format Null settings optimize_aggregation_in_order = 1</query>
<drop_query>drop table t</drop_query>
</test>

View File

@ -81,3 +81,116 @@ ExpressionTransform × 16
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 4 → 16
FinalizeAggregatedTransform × 4
AggregatingInOrderTransform × 4
(Expression)
ExpressionTransform × 4
(ReadFromMergeTree)
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 8 → 16
FinalizeAggregatedTransform × 8
AggregatingInOrderTransform × 8
(Expression)
ExpressionTransform × 8
(ReadFromMergeTree)
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
FinalizeAggregatedTransform × 16
AggregatingInOrderTransform × 16
(Expression)
ExpressionTransform × 16
(ReadFromMergeTree)
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
1000000

View File

@ -40,3 +40,46 @@ select count() from (select throwIf(count() != 2) from t3 group by a);
select throwIf(count() != 4) from remote('127.0.0.{1,2}', currentDatabase(), t3) group by a format Null;
drop table t3;
-- aggregation in order --
set optimize_aggregation_in_order = 1;
create table t4(a UInt32) engine=MergeTree order by a partition by a % 4;
system stop merges t4;
insert into t4 select number from numbers_mt(1e6);
insert into t4 select number from numbers_mt(1e6);
explain pipeline select a from t4 group by a settings read_in_order_two_level_merge_threshold = 1e12;
select count() from (select throwIf(count() != 2) from t4 group by a);
drop table t4;
create table t5(a UInt32) engine=MergeTree order by a partition by a % 8;
system stop merges t5;
insert into t5 select number from numbers_mt(1e6);
insert into t5 select number from numbers_mt(1e6);
explain pipeline select a from t5 group by a settings read_in_order_two_level_merge_threshold = 1e12;
select count() from (select throwIf(count() != 2) from t5 group by a);
drop table t5;
create table t6(a UInt32) engine=MergeTree order by a partition by a % 16;
system stop merges t6;
insert into t6 select number from numbers_mt(1e6);
insert into t6 select number from numbers_mt(1e6);
explain pipeline select a from t6 group by a settings read_in_order_two_level_merge_threshold = 1e12;
select count() from (select throwIf(count() != 2) from t6 group by a);
drop table t6;