diff --git a/src/Processors/QueryPlan/PartsSplitter.cpp b/src/Processors/QueryPlan/PartsSplitter.cpp
index 533fbde1e13..61c6422de5a 100644
--- a/src/Processors/QueryPlan/PartsSplitter.cpp
+++ b/src/Processors/QueryPlan/PartsSplitter.cpp
@@ -254,6 +254,32 @@ namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
 }
+static void reorderColumns(ActionsDAG & dag, const Block & header, const std::string & filter_column)
+{
+    std::unordered_map<std::string_view, const ActionsDAG::Node *> inputs_map;
+    for (const auto * input : dag.getInputs())
+        inputs_map[input->result_name] = input;
+
+    for (const auto & col : header)
+    {
+        auto & input = inputs_map[col.name];
+        if (!input)
+            input = &dag.addInput(col);
+    }
+
+    ActionsDAG::NodeRawConstPtrs new_outputs;
+    new_outputs.reserve(header.columns() + 1);
+
+    new_outputs.push_back(&dag.findInOutputs(filter_column));
+    for (const auto & col : header)
+    {
+        auto & input = inputs_map[col.name];
+        new_outputs.push_back(input);
+    }
+
+    dag.getOutputs() = std::move(new_outputs);
+}
+
 Pipes buildPipesForReadingByPKRanges(
     const KeyDescription & primary_key,
     ExpressionActionsPtr sorting_expr,
@@ -279,6 +305,7 @@ Pipes buildPipesForReadingByPKRanges(
             continue;
         auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
         auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
+        reorderColumns(*actions, pipes[i].getHeader(), filter_function->getColumnName());
         ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
         auto description = fmt::format(
             "filter values in [{}, {})", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
diff --git a/src/Processors/Transforms/FilterSortedStreamByRange.h b/src/Processors/Transforms/FilterSortedStreamByRange.h
index e1141ebd299..e3d3f6f10ef 100644
--- a/src/Processors/Transforms/FilterSortedStreamByRange.h
+++ b/src/Processors/Transforms/FilterSortedStreamByRange.h
@@ -27,6 +27,9 @@ public:
            true)
        , filter_transform(header_, expression_, filter_column_name_, remove_filter_column_, on_totals_)
    {
+        assertBlocksHaveEqualStructure(
+            header_, getOutputPort().getHeader(),
+            "Expression for FilterSortedStreamByRange should not change header");
    }
 
    String getName() const override { return "FilterSortedStreamByRange"; }
diff --git a/tests/queries/0_stateless/02841_parallel_final_wrong_columns_order.reference b/tests/queries/0_stateless/02841_parallel_final_wrong_columns_order.reference
new file mode 100644
index 00000000000..749fce669df
--- /dev/null
+++ b/tests/queries/0_stateless/02841_parallel_final_wrong_columns_order.reference
@@ -0,0 +1 @@
+1000000
diff --git a/tests/queries/0_stateless/02841_parallel_final_wrong_columns_order.sql b/tests/queries/0_stateless/02841_parallel_final_wrong_columns_order.sql
new file mode 100644
index 00000000000..db15abb28cb
--- /dev/null
+++ b/tests/queries/0_stateless/02841_parallel_final_wrong_columns_order.sql
@@ -0,0 +1,9 @@
+-- Tags: no-random-merge-tree-settings
+-- Because we insert one million rows, the test should not choose a too-low index granularity.
+
+drop table if exists tab2;
+create table tab2 (id String, version Int64, l String, accountCode String, z Int32) engine = ReplacingMergeTree(z) PRIMARY KEY (accountCode, id) ORDER BY (accountCode, id, version, l);
+insert into tab2 select toString(number), number, toString(number), toString(number), 0 from numbers(1e6);
+set max_threads=2;
+select count() from tab2 final;
+DROP TABLE tab2;
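
For context, the core of the fix is that `reorderColumns()` rebuilds the filter DAG's outputs so the filter column comes first and the remaining outputs follow the pipe header's column order exactly; without this, the filter expression could emit columns in a different order than the sorted stream's header declares. Below is a minimal, self-contained sketch of that idea, not ClickHouse code: `Node`, `Dag`, and `reorderOutputs` are hypothetical stand-ins for `ActionsDAG` internals.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for ActionsDAG internals.
struct Node { std::string result_name; };

struct Dag
{
    std::deque<Node> nodes;             // owns all nodes; deque keeps pointers stable on growth
    std::vector<const Node *> inputs;   // known input columns
    std::vector<const Node *> outputs;  // current outputs, possibly misordered
};

// Rebuild dag.outputs as [filter_column, header...], adding an input for any
// header column the DAG does not know about yet.
void reorderOutputs(Dag & dag, const std::vector<std::string> & header, const std::string & filter_column)
{
    std::unordered_map<std::string, const Node *> inputs_map;
    for (const auto * input : dag.inputs)
        inputs_map[input->result_name] = input;

    for (const auto & name : header)
    {
        auto & input = inputs_map[name];
        if (!input)
        {
            dag.nodes.push_back(Node{name});
            input = &dag.nodes.back();
            dag.inputs.push_back(input);
        }
    }

    const Node * filter = nullptr;
    for (const auto * out : dag.outputs)
        if (out->result_name == filter_column)
            filter = out;
    assert(filter && "the filter column must already be an output");

    std::vector<const Node *> new_outputs;
    new_outputs.reserve(header.size() + 1);
    new_outputs.push_back(filter);                // filter column first
    for (const auto & name : header)
        new_outputs.push_back(inputs_map[name]); // then header order, exactly
    dag.outputs = std::move(new_outputs);
}

int main()
{
    Dag dag;
    dag.nodes.push_back(Node{"accountCode"});
    dag.inputs.push_back(&dag.nodes.back());
    dag.nodes.push_back(Node{"greater(version, 42)"}); // filter expression result
    dag.outputs = {&dag.nodes.back(), dag.inputs[0]};  // "id" is missing entirely

    reorderOutputs(dag, {"id", "accountCode"}, "greater(version, 42)");

    assert(dag.outputs.size() == 3);
    assert(dag.outputs[1]->result_name == "id");
    assert(dag.outputs[2]->result_name == "accountCode");
}
```

The design choice mirrored here is that missing columns are added as pass-through inputs rather than rejected, so the rebuilt output list always matches the header one-to-one regardless of which columns the filter expression happened to reference.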
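The `FilterSortedStreamByRange` change guards the same invariant at construction time: if a filter expression ever changes the block structure again, the query fails fast instead of silently producing misordered columns. The sketch below shows the kind of structural check involved; it is not the real `assertBlocksHaveEqualStructure`, and `ColumnDesc`, `Header`, and `assertSameStructure` are hypothetical stand-ins.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a block header: an ordered list of (name, type).
struct ColumnDesc
{
    std::string name;
    std::string type;
    bool operator==(const ColumnDesc & other) const { return name == other.name && type == other.type; }
};

using Header = std::vector<ColumnDesc>;

// Throw if two headers differ in column count, order, names, or types.
void assertSameStructure(const Header & lhs, const Header & rhs, const std::string & context)
{
    if (lhs.size() != rhs.size())
        throw std::logic_error(context + ": different number of columns");
    for (size_t i = 0; i < lhs.size(); ++i)
        if (!(lhs[i] == rhs[i]))
            throw std::logic_error(context + ": mismatch at position " + std::to_string(i)
                + " (" + lhs[i].name + " vs " + rhs[i].name + ")");
}

int main()
{
    Header in  = {{"id", "String"}, {"version", "Int64"}};
    Header out = {{"version", "Int64"}, {"id", "String"}}; // swapped columns: the bug this patch fixes
    try
    {
        assertSameStructure(in, out, "FilterSortedStreamByRange");
    }
    catch (const std::logic_error &)
    {
        return 0; // mismatch detected as expected
    }
    return 1;
}
```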