mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Fix spreading for ReadInOrderOptimizer with expression in ORDER BY
This will fix optimize_read_in_order/optimize_aggregation_in_order with max_threads>0 and expression in ORDER BY
This commit is contained in:
parent
b161127fc1
commit
2389406c21
@ -82,6 +82,17 @@ static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts
|
||||
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
|
||||
}
|
||||
|
||||
/// Check if ORDER BY clause of the query has some expression.
|
||||
static bool sortingDescriptionHasExpressions(const SortDescription & sort_description, const StorageMetadataPtr & metadata_snapshot)
|
||||
{
|
||||
auto all_columns = metadata_snapshot->getColumns();
|
||||
for (const auto & sort_column : sort_description)
|
||||
{
|
||||
if (!all_columns.has(sort_column.column_name))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
|
||||
const MergeTreeData::DataPartsVector & parts,
|
||||
@ -1065,6 +1076,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
|
||||
const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;
|
||||
bool need_preliminary_merge = (parts.size() > settings.read_in_order_two_level_merge_threshold);
|
||||
size_t max_output_ports = 0;
|
||||
|
||||
for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
|
||||
{
|
||||
@ -1174,25 +1186,43 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
|
||||
});
|
||||
}
|
||||
|
||||
if (pipe.numOutputPorts() > 1 && need_preliminary_merge)
|
||||
max_output_ports = std::max(pipe.numOutputPorts(), max_output_ports);
|
||||
res.emplace_back(std::move(pipe));
|
||||
}
|
||||
|
||||
if (need_preliminary_merge)
|
||||
{
|
||||
/// If ORDER BY clause of the query contains some expression,
|
||||
/// then those new columns should be added for the merge step,
|
||||
/// and this should be done always, if there is at least one pipe that
|
||||
/// has multiple output ports.
|
||||
bool sorting_key_has_expression = sortingDescriptionHasExpressions(input_order_info->order_key_prefix_descr, metadata_snapshot);
|
||||
bool force_sorting_key_transform = res.size() > 1 && max_output_ports > 1 && sorting_key_has_expression;
|
||||
|
||||
for (auto & pipe : res)
|
||||
{
|
||||
SortDescription sort_description;
|
||||
for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
|
||||
sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j],
|
||||
input_order_info->direction, 1);
|
||||
|
||||
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
|
||||
out_projection = createProjection(pipe, data);
|
||||
pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header)
|
||||
if (pipe.numOutputPorts() > 1 || force_sorting_key_transform)
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(header, sorting_key_prefix_expr);
|
||||
});
|
||||
for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
|
||||
sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j],
|
||||
input_order_info->direction, 1);
|
||||
|
||||
pipe.addTransform(std::make_shared<MergingSortedTransform>(
|
||||
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size));
|
||||
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
|
||||
out_projection = createProjection(pipe, data);
|
||||
pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header)
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(header, sorting_key_prefix_expr);
|
||||
});
|
||||
}
|
||||
|
||||
if (pipe.numOutputPorts() > 1)
|
||||
{
|
||||
pipe.addTransform(std::make_shared<MergingSortedTransform>(
|
||||
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size));
|
||||
}
|
||||
}
|
||||
|
||||
res.emplace_back(std::move(pipe));
|
||||
}
|
||||
|
||||
return Pipe::unitePipes(std::move(res));
|
||||
|
Loading…
Reference in New Issue
Block a user