diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 4b5ef7846b7..22d13a66616 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -1077,6 +1077,138 @@ void ExpressionActions::optimizeArrayJoin() } } +ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) +{ + auto split_actions = std::make_shared(*this); + split_actions->actions.clear(); + split_actions->sample_block.clear(); + split_actions->input_columns.clear(); + + for (const auto & input_column : input_columns) + { + if (array_joined_columns.count(input_column.name) == 0) + { + split_actions->input_columns.emplace_back(input_column); + split_actions->sample_block.insert(ColumnWithTypeAndName(nullptr, input_column.type, input_column.name)); + } + } + + /// Do not split action if input depends only on array joined columns. + if (split_actions->input_columns.empty()) + return split_actions; + + NameSet array_join_dependent_columns = array_joined_columns; + /// Columns needed to evaluate arrayJoin or those that depend on it. + /// Actions to delete them can not be moved to the left of the arrayJoin. + NameSet array_join_dependencies; + + Actions new_actions; + for (const auto & action : actions) + { + if (action.type == ExpressionAction::PROJECT) + { + NamesWithAliases split_aliases; + NamesWithAliases depend_aliases; + for (const auto & pair : action.projection) + { + if (!pair.second.empty() || array_join_dependent_columns.count(pair.first)) + { + if (array_join_dependent_columns.count(pair.first)) + { + array_join_dependent_columns.insert(pair.second); + if (!pair.second.empty()) + depend_aliases.emplace_back(std::move(pair)); + } + else if (!pair.second.empty()) + split_aliases.emplace_back(std::move(pair)); + } + } + + if (!split_aliases.empty()) + split_actions->add(ExpressionAction::addAliases(split_aliases)); + + if (!depend_aliases.empty()) + new_actions.emplace_back(ExpressionAction::addAliases(depend_aliases)); + + continue; + } + + bool depends_on_array_join = false; + for (auto & column : action.getNeededColumns()) + if (array_join_dependent_columns.count(column) != 0) + depends_on_array_join = true; + + if (depends_on_array_join) + { + if (!action.result_name.empty()) + array_join_dependent_columns.insert(action.result_name); + if (action.array_join) + array_join_dependent_columns.insert(action.array_join->columns.begin(), action.array_join->columns.end()); + + auto needed = action.getNeededColumns(); + array_join_dependencies.insert(needed.begin(), needed.end()); + new_actions.emplace_back(action); + } + else + { + /// Replace PROJECT to ADD_ALIASES, because project may remove columns needed for array join +// if (action.type == ExpressionAction::PROJECT) +// { +// NamesWithAliases projection; +// +// for (auto & column : action.projection) +// { +// if (!column.second.empty()) +// { +// projection.emplace_back(column); +// column.second.clear(); +// } +// } +// +// /// new_actions.emplace_back(action); +// +// if (!projection.empty()) +// { +// action.type = ExpressionAction::ADD_ALIASES; +// action.projection.swap(projection); +// split_actions->add(std::move(action)); +// } +// } +// else + + if (action.type == ExpressionAction::REMOVE_COLUMN) + { + if (array_join_dependencies.count(action.source_name)) + new_actions.emplace_back(action); + else + split_actions->add(action); + + continue; + } + + split_actions->add(action); + } + } + + if (split_actions->getActions().empty()) + return split_actions; + + std::swap(actions, new_actions); + + /// Add input from split actions result. + NamesAndTypesList inputs_from_array_join; + for (auto & column : input_columns) + if (array_joined_columns.count(column.name)) + inputs_from_array_join.emplace_back(std::move(column)); + + input_columns = split_actions->getSampleBlock().getNamesAndTypesList(); + input_columns.insert(input_columns.end(), inputs_from_array_join.begin(), inputs_from_array_join.end()); + + if (!actions.empty()) + prependProjectInput(); + + return split_actions; +} JoinPtr ExpressionActions::getTableJoinAlgo() const { @@ -1378,7 +1510,7 @@ void ExpressionActionsChain::Step::finalize(const Names & required_output_) } } -void ExpressionActionsChain::Step::prependProjectInput() +void ExpressionActionsChain::Step::prependProjectInput() const { switch (kind) { diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index a5fd3b9d7a1..428a001db05 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -144,6 +144,8 @@ private: void execute(Block & block, bool dry_run) const; }; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; /** Contains a sequence of actions on the block. */ @@ -183,6 +185,8 @@ public: /// Change the corresponding output types to arrays. bool popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action); + ExpressionActionsPtr splitActionsBeforeArrayJoin(const NameSet & array_joined_columns); + /// - Adds actions to delete all but the specified columns. /// - Removes unused input columns. /// - Can somehow optimize the expression. @@ -275,8 +279,6 @@ private: void optimizeArrayJoin(); }; -using ExpressionActionsPtr = std::shared_ptr; - /** The sequence of transformations over the block. * It is assumed that the result of each step is fed to the input of the next step. @@ -353,7 +355,7 @@ struct ExpressionActionsChain void finalize(const Names & required_output_); - void prependProjectInput(); + void prependProjectInput() const; std::string dump() const { @@ -368,6 +370,8 @@ struct ExpressionActionsChain return "ARRAY JOIN"; } } + + __builtin_unreachable(); } }; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 4bfa84090c2..96cd5f5bea5 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include diff --git a/src/Processors/QueryPlan/ArrayJoinStep.cpp b/src/Processors/QueryPlan/ArrayJoinStep.cpp index 2948d4cc842..402f7d4a318 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.cpp +++ b/src/Processors/QueryPlan/ArrayJoinStep.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -32,6 +33,18 @@ ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPt { } +void ArrayJoinStep::updateInputStream(DataStream input_stream, Block result_header) +{ + output_stream = createOutputStream( + input_stream, + ArrayJoinTransform::transformHeader(input_stream.header, array_join), + getDataStreamTraits()); + + input_streams.clear(); + input_streams.emplace_back(std::move(input_stream)); + res_header = std::move(result_header); +} + void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) @@ -39,6 +52,14 @@ void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline) bool on_totals = stream_type == QueryPipeline::StreamType::Totals; return std::make_shared(header, array_join, on_totals); }); + + if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + { + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + }); + } } void ArrayJoinStep::describeActions(FormatSettings & settings) const diff --git a/src/Processors/QueryPlan/ArrayJoinStep.h b/src/Processors/QueryPlan/ArrayJoinStep.h index d621d4b2ed1..92c7e0a1304 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.h +++ b/src/Processors/QueryPlan/ArrayJoinStep.h @@ -17,8 +17,13 @@ public: void describeActions(FormatSettings & settings) const override; + void updateInputStream(DataStream input_stream, Block result_header); + + const ArrayJoinActionPtr & arrayJoin() const { return array_join; } + private: ArrayJoinActionPtr array_join; + Block res_header; }; } diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 3b0632eff1b..23b7b04af26 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,18 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } +void ExpressionStep::updateInputStream(DataStream input_stream, Block result_header) +{ + output_stream = createOutputStream( + input_stream, + res_header ? res_header : Transform::transformHeader(input_stream.header, expression), + getDataStreamTraits()); + + input_streams.clear(); + input_streams.emplace_back(std::move(input_stream)); + res_header = std::move(result_header); +} + void ExpressionStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) @@ -42,6 +55,14 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline) bool on_totals = stream_type == QueryPipeline::StreamType::Totals; return std::make_shared(header, expression, on_totals); }); + + if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + { + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + }); + } } static void doDescribeActions(const ExpressionActionsPtr & expression, IQueryPlanStep::FormatSettings & settings) diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index 80d2fd2630d..40db2dab817 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -21,10 +21,15 @@ public: void transformPipeline(QueryPipeline & pipeline) override; + void updateInputStream(DataStream input_stream, Block result_header); + void describeActions(FormatSettings & settings) const override; + const ExpressionActionsPtr & getExpression() const { return expression; } + private: ExpressionActionsPtr expression; + Block res_header; }; /// TODO: add separate step for join. diff --git a/src/Processors/QueryPlan/FilterStep.cpp b/src/Processors/QueryPlan/FilterStep.cpp index 504fd71c56a..9d7ba499979 100644 --- a/src/Processors/QueryPlan/FilterStep.cpp +++ b/src/Processors/QueryPlan/FilterStep.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,18 @@ FilterStep::FilterStep( updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } +void FilterStep::updateInputStream(DataStream input_stream, Block result_header) +{ + output_stream = createOutputStream( + input_stream, + res_header ? res_header : FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column), + getDataStreamTraits()); + + input_streams.clear(); + input_streams.emplace_back(std::move(input_stream)); + res_header = std::move(result_header); +} + void FilterStep::transformPipeline(QueryPipeline & pipeline) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) @@ -47,6 +60,14 @@ void FilterStep::transformPipeline(QueryPipeline & pipeline) bool on_totals = stream_type == QueryPipeline::StreamType::Totals; return std::make_shared(header, expression, filter_column_name, remove_filter_column, on_totals); }); + + if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header)) + { + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, res_header, ConvertingTransform::MatchColumnsMode::Name); + }); + } } void FilterStep::describeActions(FormatSettings & settings) const diff --git a/src/Processors/QueryPlan/FilterStep.h b/src/Processors/QueryPlan/FilterStep.h index 1decc61349a..163ef5c128e 100644 --- a/src/Processors/QueryPlan/FilterStep.h +++ b/src/Processors/QueryPlan/FilterStep.h @@ -20,12 +20,19 @@ public: String getName() const override { return "Filter"; } void transformPipeline(QueryPipeline & pipeline) override; + void updateInputStream(DataStream input_stream, Block result_header); + void describeActions(FormatSettings & settings) const override; + const ExpressionActionsPtr & getExpression() const { return expression; } + const String & getFilterColumnName() const { return filter_column_name; } + bool removesFilterColumn() const { return remove_filter_column; } + private: ExpressionActionsPtr expression; String filter_column_name; bool remove_filter_column; + Block res_header; }; } diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index ae0147d8724..98949309eda 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include #include "MergingSortedStep.h" @@ -10,6 +12,9 @@ #include "MergeSortingStep.h" #include "PartialSortingStep.h" #include "TotalsHavingStep.h" +#include "ExpressionStep.h" +#include "ArrayJoinStep.h" +#include "FilterStep.h" namespace DB { @@ -408,6 +413,76 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_ parent.swap(child); } +static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * child_node, QueryPlan::Nodes & nodes) +{ + auto & parent = parent_node->step; + auto & child = child_node->step; + auto * expression_step = typeid_cast(parent.get()); + auto * filter_step = typeid_cast(parent.get()); + auto * array_join_step = typeid_cast(child.get()); + + if (!(expression_step || filter_step) || !array_join_step) + return; + + const auto & array_join = array_join_step->arrayJoin(); + const auto & expression = expression_step ? expression_step->getExpression() + : filter_step->getExpression(); + + auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns); + + /// No actions can be moved before ARRAY JOIN. + if (split_actions->getActions().empty()) + return; + + auto expected_header = parent->getOutputStream().header; + + /// All actions was moved before ARRAY JOIN. Swap Expression and ArrayJoin. + if (expression->getActions().empty()) + { + /// Expression -> ArrayJoin + std::swap(parent, child); + /// ArrayJoin -> Expression + if (expression_step) + child = std::make_unique(child_node->children.at(0)->step->getOutputStream(), + std::move(split_actions)); + else + child = std::make_unique(child_node->children.at(0)->step->getOutputStream(), + std::move(split_actions), + filter_step->getFilterColumnName(), + filter_step->removesFilterColumn()); + + array_join_step->updateInputStream(child->getOutputStream(), expected_header); + return; + } + + /// Add new expression step before ARRAY JOIN. + /// Expression/Filter -> ArrayJoin -> Something + auto & node = nodes.emplace_back(); + node.children.swap(child_node->children); + child_node->children.emplace_back(&node); + /// Expression/Filter -> ArrayJoin -> node -> Something +// if (filter_step && split_actions->getSampleBlock().has(filter_step->getFilterColumnName())) +// { +// /// Filter -> ArrayJoin -> node -> Something +// node.step = std::make_unique(node.children.at(0)->step->getOutputStream(), +// std::move(split_actions), +// filter_step->getFilterColumnName(), +// filter_step->removesFilterColumn()); +// +// array_join_step->updateInputStream(node.step->getOutputStream()); +// +// parent = std::make_unique(array_join_step->getOutputStream(), +// filter_step->getExpression()); +// /// Expression -> ArrayJoin -> Filter -> Something +// } + + node.step = std::make_unique(node.children.at(0)->step->getOutputStream(), + std::move(split_actions)); + array_join_step->updateInputStream(node.step->getOutputStream(), {}); + expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), expected_header) + : filter_step->updateInputStream(array_join_step->getOutputStream(), expected_header); +} + void QueryPlan::optimize() { struct Frame @@ -436,7 +511,13 @@ void QueryPlan::optimize() ++frame.next_child; } else + { + /// Last entrance, try lift up. + if (frame.node->children.size() == 1) + tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes); + stack.pop(); + } } } diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index 17ff7dde3af..7ce8d9426c4 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -69,11 +69,10 @@ public: std::vector children = {}; }; -private: - using Nodes = std::list; - Nodes nodes; +private: + Nodes nodes; Node * root = nullptr; void checkInitialized() const;