Lift up ARRAY JOIN step.

This commit is contained in:
Nikolai Kochetov 2020-08-12 16:30:02 +03:00
parent 85d0f1a480
commit 6934974fc5
11 changed files with 304 additions and 7 deletions

View File

@ -1077,6 +1077,138 @@ void ExpressionActions::optimizeArrayJoin()
}
}
ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns)
{
auto split_actions = std::make_shared<ExpressionActions>(*this);
split_actions->actions.clear();
split_actions->sample_block.clear();
split_actions->input_columns.clear();
for (const auto & input_column : input_columns)
{
if (array_joined_columns.count(input_column.name) == 0)
{
split_actions->input_columns.emplace_back(input_column);
split_actions->sample_block.insert(ColumnWithTypeAndName(nullptr, input_column.type, input_column.name));
}
}
/// Do not split action if input depends only on array joined columns.
if (split_actions->input_columns.empty())
return split_actions;
NameSet array_join_dependent_columns = array_joined_columns;
/// Columns needed to evaluate arrayJoin or those that depend on it.
/// Actions to delete them can not be moved to the left of the arrayJoin.
NameSet array_join_dependencies;
Actions new_actions;
for (const auto & action : actions)
{
if (action.type == ExpressionAction::PROJECT)
{
NamesWithAliases split_aliases;
NamesWithAliases depend_aliases;
for (const auto & pair : action.projection)
{
if (!pair.second.empty() || array_join_dependent_columns.count(pair.first))
{
if (array_join_dependent_columns.count(pair.first))
{
array_join_dependent_columns.insert(pair.second);
if (!pair.second.empty())
depend_aliases.emplace_back(std::move(pair));
}
else if (!pair.second.empty())
split_aliases.emplace_back(std::move(pair));
}
}
if (!split_aliases.empty())
split_actions->add(ExpressionAction::addAliases(split_aliases));
if (!depend_aliases.empty())
new_actions.emplace_back(ExpressionAction::addAliases(depend_aliases));
continue;
}
bool depends_on_array_join = false;
for (auto & column : action.getNeededColumns())
if (array_join_dependent_columns.count(column) != 0)
depends_on_array_join = true;
if (depends_on_array_join)
{
if (!action.result_name.empty())
array_join_dependent_columns.insert(action.result_name);
if (action.array_join)
array_join_dependent_columns.insert(action.array_join->columns.begin(), action.array_join->columns.end());
auto needed = action.getNeededColumns();
array_join_dependencies.insert(needed.begin(), needed.end());
new_actions.emplace_back(action);
}
else
{
/// Replace PROJECT to ADD_ALIASES, because project may remove columns needed for array join
// if (action.type == ExpressionAction::PROJECT)
// {
// NamesWithAliases projection;
//
// for (auto & column : action.projection)
// {
// if (!column.second.empty())
// {
// projection.emplace_back(column);
// column.second.clear();
// }
// }
//
// /// new_actions.emplace_back(action);
//
// if (!projection.empty())
// {
// action.type = ExpressionAction::ADD_ALIASES;
// action.projection.swap(projection);
// split_actions->add(std::move(action));
// }
// }
// else
if (action.type == ExpressionAction::REMOVE_COLUMN)
{
if (array_join_dependencies.count(action.source_name))
new_actions.emplace_back(action);
else
split_actions->add(action);
continue;
}
split_actions->add(action);
}
}
if (split_actions->getActions().empty())
return split_actions;
std::swap(actions, new_actions);
/// Add input from split actions result.
NamesAndTypesList inputs_from_array_join;
for (auto & column : input_columns)
if (array_joined_columns.count(column.name))
inputs_from_array_join.emplace_back(std::move(column));
input_columns = split_actions->getSampleBlock().getNamesAndTypesList();
input_columns.insert(input_columns.end(), inputs_from_array_join.begin(), inputs_from_array_join.end());
if (!actions.empty())
prependProjectInput();
return split_actions;
}
JoinPtr ExpressionActions::getTableJoinAlgo() const
{
@ -1378,7 +1510,7 @@ void ExpressionActionsChain::Step::finalize(const Names & required_output_)
}
}
void ExpressionActionsChain::Step::prependProjectInput()
void ExpressionActionsChain::Step::prependProjectInput() const
{
switch (kind)
{

View File

@ -144,6 +144,8 @@ private:
void execute(Block & block, bool dry_run) const;
};
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** Contains a sequence of actions on the block.
*/
@ -183,6 +185,8 @@ public:
/// Change the corresponding output types to arrays.
bool popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action);
ExpressionActionsPtr splitActionsBeforeArrayJoin(const NameSet & array_joined_columns);
/// - Adds actions to delete all but the specified columns.
/// - Removes unused input columns.
/// - Can somehow optimize the expression.
@ -275,8 +279,6 @@ private:
void optimizeArrayJoin();
};
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** The sequence of transformations over the block.
* It is assumed that the result of each step is fed to the input of the next step.
@ -353,7 +355,7 @@ struct ExpressionActionsChain
void finalize(const Names & required_output_);
void prependProjectInput();
void prependProjectInput() const;
std::string dump() const
{
@ -368,6 +370,8 @@ struct ExpressionActionsChain
return "ARRAY JOIN";
}
}
__builtin_unreachable();
}
};

View File

@ -1,4 +1,5 @@
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/InterpreterSystemQuery.h>
#include <Common/DNSResolver.h>
#include <Common/ActionLock.h>
#include <Common/typeid_cast.h>

View File

@ -1,5 +1,6 @@
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/Transforms/ArrayJoinTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/QueryPipeline.h>
#include <Interpreters/ArrayJoinAction.h>
#include <IO/Operators.h>
@ -32,6 +33,18 @@ ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPt
{
}
void ArrayJoinStep::updateInputStream(DataStream input_stream, Block result_header)
{
output_stream = createOutputStream(
input_stream,
ArrayJoinTransform::transformHeader(input_stream.header, array_join),
getDataStreamTraits());
input_streams.clear();
input_streams.emplace_back(std::move(input_stream));
res_header = std::move(result_header);
}
void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
@ -39,6 +52,14 @@ void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline)
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<ArrayJoinTransform>(header, array_join, on_totals);
});
if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header))
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, res_header, ConvertingTransform::MatchColumnsMode::Name);
});
}
}
void ArrayJoinStep::describeActions(FormatSettings & settings) const

View File

@ -17,8 +17,13 @@ public:
void describeActions(FormatSettings & settings) const override;
void updateInputStream(DataStream input_stream, Block result_header);
const ArrayJoinActionPtr & arrayJoin() const { return array_join; }
private:
ArrayJoinActionPtr array_join;
Block res_header;
};
}

View File

@ -1,6 +1,7 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
@ -35,6 +36,18 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void ExpressionStep::updateInputStream(DataStream input_stream, Block result_header)
{
output_stream = createOutputStream(
input_stream,
res_header ? res_header : Transform::transformHeader(input_stream.header, expression),
getDataStreamTraits());
input_streams.clear();
input_streams.emplace_back(std::move(input_stream));
res_header = std::move(result_header);
}
void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
@ -42,6 +55,14 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<Transform>(header, expression, on_totals);
});
if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header))
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, res_header, ConvertingTransform::MatchColumnsMode::Name);
});
}
}
static void doDescribeActions(const ExpressionActionsPtr & expression, IQueryPlanStep::FormatSettings & settings)

View File

@ -21,10 +21,15 @@ public:
void transformPipeline(QueryPipeline & pipeline) override;
void updateInputStream(DataStream input_stream, Block result_header);
void describeActions(FormatSettings & settings) const override;
const ExpressionActionsPtr & getExpression() const { return expression; }
private:
ExpressionActionsPtr expression;
Block res_header;
};
/// TODO: add separate step for join.

View File

@ -1,6 +1,7 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
@ -40,6 +41,18 @@ FilterStep::FilterStep(
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void FilterStep::updateInputStream(DataStream input_stream, Block result_header)
{
output_stream = createOutputStream(
input_stream,
res_header ? res_header : FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column),
getDataStreamTraits());
input_streams.clear();
input_streams.emplace_back(std::move(input_stream));
res_header = std::move(result_header);
}
void FilterStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
@ -47,6 +60,14 @@ void FilterStep::transformPipeline(QueryPipeline & pipeline)
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<FilterTransform>(header, expression, filter_column_name, remove_filter_column, on_totals);
});
if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header))
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, res_header, ConvertingTransform::MatchColumnsMode::Name);
});
}
}
void FilterStep::describeActions(FormatSettings & settings) const

View File

@ -20,12 +20,19 @@ public:
String getName() const override { return "Filter"; }
void transformPipeline(QueryPipeline & pipeline) override;
void updateInputStream(DataStream input_stream, Block result_header);
void describeActions(FormatSettings & settings) const override;
const ExpressionActionsPtr & getExpression() const { return expression; }
const String & getFilterColumnName() const { return filter_column_name; }
bool removesFilterColumn() const { return remove_filter_column; }
private:
ExpressionActionsPtr expression;
String filter_column_name;
bool remove_filter_column;
Block res_header;
};
}

View File

@ -3,6 +3,8 @@
#include <Processors/QueryPipeline.h>
#include <IO/WriteBuffer.h>
#include <IO/Operators.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ArrayJoinAction.h>
#include <stack>
#include <Processors/QueryPlan/LimitStep.h>
#include "MergingSortedStep.h"
@ -10,6 +12,9 @@
#include "MergeSortingStep.h"
#include "PartialSortingStep.h"
#include "TotalsHavingStep.h"
#include "ExpressionStep.h"
#include "ArrayJoinStep.h"
#include "FilterStep.h"
namespace DB
{
@ -408,6 +413,76 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_
parent.swap(child);
}
static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * child_node, QueryPlan::Nodes & nodes)
{
auto & parent = parent_node->step;
auto & child = child_node->step;
auto * expression_step = typeid_cast<ExpressionStep *>(parent.get());
auto * filter_step = typeid_cast<FilterStep *>(parent.get());
auto * array_join_step = typeid_cast<ArrayJoinStep *>(child.get());
if (!(expression_step || filter_step) || !array_join_step)
return;
const auto & array_join = array_join_step->arrayJoin();
const auto & expression = expression_step ? expression_step->getExpression()
: filter_step->getExpression();
auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns);
/// No actions can be moved before ARRAY JOIN.
if (split_actions->getActions().empty())
return;
auto expected_header = parent->getOutputStream().header;
/// All actions was moved before ARRAY JOIN. Swap Expression and ArrayJoin.
if (expression->getActions().empty())
{
/// Expression -> ArrayJoin
std::swap(parent, child);
/// ArrayJoin -> Expression
if (expression_step)
child = std::make_unique<ExpressionStep>(child_node->children.at(0)->step->getOutputStream(),
std::move(split_actions));
else
child = std::make_unique<FilterStep>(child_node->children.at(0)->step->getOutputStream(),
std::move(split_actions),
filter_step->getFilterColumnName(),
filter_step->removesFilterColumn());
array_join_step->updateInputStream(child->getOutputStream(), expected_header);
return;
}
/// Add new expression step before ARRAY JOIN.
/// Expression/Filter -> ArrayJoin -> Something
auto & node = nodes.emplace_back();
node.children.swap(child_node->children);
child_node->children.emplace_back(&node);
/// Expression/Filter -> ArrayJoin -> node -> Something
// if (filter_step && split_actions->getSampleBlock().has(filter_step->getFilterColumnName()))
// {
// /// Filter -> ArrayJoin -> node -> Something
// node.step = std::make_unique<FilterStep>(node.children.at(0)->step->getOutputStream(),
// std::move(split_actions),
// filter_step->getFilterColumnName(),
// filter_step->removesFilterColumn());
//
// array_join_step->updateInputStream(node.step->getOutputStream());
//
// parent = std::make_unique<ExpressionStep>(array_join_step->getOutputStream(),
// filter_step->getExpression());
// /// Expression -> ArrayJoin -> Filter -> Something
// }
node.step = std::make_unique<ExpressionStep>(node.children.at(0)->step->getOutputStream(),
std::move(split_actions));
array_join_step->updateInputStream(node.step->getOutputStream(), {});
expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), expected_header)
: filter_step->updateInputStream(array_join_step->getOutputStream(), expected_header);
}
void QueryPlan::optimize()
{
struct Frame
@ -436,7 +511,13 @@ void QueryPlan::optimize()
++frame.next_child;
}
else
{
/// Last entrance, try lift up.
if (frame.node->children.size() == 1)
tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);
stack.pop();
}
}
}

View File

@ -69,11 +69,10 @@ public:
std::vector<Node *> children = {};
};
private:
using Nodes = std::list<Node>;
Nodes nodes;
private:
Nodes nodes;
Node * root = nullptr;
void checkInitialized() const;