diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index f06bfb97b2c..a38d50e56fb 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -82,6 +82,17 @@ static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts return Block{ColumnWithTypeAndName(std::move(column), std::make_shared(), "_part")}; } +/// Check if ORDER BY clause of the query has some expression. +static bool sortingDescriptionHasExpressions(const SortDescription & sort_description, const StorageMetadataPtr & metadata_snapshot) +{ + auto all_columns = metadata_snapshot->getColumns(); + for (const auto & sort_column : sort_description) + { + if (!all_columns.has(sort_column.column_name)) + return true; + } + return false; +} size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead( const MergeTreeData::DataPartsVector & parts, @@ -1065,6 +1076,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1; bool need_preliminary_merge = (parts.size() > settings.read_in_order_two_level_merge_threshold); + size_t max_output_ports = 0; for (size_t i = 0; i < num_streams && !parts.empty(); ++i) { @@ -1174,25 +1186,43 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( }); } - if (pipe.numOutputPorts() > 1 && need_preliminary_merge) + max_output_ports = std::max(pipe.numOutputPorts(), max_output_ports); + res.emplace_back(std::move(pipe)); + } + + if (need_preliminary_merge) + { + /// If ORDER BY clause of the query contains some expression, + /// then those new columns should be added for the merge step, + /// and this should be done always, if there is at least one pipe that + /// has multiple output ports. + bool sorting_key_has_expression = sortingDescriptionHasExpressions(input_order_info->order_key_prefix_descr, metadata_snapshot); + bool force_sorting_key_transform = res.size() > 1 && max_output_ports > 1 && sorting_key_has_expression; + + for (auto & pipe : res) { SortDescription sort_description; - for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j) - sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j], - input_order_info->direction, 1); - /// Drop temporary columns, added by 'sorting_key_prefix_expr' - out_projection = createProjection(pipe, data); - pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header) + if (pipe.numOutputPorts() > 1 || force_sorting_key_transform) { - return std::make_shared(header, sorting_key_prefix_expr); - }); + for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j) + sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j], + input_order_info->direction, 1); - pipe.addTransform(std::make_shared( - pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size)); + /// Drop temporary columns, added by 'sorting_key_prefix_expr' + out_projection = createProjection(pipe, data); + pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header) + { + return std::make_shared(header, sorting_key_prefix_expr); + }); + } + + if (pipe.numOutputPorts() > 1) + { + pipe.addTransform(std::make_shared( + pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size)); + } } - - res.emplace_back(std::move(pipe)); } return Pipe::unitePipes(std::move(res));