From 95bbda36677d80792454e0ef7b9484a2d7747b3a Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 11 Aug 2020 15:03:18 +0300 Subject: [PATCH 1/8] Specify columns after ARRAY JOIN and JOIN. Add ArrayJoinTransform and ArrayJoinStep. --- src/Interpreters/ActionsVisitor.cpp | 2 +- src/Interpreters/ArrayJoinAction.h | 5 +- src/Interpreters/ExpressionActions.cpp | 4 +- src/Interpreters/ExpressionActions.h | 4 +- src/Interpreters/ExpressionAnalyzer.cpp | 66 ++++++++++--------- src/Interpreters/ExpressionAnalyzer.h | 16 +++-- src/Interpreters/InterpreterSelectQuery.cpp | 20 ++++++ src/Processors/QueryPlan/ArrayJoinStep.cpp | 62 +++++++++++++++++ src/Processors/QueryPlan/ArrayJoinStep.h | 24 +++++++ .../Transforms/ArrayJoinTransform.cpp | 37 +++++++++++ .../Transforms/ArrayJoinTransform.h | 30 +++++++++ src/Processors/ya.make | 2 + 12 files changed, 230 insertions(+), 42 deletions(-) create mode 100644 src/Processors/QueryPlan/ArrayJoinStep.cpp create mode 100644 src/Processors/QueryPlan/ArrayJoinStep.h create mode 100644 src/Processors/Transforms/ArrayJoinTransform.cpp create mode 100644 src/Processors/Transforms/ArrayJoinTransform.h diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index b382b26dcec..1c82bc62f24 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -549,7 +549,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data & data.addAction(ExpressionAction::copyColumn(arg->getColumnName(), result_name)); NameSet joined_columns; joined_columns.insert(result_name); - data.addAction(ExpressionAction::arrayJoin(joined_columns, false, data.context)); + data.addAction(ExpressionAction::arrayJoin(std::make_shared(joined_columns, false, data.context))); } return; diff --git a/src/Interpreters/ArrayJoinAction.h b/src/Interpreters/ArrayJoinAction.h index d70c0c14a15..be5be738bb9 100644 --- a/src/Interpreters/ArrayJoinAction.h +++ b/src/Interpreters/ArrayJoinAction.h @@ -12,8 +12,9 @@ class Context; class IFunctionOverloadResolver; using FunctionOverloadResolverPtr = std::shared_ptr; -struct ArrayJoinAction +class ArrayJoinAction { +public: NameSet columns; bool is_left = false; bool is_unaligned = false; @@ -32,4 +33,6 @@ struct ArrayJoinAction void finalize(NameSet & needed_columns, NameSet & unmodified_columns, NameSet & final_columns); }; +using ArrayJoinActionPtr = std::shared_ptr; + } diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 0e1d0c51704..f35e6266110 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -143,11 +143,11 @@ ExpressionAction ExpressionAction::addAliases(const NamesWithAliases & aliased_c return a; } -ExpressionAction ExpressionAction::arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context) +ExpressionAction ExpressionAction::arrayJoin(ArrayJoinActionPtr array_join_) { ExpressionAction a; a.type = ARRAY_JOIN; - a.array_join = std::make_shared(array_joined_columns, array_join_is_left, context); + a.array_join = std::move(array_join_); return a; } diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index 3697dc02ad3..372a17f58df 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -98,7 +98,7 @@ public: bool is_function_compiled = false; /// For ARRAY JOIN - std::shared_ptr array_join; + ArrayJoinActionPtr array_join; /// For JOIN std::shared_ptr table_join; @@ -117,7 +117,7 @@ public: static ExpressionAction project(const NamesWithAliases & projected_columns_); static ExpressionAction project(const Names & projected_columns_); static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_); - static ExpressionAction arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context); + static ExpressionAction arrayJoin(ArrayJoinActionPtr array_join_); static ExpressionAction ordinaryJoin(std::shared_ptr table_join, JoinPtr join); /// Which columns necessary to perform this action. diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index acbf6255fba..7cabef1df9c 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -173,25 +173,33 @@ void ExpressionAnalyzer::analyzeAggregation() if (select_query) { + NamesAndTypesList array_join_columns; + bool is_array_join_left; - ASTPtr array_join_expression_list = select_query->arrayJoinExpressionList(is_array_join_left); - if (array_join_expression_list) + if (ASTPtr array_join_expression_list = select_query->arrayJoinExpressionList(is_array_join_left)) { getRootActionsNoMakeSet(array_join_expression_list, true, temp_actions, false); - addMultipleArrayJoinAction(temp_actions, is_array_join_left); + if (auto array_join = addMultipleArrayJoinAction(temp_actions, is_array_join_left)) + temp_actions->add(ExpressionAction::arrayJoin(array_join)); - array_join_columns.clear(); for (auto & column : temp_actions->getSampleBlock().getNamesAndTypesList()) if (syntax->array_join_result_to_source.count(column.name)) array_join_columns.emplace_back(column); } + columns_after_array_join = sourceColumns(); + columns_after_array_join.insert(columns_after_array_join.end(), array_join_columns.begin(), array_join_columns.end()); + const ASTTablesInSelectQueryElement * join = select_query->join(); if (join) { getRootActionsNoMakeSet(analyzedJoin().leftKeysList(), true, temp_actions, false); addJoinAction(temp_actions); } + + columns_after_join = columns_after_array_join; + const auto & added_by_join = analyzedJoin().columnsAddedByJoin(); + columns_after_join.insert(columns_after_join.end(), added_by_join.begin(), added_by_join.end()); } has_aggregation = makeAggregateDescriptions(temp_actions); @@ -281,16 +289,6 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global) } -NamesAndTypesList ExpressionAnalyzer::sourceWithJoinedColumns() const -{ - auto result_columns = sourceColumns(); - result_columns.insert(result_columns.end(), array_join_columns.begin(), array_join_columns.end()); - result_columns.insert(result_columns.end(), - analyzedJoin().columnsAddedByJoin().begin(), analyzedJoin().columnsAddedByJoin().end()); - return result_columns; -} - - void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name) { auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name); @@ -374,7 +372,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node) } else { - ExpressionActionsPtr temp_actions = std::make_shared(sourceWithJoinedColumns(), context); + ExpressionActionsPtr temp_actions = std::make_shared(columns_after_join, context); getRootActions(left_in_operand, true, temp_actions); Block sample_block_with_calculated_columns = temp_actions->getSampleBlock(); @@ -455,7 +453,7 @@ const ASTSelectQuery * SelectQueryExpressionAnalyzer::getAggregatingQuery() cons } /// "Big" ARRAY JOIN. -void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool array_join_is_left) const +ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool array_join_is_left) const { NameSet result_columns; for (const auto & result_source : syntax->array_join_result_to_source) @@ -468,25 +466,27 @@ void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actio result_columns.insert(result_source.first); } - actions->add(ExpressionAction::arrayJoin(result_columns, array_join_is_left, context)); + return std::make_shared(result_columns, array_join_is_left, context); } -bool SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool only_types) +ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool only_types) { const auto * select_query = getSelectQuery(); bool is_array_join_left; ASTPtr array_join_expression_list = select_query->arrayJoinExpressionList(is_array_join_left); if (!array_join_expression_list) - return false; + return nullptr; ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); getRootActions(array_join_expression_list, only_types, step.actions); - addMultipleArrayJoinAction(step.actions, is_array_join_left); + auto array_join = addMultipleArrayJoinAction(step.actions, is_array_join_left); + for (const auto & column : array_join->columns) + step.required_output.emplace_back(column); - return true; + return array_join; } void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr join) const @@ -496,7 +496,7 @@ void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr j bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types) { - ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); + ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join); getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions); return true; @@ -506,7 +506,7 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain) { JoinPtr table_join = makeTableJoin(*syntax->ast_join); - ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); + ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join); addJoinAction(step.actions, table_join); return true; @@ -720,7 +720,7 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, if (!select_query->where()) return false; - ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); + ExpressionActionsChain::Step & step = chain.lastStep(columns_after_join); auto where_column_name = select_query->where()->getColumnName(); step.required_output.push_back(where_column_name); @@ -744,7 +744,7 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain if (!select_query->groupBy()) return false; - ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); + ExpressionActionsChain::Step & step = chain.lastStep(columns_after_join); ASTs asts = select_query->groupBy()->children; for (const auto & ast : asts) @@ -755,10 +755,9 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain if (optimize_aggregation_in_order) { - auto all_columns = sourceWithJoinedColumns(); for (auto & child : asts) { - group_by_elements_actions.emplace_back(std::make_shared(all_columns, context)); + group_by_elements_actions.emplace_back(std::make_shared(columns_after_join, context)); getRootActions(child, only_types, group_by_elements_actions.back()); } } @@ -770,7 +769,7 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression { const auto * select_query = getAggregatingQuery(); - ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); + ExpressionActionsChain::Step & step = chain.lastStep(columns_after_join); for (const auto & desc : aggregate_descriptions) for (const auto & name : desc.argument_names) @@ -844,10 +843,9 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain if (optimize_read_in_order) { - auto all_columns = sourceWithJoinedColumns(); for (auto & child : select_query->orderBy()->children) { - order_by_elements_actions.emplace_back(std::make_shared(all_columns, context)); + order_by_elements_actions.emplace_back(std::make_shared(columns_after_join, context)); getRootActions(child, only_types, order_by_elements_actions.back()); } } @@ -1093,7 +1091,13 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( chain.addStep(); } - query_analyzer.appendArrayJoin(chain, only_types || !first_stage); + array_join = query_analyzer.appendArrayJoin(chain, only_types || !first_stage); + if (array_join) + { + before_array_join = chain.getLastActions(true); + if (before_array_join) + chain.addStep(); + } if (query_analyzer.hasTableJoin()) { diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index a37235f2f77..7ffe06ebd9e 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -34,6 +34,9 @@ struct ASTTablesInSelectQueryElement; struct StorageInMemoryMetadata; using StorageMetadataPtr = std::shared_ptr; +class ArrayJoinAction; +using ArrayJoinActionPtr = std::shared_ptr; + /// Create columns in block or return false if not possible bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column = false); @@ -43,9 +46,12 @@ struct ExpressionAnalyzerData SubqueriesForSets subqueries_for_sets; PreparedSets prepared_sets; + /// Columns after ARRAY JOIN. It there is no ARRAY JOIN, it's source_columns. + NamesAndTypesList columns_after_array_join; + /// Columns after Columns after ARRAY JOIN and JOIN. If there is no JOIN, it's columns_after_array_join. + NamesAndTypesList columns_after_join; /// Columns after ARRAY JOIN, JOIN, and/or aggregation. NamesAndTypesList aggregated_columns; - NamesAndTypesList array_join_columns; bool has_aggregation = false; NamesAndTypesList aggregation_keys; @@ -128,12 +134,10 @@ protected: const TableJoin & analyzedJoin() const { return *syntax->analyzed_join; } const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; } const std::vector & aggregates() const { return syntax->aggregates; } - NamesAndTypesList sourceWithJoinedColumns() const; - /// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables. void initGlobalSubqueriesAndExternalTables(bool do_global); - void addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool is_left) const; + ArrayJoinActionPtr addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool is_left) const; void addJoinAction(ExpressionActionsPtr & actions, JoinPtr = {}) const; @@ -175,6 +179,8 @@ struct ExpressionAnalysisResult bool optimize_read_in_order = false; bool optimize_aggregation_in_order = false; + ExpressionActionsPtr before_array_join; + ArrayJoinActionPtr array_join; ExpressionActionsPtr before_join; ExpressionActionsPtr join; ExpressionActionsPtr before_where; @@ -305,7 +311,7 @@ private: */ /// Before aggregation: - bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types); + ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, bool only_types); bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types); bool appendJoin(ExpressionActionsChain & chain); /// Add preliminary rows filtration. Actions are created in other expression analyzer to prevent any possible alias injection. diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 19a4e998dc7..604bf55649a 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -862,6 +863,25 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu query_plan.addStep(std::move(row_level_security_step)); } + if (expressions.before_array_join) + { + QueryPlanStepPtr before_array_join_step = std::make_unique( + query_plan.getCurrentDataStream(), + expressions.before_array_join); + before_array_join_step->setStepDescription("Before ARRAY JOIN"); + query_plan.addStep(std::move(before_array_join_step)); + } + + if (expressions.array_join) + { + QueryPlanStepPtr array_join_step = std::make_unique( + query_plan.getCurrentDataStream(), + expressions.array_join); + + array_join_step->setStepDescription("ARRAY JOIN"); + query_plan.addStep(std::move(array_join_step)); + } + if (expressions.before_join) { QueryPlanStepPtr before_join_step = std::make_unique( diff --git a/src/Processors/QueryPlan/ArrayJoinStep.cpp b/src/Processors/QueryPlan/ArrayJoinStep.cpp new file mode 100644 index 00000000000..2948d4cc842 --- /dev/null +++ b/src/Processors/QueryPlan/ArrayJoinStep.cpp @@ -0,0 +1,62 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +static ITransformingStep::Traits getTraits() +{ + return ITransformingStep::Traits + { + { + .preserves_distinct_columns = false, + .returns_single_stream = false, + .preserves_number_of_streams = true, + .preserves_sorting = false, + }, + { + .preserves_number_of_rows = false, + } + }; +} + +ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPtr array_join_) + : ITransformingStep( + input_stream_, + ArrayJoinTransform::transformHeader(input_stream_.header, array_join_), + getTraits()) + , array_join(std::move(array_join_)) +{ +} + +void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline) +{ + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) + { + bool on_totals = stream_type == QueryPipeline::StreamType::Totals; + return std::make_shared(header, array_join, on_totals); + }); +} + +void ArrayJoinStep::describeActions(FormatSettings & settings) const +{ + String prefix(settings.offset, ' '); + bool first = true; + + settings.out << prefix << (array_join->is_left ? "LEFT " : "") << "ARRAY JOIN "; + for (const auto & column : array_join->columns) + { + if (!first) + settings.out << ", "; + first = false; + + + settings.out << column; + } + settings.out << '\n'; +} + +} diff --git a/src/Processors/QueryPlan/ArrayJoinStep.h b/src/Processors/QueryPlan/ArrayJoinStep.h new file mode 100644 index 00000000000..9a9504a5d54 --- /dev/null +++ b/src/Processors/QueryPlan/ArrayJoinStep.h @@ -0,0 +1,24 @@ +#pragma once +#include + +namespace DB +{ + +class ArrayJoinAction; +using ArrayJoinActionPtr = std::shared_ptr; + +class ArrayJoinStep : public ITransformingStep +{ +public: + explicit ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPtr array_join_); + String getName() const override { return "Expression"; } + + void transformPipeline(QueryPipeline & pipeline) override; + + void describeActions(FormatSettings & settings) const override; + +private: + ArrayJoinActionPtr array_join; +}; + +} diff --git a/src/Processors/Transforms/ArrayJoinTransform.cpp b/src/Processors/Transforms/ArrayJoinTransform.cpp new file mode 100644 index 00000000000..ba8e4949f7c --- /dev/null +++ b/src/Processors/Transforms/ArrayJoinTransform.cpp @@ -0,0 +1,37 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +Block ArrayJoinTransform::transformHeader(Block header, const ArrayJoinActionPtr & array_join) +{ + array_join->execute(header, true); + return header; +} + +ArrayJoinTransform::ArrayJoinTransform( + const Block & header_, + ArrayJoinActionPtr array_join_, + bool /*on_totals_*/) + : ISimpleTransform(header_, transformHeader(header_, array_join_), false) + , array_join(std::move(array_join_)) +{ + /// TODO +// if (on_totals_) +// throw Exception("ARRAY JOIN is not supported for totals", ErrorCodes::LOGICAL_ERROR); +} + +void ArrayJoinTransform::transform(Chunk & chunk) +{ + auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); + array_join->execute(block, false); + chunk.setColumns(block.getColumns(), block.rows()); +} + +} diff --git a/src/Processors/Transforms/ArrayJoinTransform.h b/src/Processors/Transforms/ArrayJoinTransform.h new file mode 100644 index 00000000000..0d81d5e458c --- /dev/null +++ b/src/Processors/Transforms/ArrayJoinTransform.h @@ -0,0 +1,30 @@ +#pragma once +#include + +namespace DB +{ + +class ArrayJoinAction; +using ArrayJoinActionPtr = std::shared_ptr; + +/// Execute ARRAY JOIN +class ArrayJoinTransform : public ISimpleTransform +{ +public: + ArrayJoinTransform( + const Block & header_, + ArrayJoinActionPtr array_join_, + bool on_totals_ = false); + + String getName() const override { return "ArrayJoinTransform"; } + + static Block transformHeader(Block header, const ArrayJoinActionPtr & array_join); + +protected: + void transform(Chunk & chunk) override; + +private: + ArrayJoinActionPtr array_join; +}; + +} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 4c25ad5bf3f..45b9986a9bb 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -88,6 +88,7 @@ SRCS( QueryPipeline.cpp QueryPlan/AddingDelayedSourceStep.cpp QueryPlan/AggregatingStep.cpp + QueryPlan/ArrayJoinStep.cpp QueryPlan/ConvertingStep.cpp QueryPlan/CreatingSetsStep.cpp QueryPlan/CubeStep.cpp @@ -124,6 +125,7 @@ SRCS( Transforms/AddingSelectorTransform.cpp Transforms/AggregatingInOrderTransform.cpp Transforms/AggregatingTransform.cpp + Transforms/ArrayJoinTransform.cpp Transforms/ConvertingTransform.cpp Transforms/CopyTransform.cpp Transforms/CreatingSetsTransform.cpp From 85d0f1a480e3fef470882f4805c3d210772f2200 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 12 Aug 2020 11:55:16 +0300 Subject: [PATCH 2/8] Add ARRAY JOIN to ActionsChain --- src/Interpreters/ExpressionActions.cpp | 98 ++++++++++++++++++++---- src/Interpreters/ExpressionActions.h | 73 +++++++++++++++++- src/Interpreters/ExpressionAnalyzer.cpp | 21 +++-- src/Interpreters/ExpressionAnalyzer.h | 2 +- src/Processors/QueryPlan/ArrayJoinStep.h | 2 +- 5 files changed, 164 insertions(+), 32 deletions(-) diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index f35e6266110..4b5ef7846b7 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1255,7 +1255,7 @@ void ExpressionActionsChain::addStep() if (steps.empty()) throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR); - ColumnsWithTypeAndName columns = steps.back().actions->getSampleBlock().getColumnsWithTypeAndName(); + ColumnsWithTypeAndName columns = steps.back().getResultColumns(); steps.push_back(Step(std::make_shared(columns, context))); } @@ -1273,7 +1273,7 @@ void ExpressionActionsChain::finalize() if (i + 1 < static_cast(steps.size())) { const NameSet & additional_input = steps[i + 1].additional_input; - for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes()) + for (const auto & it : steps[i + 1].getRequiredColumns()) { if (additional_input.count(it.name) == 0) { @@ -1285,27 +1285,28 @@ void ExpressionActionsChain::finalize() } } } - steps[i].actions->finalize(required_output); + steps[i].finalize(required_output); } + /// TODO: move to QueryPlan /// When possible, move the ARRAY JOIN from earlier steps to later steps. - for (size_t i = 1; i < steps.size(); ++i) - { - ExpressionAction action; - if (steps[i - 1].actions->popUnusedArrayJoin(steps[i - 1].required_output, action)) - steps[i].actions->prependArrayJoin(action, steps[i - 1].actions->getSampleBlock()); - } +// for (size_t i = 1; i < steps.size(); ++i) +// { +// ExpressionAction action; +// if (steps[i - 1].actions->popUnusedArrayJoin(steps[i - 1].required_output, action)) +// steps[i].actions->prependArrayJoin(action, steps[i - 1].actions->getSampleBlock()); +// } /// Adding the ejection of unnecessary columns to the beginning of each step. for (size_t i = 1; i < steps.size(); ++i) { - size_t columns_from_previous = steps[i - 1].actions->getSampleBlock().columns(); + size_t columns_from_previous = steps[i - 1].getResultColumns().size(); /// If unnecessary columns are formed at the output of the previous step, we'll add them to the beginning of this step. /// Except when we drop all the columns and lose the number of rows in the block. - if (!steps[i].actions->getRequiredColumnsWithTypes().empty() - && columns_from_previous > steps[i].actions->getRequiredColumnsWithTypes().size()) - steps[i].actions->prependProjectInput(); + if (!steps[i].getResultColumns().empty() + && columns_from_previous > steps[i].getRequiredColumns().size()) + steps[i].prependProjectInput(); } } @@ -1319,10 +1320,79 @@ std::string ExpressionActionsChain::dumpChain() const ss << "required output:\n"; for (const std::string & name : steps[i].required_output) ss << name << "\n"; - ss << "\n" << steps[i].actions->dumpActions() << "\n"; + ss << "\n" << steps[i].dump() << "\n"; } return ss.str(); } +ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_) + : kind(Kind::ARRAY_JOIN) + , array_join(std::move(array_join_)) + , columns_after_array_join(std::move(required_columns_)) +{ + for (auto & column : columns_after_array_join) + { + required_columns.emplace_back(NameAndTypePair(column.name, column.type)); + + if (array_join->columns.count(column.name) > 0) + { + const auto * array = typeid_cast(column.type.get()); + column.type = array->getNestedType(); + /// Arrays are materialized + column.column = nullptr; + } + } +} + +void ExpressionActionsChain::Step::finalize(const Names & required_output_) +{ + switch (kind) + { + case Kind::ACTIONS: + { + actions->finalize(required_output_); + return; + } + case Kind::ARRAY_JOIN: + { + NamesAndTypesList new_required_columns; + ColumnsWithTypeAndName new_result_columns; + + NameSet names(required_output_.begin(), required_output_.end()); + for (const auto & column : columns_after_array_join) + { + if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0) + new_result_columns.emplace_back(column); + } + for (const auto & column : required_columns) + { + if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0) + new_required_columns.emplace_back(column); + } + + std::swap(required_columns, new_required_columns); + std::swap(columns_after_array_join, new_result_columns); + return; + } + } +} + +void ExpressionActionsChain::Step::prependProjectInput() +{ + switch (kind) + { + case Kind::ACTIONS: + { + actions->prependProjectInput(); + return; + } + case Kind::ARRAY_JOIN: + { + /// TODO: remove unused columns before ARRAY JOIN ? + return; + } + } +} + } diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index 372a17f58df..a5fd3b9d7a1 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -10,6 +10,7 @@ #include #include #include +#include #if !defined(ARCADIA_BUILD) # include "config_core.h" @@ -288,10 +289,22 @@ using ExpressionActionsPtr = std::shared_ptr; */ struct ExpressionActionsChain { - ExpressionActionsChain(const Context & context_) - : context(context_) {} + explicit ExpressionActionsChain(const Context & context_) : context(context_) {} + struct Step { + enum class Kind + { + ACTIONS, + ARRAY_JOIN, + }; + + Kind kind; + + ArrayJoinActionPtr array_join; + NamesAndTypesList required_columns; + ColumnsWithTypeAndName columns_after_array_join; + ExpressionActionsPtr actions; /// Columns were added to the block before current step in addition to prev step output. NameSet additional_input; @@ -302,8 +315,60 @@ struct ExpressionActionsChain /// If not empty, has the same size with required_output; is filled in finalize(). std::vector can_remove_required_output; - Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names()) - : actions(actions_), required_output(required_output_) {} + public: + explicit Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names()) + : kind(Kind::ACTIONS) + , actions(actions_) + , required_output(required_output_) + { + } + + explicit Step(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_); + + NamesAndTypesList getRequiredColumns() const + { + switch (kind) + { + case Kind::ACTIONS: + return actions->getRequiredColumnsWithTypes(); + case Kind::ARRAY_JOIN: + return required_columns; + } + + __builtin_unreachable(); + } + + ColumnsWithTypeAndName getResultColumns() const + { + switch (kind) + { + case Kind::ACTIONS: + return actions->getSampleBlock().getColumnsWithTypeAndName(); + case Kind::ARRAY_JOIN: + return columns_after_array_join; + } + + __builtin_unreachable(); + } + + void finalize(const Names & required_output_); + + void prependProjectInput(); + + std::string dump() const + { + switch (kind) + { + case Kind::ACTIONS: + { + return actions->dumpActions(); + } + case Kind::ARRAY_JOIN: + { + return "ARRAY JOIN"; + } + } + } }; using Steps = std::vector; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 7cabef1df9c..33f872ef15f 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -469,7 +469,7 @@ ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActi return std::make_shared(result_columns, array_join_is_left, context); } -ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool only_types) +ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, ExpressionActionsPtr & before_array_join, bool only_types) { const auto * select_query = getSelectQuery(); @@ -482,9 +482,12 @@ ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActi getRootActions(array_join_expression_list, only_types, step.actions); - auto array_join = addMultipleArrayJoinAction(step.actions, is_array_join_left); - for (const auto & column : array_join->columns) - step.required_output.emplace_back(column); + before_array_join = chain.getLastActions(); + auto array_join = addMultipleArrayJoinAction(step.actions, is_array_join_left); + + chain.steps.push_back(ExpressionActionsChain::Step(array_join, step.getResultColumns())); + + chain.addStep(); return array_join; } @@ -1091,13 +1094,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( chain.addStep(); } - array_join = query_analyzer.appendArrayJoin(chain, only_types || !first_stage); - if (array_join) - { - before_array_join = chain.getLastActions(true); - if (before_array_join) - chain.addStep(); - } + array_join = query_analyzer.appendArrayJoin(chain, before_array_join, only_types || !first_stage); if (query_analyzer.hasTableJoin()) { @@ -1123,7 +1120,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( { Block before_where_sample; if (chain.steps.size() > 1) - before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock(); + before_where_sample = Block(chain.steps[chain.steps.size() - 2].getResultColumns()); else before_where_sample = source_header; if (sanitizeBlock(before_where_sample)) diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 7ffe06ebd9e..5f2c1dc3a85 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -311,7 +311,7 @@ private: */ /// Before aggregation: - ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, bool only_types); + ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, ExpressionActionsPtr & before_array_join, bool only_types); bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types); bool appendJoin(ExpressionActionsChain & chain); /// Add preliminary rows filtration. Actions are created in other expression analyzer to prevent any possible alias injection. diff --git a/src/Processors/QueryPlan/ArrayJoinStep.h b/src/Processors/QueryPlan/ArrayJoinStep.h index 9a9504a5d54..d621d4b2ed1 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.h +++ b/src/Processors/QueryPlan/ArrayJoinStep.h @@ -11,7 +11,7 @@ class ArrayJoinStep : public ITransformingStep { public: explicit ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPtr array_join_); - String getName() const override { return "Expression"; } + String getName() const override { return "ArrayJoin"; } void transformPipeline(QueryPipeline & pipeline) override; From 6934974fc5fdf48f00da5225f51b1b860a3542d7 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 12 Aug 2020 16:30:02 +0300 Subject: [PATCH 3/8] Lift up ARRAY JOIN step. --- src/Interpreters/ExpressionActions.cpp | 134 +++++++++++++++++++- src/Interpreters/ExpressionActions.h | 10 +- src/Interpreters/InterpreterSystemQuery.cpp | 1 + src/Processors/QueryPlan/ArrayJoinStep.cpp | 21 +++ src/Processors/QueryPlan/ArrayJoinStep.h | 5 + src/Processors/QueryPlan/ExpressionStep.cpp | 21 +++ src/Processors/QueryPlan/ExpressionStep.h | 5 + src/Processors/QueryPlan/FilterStep.cpp | 21 +++ src/Processors/QueryPlan/FilterStep.h | 7 + src/Processors/QueryPlan/QueryPlan.cpp | 81 ++++++++++++ src/Processors/QueryPlan/QueryPlan.h | 5 +- 11 files changed, 304 insertions(+), 7 deletions(-) diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 4b5ef7846b7..22d13a66616 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1077,6 +1077,138 @@ void ExpressionActions::optimizeArrayJoin() } } +ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) +{ + auto split_actions = std::make_shared(*this); + split_actions->actions.clear(); + split_actions->sample_block.clear(); + split_actions->input_columns.clear(); + + for (const auto & input_column : input_columns) + { + if (array_joined_columns.count(input_column.name) == 0) + { + split_actions->input_columns.emplace_back(input_column); + split_actions->sample_block.insert(ColumnWithTypeAndName(nullptr, input_column.type, input_column.name)); + } + } + + /// Do not split action if input depends only on array joined columns. + if (split_actions->input_columns.empty()) + return split_actions; + + NameSet array_join_dependent_columns = array_joined_columns; + /// Columns needed to evaluate arrayJoin or those that depend on it. + /// Actions to delete them can not be moved to the left of the arrayJoin. + NameSet array_join_dependencies; + + Actions new_actions; + for (const auto & action : actions) + { + if (action.type == ExpressionAction::PROJECT) + { + NamesWithAliases split_aliases; + NamesWithAliases depend_aliases; + for (const auto & pair : action.projection) + { + if (!pair.second.empty() || array_join_dependent_columns.count(pair.first)) + { + if (array_join_dependent_columns.count(pair.first)) + { + array_join_dependent_columns.insert(pair.second); + if (!pair.second.empty()) + depend_aliases.emplace_back(std::move(pair)); + } + else if (!pair.second.empty()) + split_aliases.emplace_back(std::move(pair)); + } + } + + if (!split_aliases.empty()) + split_actions->add(ExpressionAction::addAliases(split_aliases)); + + if (!depend_aliases.empty()) + new_actions.emplace_back(ExpressionAction::addAliases(depend_aliases)); + + continue; + } + + bool depends_on_array_join = false; + for (auto & column : action.getNeededColumns()) + if (array_join_dependent_columns.count(column) != 0) + depends_on_array_join = true; + + if (depends_on_array_join) + { + if (!action.result_name.empty()) + array_join_dependent_columns.insert(action.result_name); + if (action.array_join) + array_join_dependent_columns.insert(action.array_join->columns.begin(), action.array_join->columns.end()); + + auto needed = action.getNeededColumns(); + array_join_dependencies.insert(needed.begin(), needed.end()); + new_actions.emplace_back(action); + } + else + { + /// Replace PROJECT to ADD_ALIASES, because project may remove columns needed for array join +// if (action.type == ExpressionAction::PROJECT) +// { +// NamesWithAliases projection; +// +// for (auto & column : action.projection) +// { +// if (!column.second.empty()) +// { +// projection.emplace_back(column); +// column.second.clear(); +// } +// } +// +// /// new_actions.emplace_back(action); +// +// if (!projection.empty()) +// { +// action.type = ExpressionAction::ADD_ALIASES; +// action.projection.swap(projection); +// split_actions->add(std::move(action)); +// } +// } +// else + + if (action.type == ExpressionAction::REMOVE_COLUMN) + { + if (array_join_dependencies.count(action.source_name)) + new_actions.emplace_back(action); + else + split_actions->add(action); + + continue; + } + + split_actions->add(action); + } + } + + if (split_actions->getActions().empty()) + return split_actions; + + std::swap(actions, new_actions); + + /// Add input from split actions result. + NamesAndTypesList inputs_from_array_join; + for (auto & column : input_columns) + if (array_joined_columns.count(column.name)) + inputs_from_array_join.emplace_back(std::move(column)); + + input_columns = split_actions->getSampleBlock().getNamesAndTypesList(); + input_columns.insert(input_columns.end(), inputs_from_array_join.begin(), inputs_from_array_join.end()); + + if (!actions.empty()) + prependProjectInput(); + + return split_actions; +} JoinPtr ExpressionActions::getTableJoinAlgo() const { @@ -1378,7 +1510,7 @@ void ExpressionActionsChain::Step::finalize(const Names & required_output_) } } -void ExpressionActionsChain::Step::prependProjectInput() +void ExpressionActionsChain::Step::prependProjectInput() const { switch (kind) { diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index a5fd3b9d7a1..428a001db05 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -144,6 +144,8 @@ private: void execute(Block & block, bool dry_run) const; }; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; /** Contains a sequence of actions on the block. */ @@ -183,6 +185,8 @@ public: /// Change the corresponding output types to arrays. bool popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action); + ExpressionActionsPtr splitActionsBeforeArrayJoin(const NameSet & array_joined_columns); + /// - Adds actions to delete all but the specified columns. /// - Removes unused input columns. /// - Can somehow optimize the expression. @@ -275,8 +279,6 @@ private: void optimizeArrayJoin(); }; -using ExpressionActionsPtr = std::shared_ptr; - /** The sequence of transformations over the block. * It is assumed that the result of each step is fed to the input of the next step. @@ -353,7 +355,7 @@ struct ExpressionActionsChain void finalize(const Names & required_output_); - void prependProjectInput(); + void prependProjectInput() const; std::string dump() const { @@ -368,6 +370,8 @@ struct ExpressionActionsChain return "ARRAY JOIN"; } } + + __builtin_unreachable(); } }; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 4bfa84090c2..96cd5f5bea5 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include diff --git a/src/Processors/QueryPlan/ArrayJoinStep.cpp b/src/Processors/QueryPlan/ArrayJoinStep.cpp index 2948d4cc842..402f7d4a318 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.cpp +++ b/src/Processors/QueryPlan/ArrayJoinStep.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -32,6 +33,18 @@ ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPt { } +void ArrayJoinStep::updateInputStream(DataStream input_stream, Block result_header) +{ + output_stream = createOutputStream( + input_stream, + ArrayJoinTransform::transformHeader(input_stream.header, array_join), + getDataStreamTraits()); + + input_streams.clear(); + input_streams.emplace_back(std::move(input_stream)); + res_header = std::move(result_header); +} + void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) @@ -39,6 +52,14 @@ void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline) bool on_totals = stream_type == QueryPipeline::StreamType::Totals; return std::make_shared(header, array_join, on_totals); }); + + if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + { + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + }); + } } void ArrayJoinStep::describeActions(FormatSettings & settings) const diff --git a/src/Processors/QueryPlan/ArrayJoinStep.h b/src/Processors/QueryPlan/ArrayJoinStep.h index d621d4b2ed1..92c7e0a1304 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.h +++ b/src/Processors/QueryPlan/ArrayJoinStep.h @@ -17,8 +17,13 @@ public: void describeActions(FormatSettings & settings) const override; + void updateInputStream(DataStream input_stream, Block result_header); + + const ArrayJoinActionPtr & arrayJoin() const { return array_join; } + private: ArrayJoinActionPtr array_join; + Block res_header; }; } diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 3b0632eff1b..23b7b04af26 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,18 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } +void ExpressionStep::updateInputStream(DataStream input_stream, Block result_header) +{ + output_stream = createOutputStream( + input_stream, + res_header ? res_header : Transform::transformHeader(input_stream.header, expression), + getDataStreamTraits()); + + input_streams.clear(); + input_streams.emplace_back(std::move(input_stream)); + res_header = std::move(result_header); +} + void ExpressionStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) @@ -42,6 +55,14 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline) bool on_totals = stream_type == QueryPipeline::StreamType::Totals; return std::make_shared(header, expression, on_totals); }); + + if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + { + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + }); + } } static void doDescribeActions(const ExpressionActionsPtr & expression, IQueryPlanStep::FormatSettings & settings) diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index 80d2fd2630d..40db2dab817 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -21,10 +21,15 @@ public: void transformPipeline(QueryPipeline & pipeline) override; + void updateInputStream(DataStream input_stream, Block result_header); + void describeActions(FormatSettings & settings) const override; + const ExpressionActionsPtr & getExpression() const { return expression; } + private: ExpressionActionsPtr expression; + Block res_header; }; /// TODO: add separate step for join. diff --git a/src/Processors/QueryPlan/FilterStep.cpp b/src/Processors/QueryPlan/FilterStep.cpp index 504fd71c56a..9d7ba499979 100644 --- a/src/Processors/QueryPlan/FilterStep.cpp +++ b/src/Processors/QueryPlan/FilterStep.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,18 @@ FilterStep::FilterStep( updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } +void FilterStep::updateInputStream(DataStream input_stream, Block result_header) +{ + output_stream = createOutputStream( + input_stream, + res_header ? res_header : FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column), + getDataStreamTraits()); + + input_streams.clear(); + input_streams.emplace_back(std::move(input_stream)); + res_header = std::move(result_header); +} + void FilterStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) @@ -47,6 +60,14 @@ void FilterStep::transformPipeline(QueryPipeline & pipeline) bool on_totals = stream_type == QueryPipeline::StreamType::Totals; return std::make_shared(header, expression, filter_column_name, remove_filter_column, on_totals); }); + + if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + { + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + }); + } } void FilterStep::describeActions(FormatSettings & settings) const diff --git a/src/Processors/QueryPlan/FilterStep.h b/src/Processors/QueryPlan/FilterStep.h index 1decc61349a..163ef5c128e 100644 --- a/src/Processors/QueryPlan/FilterStep.h +++ b/src/Processors/QueryPlan/FilterStep.h @@ -20,12 +20,19 @@ public: String getName() const override { return "Filter"; } void transformPipeline(QueryPipeline & pipeline) override; + void updateInputStream(DataStream input_stream, Block result_header); + void describeActions(FormatSettings & settings) const override; + const ExpressionActionsPtr & getExpression() const { return expression; } + const String & getFilterColumnName() const { return filter_column_name; } + bool removesFilterColumn() const { return remove_filter_column; } + private: ExpressionActionsPtr expression; String filter_column_name; bool remove_filter_column; + Block res_header; }; } diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index ae0147d8724..98949309eda 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include #include "MergingSortedStep.h" @@ -10,6 +12,9 @@ #include "MergeSortingStep.h" #include "PartialSortingStep.h" #include "TotalsHavingStep.h" +#include "ExpressionStep.h" +#include "ArrayJoinStep.h" +#include "FilterStep.h" namespace DB { @@ -408,6 +413,76 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_ parent.swap(child); } +static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * child_node, QueryPlan::Nodes & nodes) +{ + auto & parent = parent_node->step; + auto & child = child_node->step; + auto * expression_step = typeid_cast(parent.get()); + auto * filter_step = typeid_cast(parent.get()); + auto * array_join_step = typeid_cast(child.get()); + + if (!(expression_step || filter_step) || !array_join_step) + return; + + const auto & array_join = array_join_step->arrayJoin(); + const auto & expression = expression_step ? expression_step->getExpression() + : filter_step->getExpression(); + + auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns); + + /// No actions can be moved before ARRAY JOIN. + if (split_actions->getActions().empty()) + return; + + auto expected_header = parent->getOutputStream().header; + + /// All actions was moved before ARRAY JOIN. Swap Expression and ArrayJoin. + if (expression->getActions().empty()) + { + /// Expression -> ArrayJoin + std::swap(parent, child); + /// ArrayJoin -> Expression + if (expression_step) + child = std::make_unique(child_node->children.at(0)->step->getOutputStream(), + std::move(split_actions)); + else + child = std::make_unique(child_node->children.at(0)->step->getOutputStream(), + std::move(split_actions), + filter_step->getFilterColumnName(), + filter_step->removesFilterColumn()); + + array_join_step->updateInputStream(child->getOutputStream(), expected_header); + return; + } + + /// Add new expression step before ARRAY JOIN. + /// Expression/Filter -> ArrayJoin -> Something + auto & node = nodes.emplace_back(); + node.children.swap(child_node->children); + child_node->children.emplace_back(&node); + /// Expression/Filter -> ArrayJoin -> node -> Something +// if (filter_step && split_actions->getSampleBlock().has(filter_step->getFilterColumnName())) +// { +// /// Filter -> ArrayJoin -> node -> Something +// node.step = std::make_unique(node.children.at(0)->step->getOutputStream(), +// std::move(split_actions), +// filter_step->getFilterColumnName(), +// filter_step->removesFilterColumn()); +// +// array_join_step->updateInputStream(node.step->getOutputStream()); +// +// parent = std::make_unique(array_join_step->getOutputStream(), +// filter_step->getExpression()); +// /// Expression -> ArrayJoin -> Filter -> Something +// } + + node.step = std::make_unique(node.children.at(0)->step->getOutputStream(), + std::move(split_actions)); + array_join_step->updateInputStream(node.step->getOutputStream(), {}); + expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), expected_header) + : filter_step->updateInputStream(array_join_step->getOutputStream(), expected_header); +} + void QueryPlan::optimize() { struct Frame @@ -436,7 +511,13 @@ void QueryPlan::optimize() ++frame.next_child; } else + { + /// Last entrance, try lift up. + if (frame.node->children.size() == 1) + tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes); + stack.pop(); + } } } diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index 17ff7dde3af..7ce8d9426c4 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -69,11 +69,10 @@ public: std::vector children = {}; }; -private: - using Nodes = std::list; - Nodes nodes; +private: + Nodes nodes; Node * root = nullptr; void checkInitialized() const; From 246c2cafb747e4581270b71cadbb0f5fd4da9bf4 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 13 Aug 2020 21:40:21 +0300 Subject: [PATCH 4/8] Refactor splitActionsBeforeArrayJoin. --- src/Interpreters/ExpressionActions.cpp | 92 ++++++++++++-------------- src/Interpreters/ExpressionActions.h | 1 + 2 files changed, 44 insertions(+), 49 deletions(-) diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 22d13a66616..713efd5797e 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1079,11 +1079,22 @@ void ExpressionActions::optimizeArrayJoin() ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) { + /// Create new actions. + /// Copy from this because we don't have context. + /// TODO: remove context from constructor? auto split_actions = std::make_shared(*this); split_actions->actions.clear(); split_actions->sample_block.clear(); split_actions->input_columns.clear(); + /// Expected chain: + /// Expression (this) -> ArrayJoin (array_joined_columns) -> Expression (split_actions) + + /// We are going to move as many actions as we can from this to split_actions. + /// We can move all inputs which are not depend on array_joined_columns + /// (with some exceptions to PROJECT and REMOVE_COLUMN + + /// Use the same inputs for split_actions, except array_joined_columns. for (const auto & input_column : input_columns) { if (array_joined_columns.count(input_column.name) == 0) @@ -1097,31 +1108,39 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe if (split_actions->input_columns.empty()) return split_actions; + /// Actions which depend on ARRAY JOIN result. NameSet array_join_dependent_columns = array_joined_columns; - /// Columns needed to evaluate arrayJoin or those that depend on it. - /// Actions to delete them can not be moved to the left of the arrayJoin. - NameSet array_join_dependencies; + /// Arguments of actions which depend on ARRAY JOIN result. + /// This columns can't be deleted in split_actions. + NameSet array_join_dependent_columns_arguments; + /// We create new_actions list for `this`. Current actions are moved to new_actions nor added to split_actions. Actions new_actions; for (const auto & action : actions) { + /// Exception for PROJECT. + /// It removes columns, so it will remove split_actions output which may be needed for actions from `this`. + /// So, we replace it ADD_ALIASES. + /// Usually, PROJECT is added to begin of actions in order to remove unused output of prev actions. + /// We skip it now, but will prependProjectInput at the end. if (action.type == ExpressionAction::PROJECT) { + /// Each alias has separate dependencies, so we split this action into two parts. NamesWithAliases split_aliases; NamesWithAliases depend_aliases; for (const auto & pair : action.projection) { - if (!pair.second.empty() || array_join_dependent_columns.count(pair.first)) + /// Skip if is not alias. + if (pair.second.empty()) + continue; + + if (array_join_dependent_columns.count(pair.first)) { - if (array_join_dependent_columns.count(pair.first)) - { - array_join_dependent_columns.insert(pair.second); - if (!pair.second.empty()) - depend_aliases.emplace_back(std::move(pair)); - } - else if (!pair.second.empty()) - split_aliases.emplace_back(std::move(pair)); + array_join_dependent_columns.insert(pair.second); + depend_aliases.emplace_back(std::move(pair)); } + else + split_aliases.emplace_back(std::move(pair)); } if (!split_aliases.empty()) @@ -1140,45 +1159,25 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe if (depends_on_array_join) { + /// Add result of this action to array_join_dependent_columns too. if (!action.result_name.empty()) array_join_dependent_columns.insert(action.result_name); if (action.array_join) array_join_dependent_columns.insert(action.array_join->columns.begin(), action.array_join->columns.end()); + /// Add arguments of this action to array_join_dependent_columns_arguments. auto needed = action.getNeededColumns(); - array_join_dependencies.insert(needed.begin(), needed.end()); + array_join_dependent_columns_arguments.insert(needed.begin(), needed.end()); + new_actions.emplace_back(action); } else { - /// Replace PROJECT to ADD_ALIASES, because project may remove columns needed for array join -// if (action.type == ExpressionAction::PROJECT) -// { -// NamesWithAliases projection; -// -// for (auto & column : action.projection) -// { -// if (!column.second.empty()) -// { -// projection.emplace_back(column); -// column.second.clear(); -// } -// } -// -// /// new_actions.emplace_back(action); -// -// if (!projection.empty()) -// { -// action.type = ExpressionAction::ADD_ALIASES; -// action.projection.swap(projection); -// split_actions->add(std::move(action)); -// } -// } -// else - + /// Exception for REMOVE_COLUMN. + /// We cannot move it to split_actions if any argument from `this` needed that column. if (action.type == ExpressionAction::REMOVE_COLUMN) { - if (array_join_dependencies.count(action.source_name)) + if (array_join_dependent_columns_arguments.count(action.source_name)) new_actions.emplace_back(action); else split_actions->add(action); @@ -1190,20 +1189,24 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe } } + /// Return empty actions if nothing was separated. Keep `this` unchanged. if (split_actions->getActions().empty()) return split_actions; std::swap(actions, new_actions); - /// Add input from split actions result. + /// Collect inputs from ARRAY JOIN. NamesAndTypesList inputs_from_array_join; for (auto & column : input_columns) if (array_joined_columns.count(column.name)) inputs_from_array_join.emplace_back(std::move(column)); + /// Fix inputs for `this`. + /// It is output of split_actions + inputs from ARRAY JOIN. input_columns = split_actions->getSampleBlock().getNamesAndTypesList(); input_columns.insert(input_columns.end(), inputs_from_array_join.begin(), inputs_from_array_join.end()); + /// Remove not needed columns. if (!actions.empty()) prependProjectInput(); @@ -1420,15 +1423,6 @@ void ExpressionActionsChain::finalize() steps[i].finalize(required_output); } - /// TODO: move to QueryPlan - /// When possible, move the ARRAY JOIN from earlier steps to later steps. -// for (size_t i = 1; i < steps.size(); ++i) -// { -// ExpressionAction action; -// if (steps[i - 1].actions->popUnusedArrayJoin(steps[i - 1].required_output, action)) -// steps[i].actions->prependArrayJoin(action, steps[i - 1].actions->getSampleBlock()); -// } - /// Adding the ejection of unnecessary columns to the beginning of each step. for (size_t i = 1; i < steps.size(); ++i) { diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index 428a001db05..f2741c7f68b 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -185,6 +185,7 @@ public: /// Change the corresponding output types to arrays. bool popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action); + /// Splits actions into two parts. Returned half may be swapped with ARRAY JOIN. ExpressionActionsPtr splitActionsBeforeArrayJoin(const NameSet & array_joined_columns); /// - Adds actions to delete all but the specified columns. From 8e631a98eab55a31b6f40b701b3a51c39d91df1c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 13 Aug 2020 23:17:18 +0300 Subject: [PATCH 5/8] Refactor ActionsChain. --- src/Interpreters/ActionsVisitor.cpp | 17 +- src/Interpreters/ArrayJoinAction.cpp | 31 +-- src/Interpreters/ArrayJoinAction.h | 3 +- src/Interpreters/ExpressionActions.cpp | 244 +++++++++--------- src/Interpreters/ExpressionActions.h | 125 ++++----- src/Interpreters/ExpressionAnalyzer.cpp | 68 ++--- src/Interpreters/InterpreterSystemQuery.cpp | 1 - src/Interpreters/MutationsInterpreter.cpp | 6 +- src/Processors/QueryPlan/ExpressionStep.cpp | 12 +- src/Processors/QueryPlan/ExpressionStep.h | 3 +- src/Processors/QueryPlan/FilterStep.cpp | 13 +- src/Processors/QueryPlan/FilterStep.h | 3 +- src/Processors/QueryPlan/QueryPlan.cpp | 30 +-- .../Transforms/ArrayJoinTransform.cpp | 4 +- 14 files changed, 235 insertions(+), 325 deletions(-) diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index 1c82bc62f24..f2a1d570773 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -1,6 +1,5 @@ -#include "Common/quoteString.h" +#include #include -#include #include #include @@ -9,7 +8,6 @@ #include #include -#include #include #include #include @@ -21,7 +19,6 @@ #include #include -#include #include @@ -546,10 +543,14 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data & if (!data.only_consts) { String result_name = column_name.get(ast); - data.addAction(ExpressionAction::copyColumn(arg->getColumnName(), result_name)); - NameSet joined_columns; - joined_columns.insert(result_name); - data.addAction(ExpressionAction::arrayJoin(std::make_shared(joined_columns, false, data.context))); + /// Here we copy argument because arrayJoin removes source column. + /// It makes possible to remove source column before arrayJoin if it won't be needed anymore. + + /// It could have been possible to implement arrayJoin which keeps source column, + /// but in this case it will always be replicated (as many arrays), which is expensive. + String tmp_name = data.getUniqueName("_array_join_" + arg->getColumnName()); + data.addAction(ExpressionAction::copyColumn(arg->getColumnName(), tmp_name)); + data.addAction(ExpressionAction::arrayJoin(tmp_name, result_name)); } return; diff --git a/src/Interpreters/ArrayJoinAction.cpp b/src/Interpreters/ArrayJoinAction.cpp index 8a0edce7800..176dc8258ce 100644 --- a/src/Interpreters/ArrayJoinAction.cpp +++ b/src/Interpreters/ArrayJoinAction.cpp @@ -48,7 +48,7 @@ void ArrayJoinAction::prepare(Block & sample_block) } } -void ArrayJoinAction::execute(Block & block, bool dry_run) +void ArrayJoinAction::execute(Block & block) { if (columns.empty()) throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR); @@ -105,7 +105,7 @@ void ArrayJoinAction::execute(Block & block, bool dry_run) Block tmp_block{src_col, {{}, src_col.type, {}}}; - function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size(), dry_run); + function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size()); non_empty_array_columns[name] = tmp_block.safeGetByPosition(1).column; } @@ -140,31 +140,4 @@ void ArrayJoinAction::execute(Block & block, bool dry_run) } } -void ArrayJoinAction::finalize(NameSet & needed_columns, NameSet & unmodified_columns, NameSet & final_columns) -{ - /// Do not ARRAY JOIN columns that are not used anymore. - /// Usually, such columns are not used until ARRAY JOIN, and therefore are ejected further in this function. - /// We will not remove all the columns so as not to lose the number of rows. - for (auto it = columns.begin(); it != columns.end();) - { - bool need = needed_columns.count(*it); - if (!need && columns.size() > 1) - { - columns.erase(it++); - } - else - { - needed_columns.insert(*it); - unmodified_columns.erase(*it); - - /// If no ARRAY JOIN results are used, forcibly leave an arbitrary column at the output, - /// so you do not lose the number of rows. - if (!need) - final_columns.insert(*it); - - ++it; - } - } -} - } diff --git a/src/Interpreters/ArrayJoinAction.h b/src/Interpreters/ArrayJoinAction.h index be5be738bb9..9467e579e62 100644 --- a/src/Interpreters/ArrayJoinAction.h +++ b/src/Interpreters/ArrayJoinAction.h @@ -29,8 +29,7 @@ public: ArrayJoinAction(const NameSet & array_joined_columns_, bool array_join_is_left, const Context & context); void prepare(Block & sample_block); - void execute(Block & block, bool dry_run); - void finalize(NameSet & needed_columns, NameSet & unmodified_columns, NameSet & final_columns); + void execute(Block & block); }; using ArrayJoinActionPtr = std::shared_ptr; diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 713efd5797e..c04ebb62c4c 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1,19 +1,19 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include -#include #include #include #include -#include #if !defined(ARCADIA_BUILD) # include "config_core.h" @@ -37,6 +37,7 @@ namespace ErrorCodes extern const int NOT_FOUND_COLUMN_IN_BLOCK; extern const int TOO_MANY_TEMPORARY_COLUMNS; extern const int TOO_MANY_TEMPORARY_NON_CONST_COLUMNS; + extern const int TYPE_MISMATCH; } /// Read comment near usage @@ -47,9 +48,6 @@ Names ExpressionAction::getNeededColumns() const { Names res = argument_names; - if (array_join) - res.insert(res.end(), array_join->columns.begin(), array_join->columns.end()); - if (table_join) res.insert(res.end(), table_join->keyNamesLeft().begin(), table_join->keyNamesLeft().end()); @@ -143,11 +141,15 @@ ExpressionAction ExpressionAction::addAliases(const NamesWithAliases & aliased_c return a; } -ExpressionAction ExpressionAction::arrayJoin(ArrayJoinActionPtr array_join_) +ExpressionAction ExpressionAction::arrayJoin(std::string source_name, std::string result_name) { + if (source_name == result_name) + throw Exception("ARRAY JOIN action should have different source and result names", ErrorCodes::LOGICAL_ERROR); + ExpressionAction a; a.type = ARRAY_JOIN; - a.array_join = std::move(array_join_); + a.source_name = std::move(source_name); + a.result_name = std::move(result_name); return a; } @@ -243,7 +245,18 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings, case ARRAY_JOIN: { - array_join->prepare(sample_block); + ColumnWithTypeAndName current = sample_block.getByName(source_name); + sample_block.erase(source_name); + + const DataTypeArray * array_type = typeid_cast(&*current.type); + if (!array_type) + throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH); + + current.name = result_name; + current.type = array_type->getNestedType(); + current.column = nullptr; /// Result is never const + sample_block.insert(std::move(current)); + break; } @@ -369,7 +382,23 @@ void ExpressionAction::execute(Block & block, bool dry_run) const case ARRAY_JOIN: { - array_join->execute(block, dry_run); + auto source = block.getByName(source_name); + block.erase(source_name); + source.column = source.column->convertToFullColumnIfConst(); + + const ColumnArray * array = typeid_cast(source.column.get()); + if (!array) + throw Exception("ARRAY JOIN of not array: " + source_name, ErrorCodes::TYPE_MISMATCH); + + for (auto & column : block) + column.column = column.column->replicate(array->getOffsets()); + + source.column = array->getDataPtr(); + source.type = assert_cast(*source.type).getNestedType(); + source.name = result_name; + + block.insert(std::move(source)); + break; } @@ -478,13 +507,7 @@ std::string ExpressionAction::toString() const break; case ARRAY_JOIN: - ss << (array_join->is_left ? "LEFT " : "") << "ARRAY JOIN "; - for (NameSet::const_iterator it = array_join->columns.begin(); it != array_join->columns.end(); ++it) - { - if (it != array_join->columns.begin()) - ss << ", "; - ss << *it; - } + ss << "ARRAY JOIN " << source_name << " -> " << result_name; break; case JOIN: @@ -597,9 +620,6 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names) if (!action.result_name.empty()) new_names.push_back(action.result_name); - if (action.array_join) - new_names.insert(new_names.end(), action.array_join->columns.begin(), action.array_join->columns.end()); - /// Compiled functions are custom functions and they don't need building if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled) { @@ -631,51 +651,6 @@ void ExpressionActions::prependProjectInput() actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns())); } -void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before) -{ - if (action.type != ExpressionAction::ARRAY_JOIN) - throw Exception("ARRAY_JOIN action expected", ErrorCodes::LOGICAL_ERROR); - - NameSet array_join_set(action.array_join->columns.begin(), action.array_join->columns.end()); - for (auto & it : input_columns) - { - if (array_join_set.count(it.name)) - { - array_join_set.erase(it.name); - it.type = std::make_shared(it.type); - } - } - for (const std::string & name : array_join_set) - { - input_columns.emplace_back(name, sample_block_before.getByName(name).type); - actions.insert(actions.begin(), ExpressionAction::removeColumn(name)); - } - - actions.insert(actions.begin(), action); - optimizeArrayJoin(); -} - - -bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action) -{ - if (actions.empty() || actions.back().type != ExpressionAction::ARRAY_JOIN) - return false; - NameSet required_set(required_columns.begin(), required_columns.end()); - for (const std::string & name : actions.back().array_join->columns) - { - if (required_set.count(name)) - return false; - } - for (const std::string & name : actions.back().array_join->columns) - { - DataTypePtr & type = sample_block.getByName(name).type; - type = std::make_shared(type); - } - out_action = actions.back(); - actions.pop_back(); - return true; -} - void ExpressionActions::execute(Block & block, bool dry_run) const { for (const auto & action : actions) @@ -809,7 +784,18 @@ void ExpressionActions::finalize(const Names & output_columns) } else if (action.type == ExpressionAction::ARRAY_JOIN) { - action.array_join->finalize(needed_columns, unmodified_columns, final_columns); + /// We need source anyway, in order to calculate number of rows correctly. + needed_columns.insert(action.source_name); + unmodified_columns.erase(action.result_name); + needed_columns.erase(action.result_name); + + /// Note: technically, if result of arrayJoin is not needed, + /// we may remove all the columns and loose the number of rows here. + /// However, I cannot imagine how it is possible. + /// For "big" ARRAY JOIN it could have happened in query like + /// SELECT count() FROM table ARRAY JOIN x + /// Now, "big" ARRAY JOIN is moved to separate pipeline step, + /// and arrayJoin(x) is an expression which result can't be lost. } else { @@ -946,7 +932,7 @@ void ExpressionActions::finalize(const Names & output_columns) auto process = [&] (const String & name) { auto refcount = --columns_refcount[name]; - if (refcount <= 0) + if (refcount <= 0 && action.type != ExpressionAction::ARRAY_JOIN) { new_actions.push_back(ExpressionAction::removeColumn(name)); if (sample_block.has(name)) @@ -1046,8 +1032,6 @@ void ExpressionActions::optimizeArrayJoin() if (!actions[i].result_name.empty()) array_joined_columns.insert(actions[i].result_name); - if (actions[i].array_join) - array_joined_columns.insert(actions[i].array_join->columns.begin(), actions[i].array_join->columns.end()); array_join_dependencies.insert(needed.begin(), needed.end()); } @@ -1106,7 +1090,7 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe /// Do not split action if input depends only on array joined columns. if (split_actions->input_columns.empty()) - return split_actions; + return nullptr; /// Actions which depend on ARRAY JOIN result. NameSet array_join_dependent_columns = array_joined_columns; @@ -1162,8 +1146,6 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe /// Add result of this action to array_join_dependent_columns too. if (!action.result_name.empty()) array_join_dependent_columns.insert(action.result_name); - if (action.array_join) - array_join_dependent_columns.insert(action.array_join->columns.begin(), action.array_join->columns.end()); /// Add arguments of this action to array_join_dependent_columns_arguments. auto needed = action.getNeededColumns(); @@ -1191,7 +1173,7 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe /// Return empty actions if nothing was separated. Keep `this` unchanged. if (split_actions->getActions().empty()) - return split_actions; + return nullptr; std::swap(actions, new_actions); @@ -1313,9 +1295,8 @@ UInt128 ExpressionAction::ActionHash::operator()(const ExpressionAction & action hash.update(arg_name); break; case ARRAY_JOIN: - hash.update(action.array_join->is_left); - for (const auto & col : action.array_join->columns) - hash.update(col); + hash.update(action.result_name); + hash.update(action.source_name); break; case JOIN: for (const auto & col : action.table_join->columnsAddedByJoin()) @@ -1371,15 +1352,9 @@ bool ExpressionAction::operator==(const ExpressionAction & other) const return false; } - bool same_array_join = !array_join && !other.array_join; - if (array_join && other.array_join) - same_array_join = (array_join->columns == other.array_join->columns) && - (array_join->is_left == other.array_join->is_left); - return source_name == other.source_name && result_name == other.result_name && argument_names == other.argument_names - && same_array_join && TableJoin::sameJoin(table_join.get(), other.table_join.get()) && projection == other.projection && is_function_compiled == other.is_function_compiled; @@ -1452,12 +1427,11 @@ std::string ExpressionActionsChain::dumpChain() const return ss.str(); } -ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_) - : kind(Kind::ARRAY_JOIN) - , array_join(std::move(array_join_)) - , columns_after_array_join(std::move(required_columns_)) +ExpressionActionsChain::ArrayJoinLink::ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_) + : array_join(std::move(array_join_)) + , result_columns(std::move(required_columns_)) { - for (auto & column : columns_after_array_join) + for (auto & column : result_columns) { required_columns.emplace_back(NameAndTypePair(column.name, column.type)); @@ -1471,54 +1445,68 @@ ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join_, ColumnsWithTy } } +void ExpressionActionsChain::ArrayJoinLink::finalize(const Names & required_output_) +{ + NamesAndTypesList new_required_columns; + ColumnsWithTypeAndName new_result_columns; + + NameSet names(required_output_.begin(), required_output_.end()); + for (const auto & column : result_columns) + { + if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0) + new_result_columns.emplace_back(column); + } + for (const auto & column : required_columns) + { + if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0) + new_required_columns.emplace_back(column); + } + + std::swap(required_columns, new_required_columns); + std::swap(result_columns, new_result_columns); +} + +ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns) + : link(ArrayJoinLink(std::move(array_join), std::move(required_columns))) +{ +} + +template +static Res dispatch(Ptr * ptr, Callback && callback) +{ + if (std::holds_alternative(ptr->link)) + return callback(std::get(ptr->link)); + if (std::holds_alternative(ptr->link)) + return callback(std::get(ptr->link)); + + throw Exception("Unknown variant in ExpressionActionsChain step", ErrorCodes::LOGICAL_ERROR); +} + +const NamesAndTypesList & ExpressionActionsChain::Step::getRequiredColumns() const +{ + using Res = const NamesAndTypesList &; + return dispatch(this, [](auto & x) -> Res { return x.getRequiredColumns(); }); +} + +const ColumnsWithTypeAndName & ExpressionActionsChain::Step::getResultColumns() const +{ + using Res = const ColumnsWithTypeAndName &; + return dispatch(this, [](auto & x) -> Res{ return x.getResultColumns(); }); +} + void ExpressionActionsChain::Step::finalize(const Names & required_output_) { - switch (kind) - { - case Kind::ACTIONS: - { - actions->finalize(required_output_); - return; - } - case Kind::ARRAY_JOIN: - { - NamesAndTypesList new_required_columns; - ColumnsWithTypeAndName new_result_columns; - - NameSet names(required_output_.begin(), required_output_.end()); - for (const auto & column : columns_after_array_join) - { - if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0) - new_result_columns.emplace_back(column); - } - for (const auto & column : required_columns) - { - if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0) - new_required_columns.emplace_back(column); - } - - std::swap(required_columns, new_required_columns); - std::swap(columns_after_array_join, new_result_columns); - return; - } - } + dispatch(this, [&required_output_](auto & x) { x.finalize(required_output_); }); } void ExpressionActionsChain::Step::prependProjectInput() const { - switch (kind) - { - case Kind::ACTIONS: - { - actions->prependProjectInput(); - return; - } - case Kind::ARRAY_JOIN: - { - /// TODO: remove unused columns before ARRAY JOIN ? - return; - } - } + dispatch(this, [](auto & x) { x.prependProjectInput(); }); +} + +std::string ExpressionActionsChain::Step::dump() const +{ + return dispatch(this, [](auto & x) { return x.dump(); }); } } diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index f2741c7f68b..a11295a317c 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -9,9 +9,10 @@ #include #include #include -#include #include +#include + #if !defined(ARCADIA_BUILD) # include "config_core.h" #endif @@ -45,6 +46,9 @@ using DataTypePtr = std::shared_ptr; class ExpressionActions; class CompiledExpressionCache; +class ArrayJoinAction; +using ArrayJoinActionPtr = std::shared_ptr; + /** Action on the block. */ struct ExpressionAction @@ -60,10 +64,9 @@ public: APPLY_FUNCTION, - /** Replaces the specified columns with arrays into columns with elements. - * Duplicates the values in the remaining columns by the number of elements in the arrays. - * Arrays must be parallel (have the same lengths). - */ + /// Replaces the source column with array into column with elements. + /// Duplicates the values in the remaining columns by the number of elements in the arrays. + /// Source column is removed from block. ARRAY_JOIN, JOIN, @@ -76,7 +79,7 @@ public: Type type{}; - /// For ADD/REMOVE/COPY_COLUMN. + /// For ADD/REMOVE/ARRAY_JOIN/COPY_COLUMN. std::string source_name; std::string result_name; DataTypePtr result_type; @@ -98,9 +101,6 @@ public: Names argument_names; bool is_function_compiled = false; - /// For ARRAY JOIN - ArrayJoinActionPtr array_join; - /// For JOIN std::shared_ptr table_join; JoinPtr join; @@ -118,7 +118,7 @@ public: static ExpressionAction project(const NamesWithAliases & projected_columns_); static ExpressionAction project(const Names & projected_columns_); static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_); - static ExpressionAction arrayJoin(ArrayJoinActionPtr array_join_); + static ExpressionAction arrayJoin(std::string source_name, std::string result_name); static ExpressionAction ordinaryJoin(std::shared_ptr table_join, JoinPtr join); /// Which columns necessary to perform this action. @@ -177,15 +177,8 @@ public: /// Adds to the beginning the removal of all extra columns. void prependProjectInput(); - /// Add the specified ARRAY JOIN action to the beginning. Change the appropriate input types to arrays. - /// If there are unknown columns in the ARRAY JOIN list, take their types from sample_block, and immediately after ARRAY JOIN remove them. - void prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before); - - /// If the last action is ARRAY JOIN, and it does not affect the columns from required_columns, discard and return it. - /// Change the corresponding output types to arrays. - bool popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action); - /// Splits actions into two parts. Returned half may be swapped with ARRAY JOIN. + /// Returns nullptr if no actions may be moved before ARRAY JOIN. ExpressionActionsPtr splitActionsBeforeArrayJoin(const NameSet & array_joined_columns); /// - Adds actions to delete all but the specified columns. @@ -202,8 +195,8 @@ public: Names getRequiredColumns() const { Names names; - for (NamesAndTypesList::const_iterator it = input_columns.begin(); it != input_columns.end(); ++it) - names.push_back(it->name); + for (const auto & input : input_columns) + names.push_back(input.name); return names; } @@ -294,21 +287,36 @@ struct ExpressionActionsChain { explicit ExpressionActionsChain(const Context & context_) : context(context_) {} - struct Step + struct ExpressionActionsLink { - enum class Kind - { - ACTIONS, - ARRAY_JOIN, - }; + ExpressionActionsPtr actions; - Kind kind; + const NamesAndTypesList & getRequiredColumns() const { return actions->getRequiredColumnsWithTypes(); } + const ColumnsWithTypeAndName & getResultColumns() const { return actions->getSampleBlock().getColumnsWithTypeAndName(); } + void finalize(const Names & required_output_) const { actions->finalize(required_output_); } + void prependProjectInput() const { actions->prependProjectInput(); } + std::string dump() const { return actions->dumpActions(); } + }; + struct ArrayJoinLink + { ArrayJoinActionPtr array_join; NamesAndTypesList required_columns; - ColumnsWithTypeAndName columns_after_array_join; + ColumnsWithTypeAndName result_columns; + + ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_); + + const NamesAndTypesList & getRequiredColumns() const { return required_columns; } + const ColumnsWithTypeAndName & getResultColumns() const { return result_columns; } + void finalize(const Names & required_output_); + void prependProjectInput() const {} /// TODO: remove unused columns before ARRAY JOIN ? + static std::string dump() { return "ARRAY JOIN"; } + }; + + struct Step + { + std::variant link; - ExpressionActionsPtr actions; /// Columns were added to the block before current step in addition to prev step output. NameSet additional_input; /// Columns which are required in the result of current step. @@ -319,61 +327,24 @@ struct ExpressionActionsChain std::vector can_remove_required_output; public: - explicit Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names()) - : kind(Kind::ACTIONS) - , actions(actions_) + explicit Step(ExpressionActionsPtr actions, const Names & required_output_ = Names()) + : link(ExpressionActionsLink{std::move(actions)}) , required_output(required_output_) { } - explicit Step(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_); - - NamesAndTypesList getRequiredColumns() const - { - switch (kind) - { - case Kind::ACTIONS: - return actions->getRequiredColumnsWithTypes(); - case Kind::ARRAY_JOIN: - return required_columns; - } - - __builtin_unreachable(); - } - - ColumnsWithTypeAndName getResultColumns() const - { - switch (kind) - { - case Kind::ACTIONS: - return actions->getSampleBlock().getColumnsWithTypeAndName(); - case Kind::ARRAY_JOIN: - return columns_after_array_join; - } - - __builtin_unreachable(); - } + explicit Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns); + const NamesAndTypesList & getRequiredColumns() const; + const ColumnsWithTypeAndName & getResultColumns() const; + /// Remove unused result and update required columns void finalize(const Names & required_output_); - + /// Add projections to expression void prependProjectInput() const; + std::string dump() const; - std::string dump() const - { - switch (kind) - { - case Kind::ACTIONS: - { - return actions->dumpActions(); - } - case Kind::ARRAY_JOIN: - { - return "ARRAY JOIN"; - } - } - - __builtin_unreachable(); - } + ExpressionActionsPtr & actions() { return std::get(link).actions; } + const ExpressionActionsPtr & actions() const { return std::get(link).actions; } }; using Steps = std::vector; @@ -399,7 +370,7 @@ struct ExpressionActionsChain throw Exception("Empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR); } - return steps.back().actions; + return steps.back().actions(); } Step & getLastStep() diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 33f872ef15f..0cfdbb52598 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1,30 +1,20 @@ -#include -#include - #include #include #include #include -#include #include #include -#include #include #include -#include #include #include -#include -#include -#include - #include #include #include -#include +#include #include #include #include @@ -42,20 +32,14 @@ #include #include -#include -#include #include #include #include -#include #include -#include -#include #include -#include #include #include #include @@ -180,7 +164,11 @@ void ExpressionAnalyzer::analyzeAggregation() { getRootActionsNoMakeSet(array_join_expression_list, true, temp_actions, false); if (auto array_join = addMultipleArrayJoinAction(temp_actions, is_array_join_left)) - temp_actions->add(ExpressionAction::arrayJoin(array_join)); + { + auto sample_block = temp_actions->getSampleBlock(); + array_join->prepare(sample_block); + temp_actions = std::make_shared(sample_block.getColumnsWithTypeAndName(), context); + } for (auto & column : temp_actions->getSampleBlock().getNamesAndTypesList()) if (syntax->array_join_result_to_source.count(column.name)) @@ -480,10 +468,10 @@ ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActi ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); - getRootActions(array_join_expression_list, only_types, step.actions); + getRootActions(array_join_expression_list, only_types, step.actions()); before_array_join = chain.getLastActions(); - auto array_join = addMultipleArrayJoinAction(step.actions, is_array_join_left); + auto array_join = addMultipleArrayJoinAction(step.actions(), is_array_join_left); chain.steps.push_back(ExpressionActionsChain::Step(array_join, step.getResultColumns())); @@ -501,7 +489,7 @@ bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain & { ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join); - getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions); + getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions()); return true; } @@ -511,7 +499,7 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain) ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join); - addJoinAction(step.actions, table_join); + addJoinAction(step.actions(), table_join); return true; } @@ -635,12 +623,12 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere( return false; auto & step = chain.lastStep(sourceColumns()); - getRootActions(select_query->prewhere(), only_types, step.actions); + getRootActions(select_query->prewhere(), only_types, step.actions()); String prewhere_column_name = select_query->prewhere()->getColumnName(); step.required_output.push_back(prewhere_column_name); step.can_remove_required_output.push_back(true); - auto filter_type = step.actions->getSampleBlock().getByName(prewhere_column_name).type; + auto filter_type = step.actions()->getSampleBlock().getByName(prewhere_column_name).type; if (!filter_type->canBeUsedInBooleanContext()) throw Exception("Invalid type for filter in PREWHERE: " + filter_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); @@ -664,7 +652,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere( } } - auto names = step.actions->getSampleBlock().getNames(); + auto names = step.actions()->getSampleBlock().getNames(); NameSet name_set(names.begin(), names.end()); for (const auto & column : sourceColumns()) @@ -672,7 +660,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere( name_set.erase(column.name); Names required_output(name_set.begin(), name_set.end()); - step.actions->finalize(required_output); + step.actions()->finalize(required_output); } { @@ -683,8 +671,8 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere( /// 2. Store side columns which were calculated during prewhere actions execution if they are used. /// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step. /// 3. Check if we can remove filter column at prewhere step. If we can, action will store single REMOVE_COLUMN. - ColumnsWithTypeAndName columns = step.actions->getSampleBlock().getColumnsWithTypeAndName(); - auto required_columns = step.actions->getRequiredColumns(); + ColumnsWithTypeAndName columns = step.actions()->getSampleBlock().getColumnsWithTypeAndName(); + auto required_columns = step.actions()->getRequiredColumns(); NameSet prewhere_input_names(required_columns.begin(), required_columns.end()); NameSet unused_source_columns; @@ -709,7 +697,7 @@ void SelectQueryExpressionAnalyzer::appendPreliminaryFilter(ExpressionActionsCha ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); // FIXME: assert(filter_info); - step.actions = std::move(actions); + step = ExpressionActionsChain::Step(std::move(actions)); step.required_output.push_back(std::move(column_name)); step.can_remove_required_output = {true}; @@ -729,9 +717,9 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, step.required_output.push_back(where_column_name); step.can_remove_required_output = {true}; - getRootActions(select_query->where(), only_types, step.actions); + getRootActions(select_query->where(), only_types, step.actions()); - auto filter_type = step.actions->getSampleBlock().getByName(where_column_name).type; + auto filter_type = step.actions()->getSampleBlock().getByName(where_column_name).type; if (!filter_type->canBeUsedInBooleanContext()) throw Exception("Invalid type for filter in WHERE: " + filter_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); @@ -753,7 +741,7 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain for (const auto & ast : asts) { step.required_output.emplace_back(ast->getColumnName()); - getRootActions(ast, only_types, step.actions); + getRootActions(ast, only_types, step.actions()); } if (optimize_aggregation_in_order) @@ -793,7 +781,7 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression /// TODO: data.aggregates -> aggregates() for (const ASTFunction * node : data.aggregates) for (auto & argument : node->arguments->children) - getRootActions(argument, only_types, step.actions); + getRootActions(argument, only_types, step.actions()); } bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types) @@ -806,7 +794,7 @@ bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); step.required_output.push_back(select_query->having()->getColumnName()); - getRootActions(select_query->having(), only_types, step.actions); + getRootActions(select_query->having(), only_types, step.actions()); return true; } @@ -817,7 +805,7 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain, ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); - getRootActions(select_query->select(), only_types, step.actions); + getRootActions(select_query->select(), only_types, step.actions()); for (const auto & child : select_query->select()->children) step.required_output.push_back(child->getColumnName()); @@ -833,7 +821,7 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); - getRootActions(select_query->orderBy(), only_types, step.actions); + getRootActions(select_query->orderBy(), only_types, step.actions()); for (auto & child : select_query->orderBy()->children) { @@ -864,7 +852,7 @@ bool SelectQueryExpressionAnalyzer::appendLimitBy(ExpressionActionsChain & chain ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); - getRootActions(select_query->limitBy(), only_types, step.actions); + getRootActions(select_query->limitBy(), only_types, step.actions()); NameSet aggregated_names; for (const auto & column : aggregated_columns) @@ -929,14 +917,14 @@ void SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & } } - step.actions->add(ExpressionAction::project(result_columns)); + step.actions()->add(ExpressionAction::project(result_columns)); } void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types) { ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); - getRootActions(expr, only_types, step.actions); + getRootActions(expr, only_types, step.actions()); step.required_output.push_back(expr->getColumnName()); } @@ -1077,7 +1065,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere)) { prewhere_info = std::make_shared( - chain.steps.front().actions, query.prewhere()->getColumnName()); + chain.steps.front().actions(), query.prewhere()->getColumnName()); if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings)) { diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 96cd5f5bea5..4bfa84090c2 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 0d3ba60640b..ec8ab4b3424 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -623,7 +623,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & actions_chain.finalize(); /// Propagate information about columns needed as input. - for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes()) + for (const auto & column : actions_chain.steps.front().actions()->getRequiredColumnsWithTypes()) prepared_stages[i - 1].output_columns.insert(column.name); } @@ -667,12 +667,12 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve if (i < stage.filter_column_names.size()) { /// Execute DELETEs. - in = std::make_shared(in, step.actions, stage.filter_column_names[i]); + in = std::make_shared(in, step.actions(), stage.filter_column_names[i]); } else { /// Execute UPDATE or final projection. - in = std::make_shared(in, step.actions); + in = std::make_shared(in, step.actions()); } } diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 23b7b04af26..c42bbc5b966 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -36,16 +36,17 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } -void ExpressionStep::updateInputStream(DataStream input_stream, Block result_header) +void ExpressionStep::updateInputStream(DataStream input_stream, bool keep_header) { + Block out_header = keep_header ? std::move(output_stream->header) + : Transform::transformHeader(input_stream.header, expression); output_stream = createOutputStream( input_stream, - res_header ? res_header : Transform::transformHeader(input_stream.header, expression), + std::move(out_header), getDataStreamTraits()); input_streams.clear(); input_streams.emplace_back(std::move(input_stream)); - res_header = std::move(result_header); } void ExpressionStep::transformPipeline(QueryPipeline & pipeline) @@ -56,11 +57,12 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline) return std::make_shared(header, expression, on_totals); }); - if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header)) { pipeline.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + return std::make_shared(header, output_stream->header, + ConvertingTransform::MatchColumnsMode::Name); }); } } diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index 40db2dab817..6a5ea4b68f0 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -21,7 +21,7 @@ public: void transformPipeline(QueryPipeline & pipeline) override; - void updateInputStream(DataStream input_stream, Block result_header); + void updateInputStream(DataStream input_stream, bool keep_header); void describeActions(FormatSettings & settings) const override; @@ -29,7 +29,6 @@ public: private: ExpressionActionsPtr expression; - Block res_header; }; /// TODO: add separate step for join. diff --git a/src/Processors/QueryPlan/FilterStep.cpp b/src/Processors/QueryPlan/FilterStep.cpp index 9d7ba499979..8fe82ae6a24 100644 --- a/src/Processors/QueryPlan/FilterStep.cpp +++ b/src/Processors/QueryPlan/FilterStep.cpp @@ -41,16 +41,19 @@ FilterStep::FilterStep( updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } -void FilterStep::updateInputStream(DataStream input_stream, Block result_header) +void FilterStep::updateInputStream(DataStream input_stream, bool keep_header) { + Block out_header = std::move(output_stream->header); + if (keep_header) + out_header = FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column); + output_stream = createOutputStream( input_stream, - res_header ? res_header : FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column), + std::move(out_header), getDataStreamTraits()); input_streams.clear(); input_streams.emplace_back(std::move(input_stream)); - res_header = std::move(result_header); } void FilterStep::transformPipeline(QueryPipeline & pipeline) @@ -61,11 +64,11 @@ void FilterStep::transformPipeline(QueryPipeline & pipeline) return std::make_shared(header, expression, filter_column_name, remove_filter_column, on_totals); }); - if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header)) { pipeline.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + return std::make_shared(header, output_stream->header, ConvertingTransform::MatchColumnsMode::Name); }); } } diff --git a/src/Processors/QueryPlan/FilterStep.h b/src/Processors/QueryPlan/FilterStep.h index 163ef5c128e..d827fe920eb 100644 --- a/src/Processors/QueryPlan/FilterStep.h +++ b/src/Processors/QueryPlan/FilterStep.h @@ -20,7 +20,7 @@ public: String getName() const override { return "Filter"; } void transformPipeline(QueryPipeline & pipeline) override; - void updateInputStream(DataStream input_stream, Block result_header); + void updateInputStream(DataStream input_stream, bool keep_header); void describeActions(FormatSettings & settings) const override; @@ -32,7 +32,6 @@ private: ExpressionActionsPtr expression; String filter_column_name; bool remove_filter_column; - Block res_header; }; } diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index 98949309eda..31b9de2fcee 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -413,6 +413,7 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_ parent.swap(child); } +/// Move ARRAY JOIN up if possible. static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * child_node, QueryPlan::Nodes & nodes) { auto & parent = parent_node->step; @@ -431,17 +432,18 @@ static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns); /// No actions can be moved before ARRAY JOIN. - if (split_actions->getActions().empty()) + if (!split_actions) return; - auto expected_header = parent->getOutputStream().header; - /// All actions was moved before ARRAY JOIN. Swap Expression and ArrayJoin. if (expression->getActions().empty()) { - /// Expression -> ArrayJoin + auto expected_header = parent->getOutputStream().header; + + /// Expression/Filter -> ArrayJoin std::swap(parent, child); - /// ArrayJoin -> Expression + /// ArrayJoin -> Expression/Filter + if (expression_step) child = std::make_unique(child_node->children.at(0)->step->getOutputStream(), std::move(split_actions)); @@ -461,26 +463,12 @@ static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * node.children.swap(child_node->children); child_node->children.emplace_back(&node); /// Expression/Filter -> ArrayJoin -> node -> Something -// if (filter_step && split_actions->getSampleBlock().has(filter_step->getFilterColumnName())) -// { -// /// Filter -> ArrayJoin -> node -> Something -// node.step = std::make_unique(node.children.at(0)->step->getOutputStream(), -// std::move(split_actions), -// filter_step->getFilterColumnName(), -// filter_step->removesFilterColumn()); -// -// array_join_step->updateInputStream(node.step->getOutputStream()); -// -// parent = std::make_unique(array_join_step->getOutputStream(), -// filter_step->getExpression()); -// /// Expression -> ArrayJoin -> Filter -> Something -// } node.step = std::make_unique(node.children.at(0)->step->getOutputStream(), std::move(split_actions)); array_join_step->updateInputStream(node.step->getOutputStream(), {}); - expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), expected_header) - : filter_step->updateInputStream(array_join_step->getOutputStream(), expected_header); + expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), true) + : filter_step->updateInputStream(array_join_step->getOutputStream(), true); } void QueryPlan::optimize() diff --git a/src/Processors/Transforms/ArrayJoinTransform.cpp b/src/Processors/Transforms/ArrayJoinTransform.cpp index ba8e4949f7c..9058d7df2a0 100644 --- a/src/Processors/Transforms/ArrayJoinTransform.cpp +++ b/src/Processors/Transforms/ArrayJoinTransform.cpp @@ -11,7 +11,7 @@ namespace ErrorCodes Block ArrayJoinTransform::transformHeader(Block header, const ArrayJoinActionPtr & array_join) { - array_join->execute(header, true); + array_join->execute(header); return header; } @@ -30,7 +30,7 @@ ArrayJoinTransform::ArrayJoinTransform( void ArrayJoinTransform::transform(Chunk & chunk) { auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); - array_join->execute(block, false); + array_join->execute(block); chunk.setColumns(block.getColumns(), block.rows()); } From 5cd431252983032a641a264b4c664142d166a6ff Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 19 Aug 2020 22:33:49 +0300 Subject: [PATCH 6/8] Review fixes. --- src/Interpreters/ExpressionActions.cpp | 93 ++++++-------------- src/Interpreters/ExpressionActions.h | 102 +++++++++++++--------- src/Interpreters/ExpressionAnalyzer.cpp | 21 +++-- src/Interpreters/MutationsInterpreter.cpp | 6 +- 4 files changed, 104 insertions(+), 118 deletions(-) diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index c04ebb62c4c..00fcf1df547 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1153,22 +1153,17 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe new_actions.emplace_back(action); } - else + else if (action.type == ExpressionAction::REMOVE_COLUMN) { /// Exception for REMOVE_COLUMN. /// We cannot move it to split_actions if any argument from `this` needed that column. - if (action.type == ExpressionAction::REMOVE_COLUMN) - { - if (array_join_dependent_columns_arguments.count(action.source_name)) - new_actions.emplace_back(action); - else - split_actions->add(action); - - continue; - } - - split_actions->add(action); + if (array_join_dependent_columns_arguments.count(action.source_name)) + new_actions.emplace_back(action); + else + split_actions->add(action); } + else + split_actions->add(action); } /// Return empty actions if nothing was separated. Keep `this` unchanged. @@ -1365,8 +1360,8 @@ void ExpressionActionsChain::addStep() if (steps.empty()) throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR); - ColumnsWithTypeAndName columns = steps.back().getResultColumns(); - steps.push_back(Step(std::make_shared(columns, context))); + ColumnsWithTypeAndName columns = steps.back()->getResultColumns(); + steps.push_back(std::make_unique(std::make_shared(columns, context))); } void ExpressionActionsChain::finalize() @@ -1374,16 +1369,16 @@ void ExpressionActionsChain::finalize() /// Finalize all steps. Right to left to define unnecessary input columns. for (int i = static_cast(steps.size()) - 1; i >= 0; --i) { - Names required_output = steps[i].required_output; + Names required_output = steps[i]->required_output; std::unordered_map required_output_indexes; for (size_t j = 0; j < required_output.size(); ++j) required_output_indexes[required_output[j]] = j; - auto & can_remove_required_output = steps[i].can_remove_required_output; + auto & can_remove_required_output = steps[i]->can_remove_required_output; if (i + 1 < static_cast(steps.size())) { - const NameSet & additional_input = steps[i + 1].additional_input; - for (const auto & it : steps[i + 1].getRequiredColumns()) + const NameSet & additional_input = steps[i + 1]->additional_input; + for (const auto & it : steps[i + 1]->getRequiredColumns()) { if (additional_input.count(it.name) == 0) { @@ -1395,19 +1390,19 @@ void ExpressionActionsChain::finalize() } } } - steps[i].finalize(required_output); + steps[i]->finalize(required_output); } /// Adding the ejection of unnecessary columns to the beginning of each step. for (size_t i = 1; i < steps.size(); ++i) { - size_t columns_from_previous = steps[i - 1].getResultColumns().size(); + size_t columns_from_previous = steps[i - 1]->getResultColumns().size(); /// If unnecessary columns are formed at the output of the previous step, we'll add them to the beginning of this step. /// Except when we drop all the columns and lose the number of rows in the block. - if (!steps[i].getResultColumns().empty() - && columns_from_previous > steps[i].getRequiredColumns().size()) - steps[i].prependProjectInput(); + if (!steps[i]->getResultColumns().empty() + && columns_from_previous > steps[i]->getRequiredColumns().size()) + steps[i]->prependProjectInput(); } } @@ -1419,16 +1414,17 @@ std::string ExpressionActionsChain::dumpChain() const { ss << "step " << i << "\n"; ss << "required output:\n"; - for (const std::string & name : steps[i].required_output) + for (const std::string & name : steps[i]->required_output) ss << name << "\n"; - ss << "\n" << steps[i].dump() << "\n"; + ss << "\n" << steps[i]->dump() << "\n"; } return ss.str(); } -ExpressionActionsChain::ArrayJoinLink::ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_) - : array_join(std::move(array_join_)) +ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_outputs_) + : Step(std::move(required_outputs_)) + , array_join(std::move(array_join_)) , result_columns(std::move(required_columns_)) { for (auto & column : result_columns) @@ -1445,7 +1441,7 @@ ExpressionActionsChain::ArrayJoinLink::ArrayJoinLink(ArrayJoinActionPtr array_jo } } -void ExpressionActionsChain::ArrayJoinLink::finalize(const Names & required_output_) +void ExpressionActionsChain::ArrayJoinStep::finalize(const Names & required_output_) { NamesAndTypesList new_required_columns; ColumnsWithTypeAndName new_result_columns; @@ -1466,47 +1462,14 @@ void ExpressionActionsChain::ArrayJoinLink::finalize(const Names & required_outp std::swap(result_columns, new_result_columns); } -ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns) - : link(ArrayJoinLink(std::move(array_join), std::move(required_columns))) +ExpressionActionsPtr & ExpressionActionsChain::Step::actions() { + return typeid_cast(this)->actions; } -template -static Res dispatch(Ptr * ptr, Callback && callback) +const ExpressionActionsPtr & ExpressionActionsChain::Step::actions() const { - if (std::holds_alternative(ptr->link)) - return callback(std::get(ptr->link)); - if (std::holds_alternative(ptr->link)) - return callback(std::get(ptr->link)); - - throw Exception("Unknown variant in ExpressionActionsChain step", ErrorCodes::LOGICAL_ERROR); -} - -const NamesAndTypesList & ExpressionActionsChain::Step::getRequiredColumns() const -{ - using Res = const NamesAndTypesList &; - return dispatch(this, [](auto & x) -> Res { return x.getRequiredColumns(); }); -} - -const ColumnsWithTypeAndName & ExpressionActionsChain::Step::getResultColumns() const -{ - using Res = const ColumnsWithTypeAndName &; - return dispatch(this, [](auto & x) -> Res{ return x.getResultColumns(); }); -} - -void ExpressionActionsChain::Step::finalize(const Names & required_output_) -{ - dispatch(this, [&required_output_](auto & x) { x.finalize(required_output_); }); -} - -void ExpressionActionsChain::Step::prependProjectInput() const -{ - dispatch(this, [](auto & x) { x.prependProjectInput(); }); -} - -std::string ExpressionActionsChain::Step::dump() const -{ - return dispatch(this, [](auto & x) { return x.dump(); }); + return typeid_cast(this)->actions; } } diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index a11295a317c..441e3e3d44b 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -287,35 +287,11 @@ struct ExpressionActionsChain { explicit ExpressionActionsChain(const Context & context_) : context(context_) {} - struct ExpressionActionsLink - { - ExpressionActionsPtr actions; - - const NamesAndTypesList & getRequiredColumns() const { return actions->getRequiredColumnsWithTypes(); } - const ColumnsWithTypeAndName & getResultColumns() const { return actions->getSampleBlock().getColumnsWithTypeAndName(); } - void finalize(const Names & required_output_) const { actions->finalize(required_output_); } - void prependProjectInput() const { actions->prependProjectInput(); } - std::string dump() const { return actions->dumpActions(); } - }; - - struct ArrayJoinLink - { - ArrayJoinActionPtr array_join; - NamesAndTypesList required_columns; - ColumnsWithTypeAndName result_columns; - - ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_); - - const NamesAndTypesList & getRequiredColumns() const { return required_columns; } - const ColumnsWithTypeAndName & getResultColumns() const { return result_columns; } - void finalize(const Names & required_output_); - void prependProjectInput() const {} /// TODO: remove unused columns before ARRAY JOIN ? - static std::string dump() { return "ARRAY JOIN"; } - }; struct Step { - std::variant link; + virtual ~Step() = default; + explicit Step(Names required_output_) : required_output(std::move(required_output_)) {} /// Columns were added to the block before current step in addition to prev step output. NameSet additional_input; @@ -326,28 +302,72 @@ struct ExpressionActionsChain /// If not empty, has the same size with required_output; is filled in finalize(). std::vector can_remove_required_output; - public: - explicit Step(ExpressionActionsPtr actions, const Names & required_output_ = Names()) - : link(ExpressionActionsLink{std::move(actions)}) - , required_output(required_output_) + virtual const NamesAndTypesList & getRequiredColumns() const = 0; + virtual const ColumnsWithTypeAndName & getResultColumns() const = 0; + /// Remove unused result and update required columns + virtual void finalize(const Names & required_output_) = 0; + /// Add projections to expression + virtual void prependProjectInput() const = 0; + virtual std::string dump() const = 0; + + /// Only for ExpressionActionsStep + ExpressionActionsPtr & actions(); + const ExpressionActionsPtr & actions() const; + }; + + struct ExpressionActionsStep : public Step + { + ExpressionActionsPtr actions; + + explicit ExpressionActionsStep(ExpressionActionsPtr actions_, Names required_output_ = Names()) + : Step(std::move(required_output_)) + , actions(std::move(actions_)) { } - explicit Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns); + const NamesAndTypesList & getRequiredColumns() const override + { + return actions->getRequiredColumnsWithTypes(); + } - const NamesAndTypesList & getRequiredColumns() const; - const ColumnsWithTypeAndName & getResultColumns() const; - /// Remove unused result and update required columns - void finalize(const Names & required_output_); - /// Add projections to expression - void prependProjectInput() const; - std::string dump() const; + const ColumnsWithTypeAndName & getResultColumns() const override + { + return actions->getSampleBlock().getColumnsWithTypeAndName(); + } - ExpressionActionsPtr & actions() { return std::get(link).actions; } - const ExpressionActionsPtr & actions() const { return std::get(link).actions; } + void finalize(const Names & required_output_) override + { + actions->finalize(required_output_); + } + + void prependProjectInput() const override + { + actions->prependProjectInput(); + } + + std::string dump() const override + { + return actions->dumpActions(); + } }; - using Steps = std::vector; + struct ArrayJoinStep : public Step + { + ArrayJoinActionPtr array_join; + NamesAndTypesList required_columns; + ColumnsWithTypeAndName result_columns; + + ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_output_); + + const NamesAndTypesList & getRequiredColumns() const override { return required_columns; } + const ColumnsWithTypeAndName & getResultColumns() const override { return result_columns; } + void finalize(const Names & required_output_) override; + void prependProjectInput() const override {} /// TODO: remove unused columns before ARRAY JOIN ? + std::string dump() const override { return "ARRAY JOIN"; } + }; + + using StepPtr = std::unique_ptr; + using Steps = std::vector; const Context & context; Steps steps; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 0cfdbb52598..62c33a56ca8 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -473,7 +473,9 @@ ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActi before_array_join = chain.getLastActions(); auto array_join = addMultipleArrayJoinAction(step.actions(), is_array_join_left); - chain.steps.push_back(ExpressionActionsChain::Step(array_join, step.getResultColumns())); + chain.steps.push_back(std::make_unique( + array_join, step.getResultColumns(), + Names())); /// Required output is empty because all array joined columns are kept by step. chain.addStep(); @@ -685,8 +687,9 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere( } } - chain.steps.emplace_back(std::make_shared(std::move(columns), context)); - chain.steps.back().additional_input = std::move(unused_source_columns); + chain.steps.emplace_back(std::make_unique( + std::make_shared(std::move(columns), context))); + chain.steps.back()->additional_input = std::move(unused_source_columns); } return true; @@ -697,7 +700,7 @@ void SelectQueryExpressionAnalyzer::appendPreliminaryFilter(ExpressionActionsCha ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); // FIXME: assert(filter_info); - step = ExpressionActionsChain::Step(std::move(actions)); + step.actions() = std::move(actions); step.required_output.push_back(std::move(column_name)); step.can_remove_required_output = {true}; @@ -1065,7 +1068,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere)) { prewhere_info = std::make_shared( - chain.steps.front().actions(), query.prewhere()->getColumnName()); + chain.steps.front()->actions(), query.prewhere()->getColumnName()); if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings)) { @@ -1108,7 +1111,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( { Block before_where_sample; if (chain.steps.size() > 1) - before_where_sample = Block(chain.steps[chain.steps.size() - 2].getResultColumns()); + before_where_sample = Block(chain.steps[chain.steps.size() - 2]->getResultColumns()); else before_where_sample = source_header; if (sanitizeBlock(before_where_sample)) @@ -1189,7 +1192,7 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, co { if (hasPrewhere()) { - const ExpressionActionsChain::Step & step = chain.steps.at(0); + const ExpressionActionsChain::Step & step = *chain.steps.at(0); prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0); Names columns_to_remove; @@ -1214,10 +1217,10 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, co else if (hasFilter()) { /// Can't have prewhere and filter set simultaneously - filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0); + filter_info->do_remove_column = chain.steps.at(0)->can_remove_required_output.at(0); } if (hasWhere()) - remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0); + remove_where_filter = chain.steps.at(where_step_num)->can_remove_required_output.at(0); } void ExpressionAnalysisResult::removeExtraColumns() const diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index ec8ab4b3424..894c75b07b4 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -623,7 +623,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & actions_chain.finalize(); /// Propagate information about columns needed as input. - for (const auto & column : actions_chain.steps.front().actions()->getRequiredColumnsWithTypes()) + for (const auto & column : actions_chain.steps.front()->actions()->getRequiredColumnsWithTypes()) prepared_stages[i - 1].output_columns.insert(column.name); } @@ -667,12 +667,12 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve if (i < stage.filter_column_names.size()) { /// Execute DELETEs. - in = std::make_shared(in, step.actions(), stage.filter_column_names[i]); + in = std::make_shared(in, step->actions(), stage.filter_column_names[i]); } else { /// Execute UPDATE or final projection. - in = std::make_shared(in, step.actions()); + in = std::make_shared(in, step->actions()); } } From b3791d7f6ed4cc30ba5580c08b49fc4375707255 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 19 Aug 2020 22:58:23 +0300 Subject: [PATCH 7/8] Fix build. --- src/Interpreters/ExpressionActions.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index 441e3e3d44b..17715fc216e 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -390,7 +390,7 @@ struct ExpressionActionsChain throw Exception("Empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR); } - return steps.back().actions(); + return steps.back()->actions(); } Step & getLastStep() @@ -398,14 +398,14 @@ struct ExpressionActionsChain if (steps.empty()) throw Exception("Empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR); - return steps.back(); + return *steps.back(); } Step & lastStep(const NamesAndTypesList & columns) { if (steps.empty()) - steps.emplace_back(std::make_shared(columns, context)); - return steps.back(); + steps.emplace_back(std::make_unique(std::make_shared(columns, context))); + return *steps.back(); } std::string dumpChain() const; From d3fa5895fb4c1f431bcd11210e06f5e085767db9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 20 Aug 2020 12:33:16 +0300 Subject: [PATCH 8/8] Fix build. --- src/Interpreters/ExpressionActions.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 00fcf1df547..218e4bba973 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1422,8 +1422,8 @@ std::string ExpressionActionsChain::dumpChain() const return ss.str(); } -ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_outputs_) - : Step(std::move(required_outputs_)) +ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_output_) + : Step(std::move(required_output_)) , array_join(std::move(array_join_)) , result_columns(std::move(required_columns_)) {