mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
Merge pull request #53489 from ClickHouse/fix-wrong-column-order-in-parallel-final
Fix wrong columns order for queries with parallel FINAL.
This commit is contained in:
commit
8dbe82c192
@ -254,6 +254,32 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
static void reorderColumns(ActionsDAG & dag, const Block & header, const std::string & filter_column)
|
||||
{
|
||||
std::unordered_map<std::string_view, const ActionsDAG::Node *> inputs_map;
|
||||
for (const auto * input : dag.getInputs())
|
||||
inputs_map[input->result_name] = input;
|
||||
|
||||
for (const auto & col : header)
|
||||
{
|
||||
auto & input = inputs_map[col.name];
|
||||
if (!input)
|
||||
input = &dag.addInput(col);
|
||||
}
|
||||
|
||||
ActionsDAG::NodeRawConstPtrs new_outputs;
|
||||
new_outputs.reserve(header.columns() + 1);
|
||||
|
||||
new_outputs.push_back(&dag.findInOutputs(filter_column));
|
||||
for (const auto & col : header)
|
||||
{
|
||||
auto & input = inputs_map[col.name];
|
||||
new_outputs.push_back(input);
|
||||
}
|
||||
|
||||
dag.getOutputs() = std::move(new_outputs);
|
||||
}
|
||||
|
||||
Pipes buildPipesForReadingByPKRanges(
|
||||
const KeyDescription & primary_key,
|
||||
ExpressionActionsPtr sorting_expr,
|
||||
@ -279,6 +305,7 @@ Pipes buildPipesForReadingByPKRanges(
|
||||
continue;
|
||||
auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
|
||||
auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
|
||||
reorderColumns(*actions, pipes[i].getHeader(), filter_function->getColumnName());
|
||||
ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
|
||||
auto description = fmt::format(
|
||||
"filter values in [{}, {})", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
|
||||
|
@ -27,6 +27,9 @@ public:
|
||||
true)
|
||||
, filter_transform(header_, expression_, filter_column_name_, remove_filter_column_, on_totals_)
|
||||
{
|
||||
assertBlocksHaveEqualStructure(
|
||||
header_, getOutputPort().getHeader(),
|
||||
"Expression for FilterSortedStreamByRange should not change header");
|
||||
}
|
||||
|
||||
String getName() const override { return "FilterSortedStreamByRange"; }
|
||||
|
@ -0,0 +1 @@
|
||||
1000000
|
@ -0,0 +1,9 @@
|
||||
-- Tags: no-random-merge-tree-settings
|
||||
-- Because we insert one million rows, it shouldn't choose too low index granularity.
|
||||
|
||||
drop table if exists tab2;
|
||||
create table tab2 (id String, version Int64, l String, accountCode String, z Int32) engine = ReplacingMergeTree(z) PRIMARY KEY (accountCode, id) ORDER BY (accountCode, id, version, l);
|
||||
insert into tab2 select toString(number), number, toString(number), toString(number), 0 from numbers(1e6);
|
||||
set max_threads=2;
|
||||
select count() from tab2 final;
|
||||
DROP TABLE tab2;
|
Loading…
Reference in New Issue
Block a user