Merge pull request #16637 from azat/mt-read_in_order-spread-fix

Fix spreading for ReadInOrderOptimizer with expression in ORDER BY
alexey-milovidov committed via GitHub on 2020-11-06 17:36:03 +03:00 (commit 7fb53b205c)
3 changed files with 70 additions and 13 deletions


@@ -82,6 +82,17 @@ static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts
     return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
 }
 
+/// Check if ORDER BY clause of the query has some expression.
+static bool sortingDescriptionHasExpressions(const SortDescription & sort_description, const StorageMetadataPtr & metadata_snapshot)
+{
+    auto all_columns = metadata_snapshot->getColumns();
+    for (const auto & sort_column : sort_description)
+    {
+        if (!all_columns.has(sort_column.column_name))
+            return true;
+    }
+    return false;
+}
 
 size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
     const MergeTreeData::DataPartsVector & parts,
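
The helper added above treats a sort column as an expression whenever its name is not a physical column of the table; with the ORDER BY (key, key/2) key used in the test below, the sort column "key/2" has no physical counterpart, so the function returns true. A minimal standalone sketch of that check, using plain standard-library types rather than ClickHouse's SortDescription and metadata classes:

    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    // Simplified stand-in for the helper: sort columns are plain names,
    // physical table columns are a set of names.
    static bool sortingDescriptionHasExpressions(
        const std::vector<std::string> & sort_columns,
        const std::set<std::string> & physical_columns)
    {
        for (const auto & name : sort_columns)
            if (!physical_columns.count(name))
                return true;    // e.g. "key/2" is an expression, not a stored column
        return false;
    }

    int main()
    {
        std::cout << std::boolalpha
                  << sortingDescriptionHasExpressions({"key", "key/2"}, {"key"}) << '\n'   // true
                  << sortingDescriptionHasExpressions({"key"}, {"key"}) << '\n';           // false
    }
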
@@ -1074,6 +1085,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
     const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;
     bool need_preliminary_merge = (parts.size() > settings.read_in_order_two_level_merge_threshold);
+    size_t max_output_ports = 0;
 
     for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
     {
@@ -1183,25 +1195,43 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
             });
         }
 
-        if (pipe.numOutputPorts() > 1 && need_preliminary_merge)
+        max_output_ports = std::max(pipe.numOutputPorts(), max_output_ports);
+        res.emplace_back(std::move(pipe));
+    }
+
+    if (need_preliminary_merge)
+    {
+        /// If ORDER BY clause of the query contains some expression,
+        /// then those new columns should be added for the merge step,
+        /// and this should be done always, if there is at least one pipe that
+        /// has multiple output ports.
+        bool sorting_key_has_expression = sortingDescriptionHasExpressions(input_order_info->order_key_prefix_descr, metadata_snapshot);
+        bool force_sorting_key_transform = res.size() > 1 && max_output_ports > 1 && sorting_key_has_expression;
+
+        for (auto & pipe : res)
         {
             SortDescription sort_description;
-            for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
-                sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j],
-                    input_order_info->direction, 1);
 
-            /// Drop temporary columns, added by 'sorting_key_prefix_expr'
-            out_projection = createProjection(pipe, data);
-            pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header)
+            if (pipe.numOutputPorts() > 1 || force_sorting_key_transform)
             {
-                return std::make_shared<ExpressionTransform>(header, sorting_key_prefix_expr);
-            });
+                for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
+                    sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j],
+                        input_order_info->direction, 1);
 
-            pipe.addTransform(std::make_shared<MergingSortedTransform>(
-                pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size));
+                /// Drop temporary columns, added by 'sorting_key_prefix_expr'
+                out_projection = createProjection(pipe, data);
+                pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header)
+                {
+                    return std::make_shared<ExpressionTransform>(header, sorting_key_prefix_expr);
+                });
+            }
+
+            if (pipe.numOutputPorts() > 1)
+            {
+                pipe.addTransform(std::make_shared<MergingSortedTransform>(
+                    pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size));
+            }
         }
-
-        res.emplace_back(std::move(pipe));
     }
 
     return Pipe::unitePipes(std::move(res));
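
The key change is the force_sorting_key_transform flag: previously the sorting_key_prefix_expr transform (which materializes expression columns such as key/2) was applied only to pipes that themselves had more than one output port, so when the ranges were spread unevenly, a single-port pipe reached the final merge without those columns. Now, if several pipes were produced and any of them has multiple ports, the expression is applied to every pipe so that all headers agree before merging. A rough standalone sketch of that decision, with a hypothetical PipeInfo struct standing in for the real Pipe API:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Hypothetical stand-in for a pipe: only the properties the decision needs.
    struct PipeInfo
    {
        size_t output_ports;            // number of parallel output streams
        bool has_expression_columns;    // are ORDER BY expression columns materialized?
    };

    int main()
    {
        // Assumed state after spreading mark ranges: one 3-port pipe and one 1-port pipe.
        std::vector<PipeInfo> res{{3, false}, {1, false}};
        bool sorting_key_has_expression = true;   // e.g. ORDER BY (key, key/2)

        size_t max_output_ports = 0;
        for (const auto & pipe : res)
            max_output_ports = std::max(max_output_ports, pipe.output_ports);

        // Mirrors force_sorting_key_transform from the hunk above.
        bool force_sorting_key_transform =
            res.size() > 1 && max_output_ports > 1 && sorting_key_has_expression;

        for (auto & pipe : res)
        {
            // The expression transform is needed either because this pipe itself will be
            // merge-sorted (multiple ports), or because another pipe will be and all
            // headers must agree for the final merge.
            if (pipe.output_ports > 1 || force_sorting_key_transform)
                pipe.has_expression_columns = true;

            std::cout << "ports=" << pipe.output_ports << " expr_columns="
                      << std::boolalpha << pipe.has_expression_columns << '\n';
        }
    }

Running the sketch prints expr_columns=true for both pipes; with only the old condition (output_ports > 1), the single-port pipe would stay at false.
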


@ -0,0 +1,11 @@
(Expression)
ExpressionTransform
(Expression)
ExpressionTransform
(Aggregating)
FinalizingSimpleTransform
AggregatingSortedTransform 3 → 1
AggregatingInOrderTransform × 3
(Expression)
ExpressionTransform × 3
(ReadFromStorage)


@ -0,0 +1,16 @@
DROP TABLE IF EXISTS data_01551;
CREATE TABLE data_01551
(
key UInt32
) engine=AggregatingMergeTree()
PARTITION BY key%2
ORDER BY (key, key/2)
SETTINGS index_granularity=10;
INSERT INTO data_01551 SELECT number FROM numbers(100000);
SET max_threads=3;
SET merge_tree_min_rows_for_concurrent_read=10000;
SET optimize_aggregation_in_order=1;
SET read_in_order_two_level_merge_threshold=1;
EXPLAIN PIPELINE SELECT key FROM data_01551 GROUP BY key, key/2;
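
With these settings the test presumably exercises exactly the fixed path: PARTITION BY key%2 gives at least two data parts, so with read_in_order_two_level_merge_threshold=1 the parts.size() > threshold check above makes need_preliminary_merge true, while max_threads=3 and merge_tree_min_rows_for_concurrent_read=10000 spread the read over three streams. The expected EXPLAIN PIPELINE output is the 11-line reference listing above, in which AggregatingSortedTransform 3 → 1 merges three in-order streams that all carry the key/2 column.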