Stable explain

This commit is contained in:
Nikolai Kochetov 2024-09-06 10:29:47 +00:00
parent d23145fd19
commit fdbf8e71ab
17 changed files with 85 additions and 57 deletions

View File

@ -2028,8 +2028,9 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
return {std::move(first_actions), std::move(second_actions), std::move(split_nodes_mapping)};
}
ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const
ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const Names & array_joined_columns) const
{
std::unordered_set<std::string_view> array_joined_columns_set(array_joined_columns.begin(), array_joined_columns.end());
struct Frame
{
const Node * node = nullptr;
@ -2072,7 +2073,7 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet &
if (cur.next_child_to_visit == cur.node->children.size())
{
bool depend_on_array_join = false;
if (cur.node->type == ActionType::INPUT && array_joined_columns.contains(cur.node->result_name))
if (cur.node->type == ActionType::INPUT && array_joined_columns_set.contains(cur.node->result_name))
depend_on_array_join = true;
for (const auto * child : cur.node->children)

View File

@ -340,7 +340,7 @@ public:
SplitResult split(std::unordered_set<const Node *> split_nodes, bool create_split_nodes_mapping = false, bool avoid_duplicate_inputs = false) const;
/// Splits actions into two parts. Returned first half may be swapped with ARRAY JOIN.
SplitResult splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const;
SplitResult splitActionsBeforeArrayJoin(const Names & array_joined_columns) const;
/// Splits actions into two parts. First part has minimal size sufficient for calculation of column_name.
/// Outputs of initial actions must contain column_name.

View File

@ -0,0 +1,13 @@
#pragma once
#include <Core/Names.h>
namespace DB
{
struct ArrayJoin
{
Names columns;
bool is_left = false;
};
}

View File

@ -62,8 +62,8 @@ ColumnWithTypeAndName convertArrayJoinColumn(const ColumnWithTypeAndName & src_c
return array_col;
}
ArrayJoinAction::ArrayJoinAction(const NameSet & array_joined_columns_, bool is_left_, bool is_unaligned_, size_t max_block_size_)
: columns(array_joined_columns_)
ArrayJoinAction::ArrayJoinAction(const Names & columns_, bool is_left_, bool is_unaligned_, size_t max_block_size_)
: columns(columns_.begin(), columns_.end())
, is_left(is_left_)
, is_unaligned(is_unaligned_)
, max_block_size(max_block_size_)
@ -80,6 +80,12 @@ ArrayJoinAction::ArrayJoinAction(const NameSet & array_joined_columns_, bool is_
function_builder = std::make_unique<FunctionToOverloadResolverAdaptor>(FunctionEmptyArrayToSingle::createImpl());
}
void ArrayJoinAction::prepare(const Names & columns, ColumnsWithTypeAndName & sample)
{
NameSet columns_set(columns.begin(), columns.end());
return prepare(columns_set, sample);
}
void ArrayJoinAction::prepare(const NameSet & columns, ColumnsWithTypeAndName & sample)
{
for (auto & current : sample)

View File

@ -38,8 +38,9 @@ public:
/// For LEFT ARRAY JOIN.
FunctionOverloadResolverPtr function_builder;
ArrayJoinAction(const NameSet & array_joined_columns_, bool is_left_, bool is_unaligned_, size_t max_block_size_);
ArrayJoinAction(const Names & columns_, bool is_left_, bool is_unaligned_, size_t max_block_size_);
static void prepare(const NameSet & columns, ColumnsWithTypeAndName & sample);
static void prepare(const Names & columns, ColumnsWithTypeAndName & sample);
ArrayJoinResultIteratorPtr execute(Block block);
};

View File

@ -1059,16 +1059,16 @@ std::string ExpressionActionsChain::dumpChain() const
return ss.str();
}
ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_)
ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(const Names & array_join_columns_, ColumnsWithTypeAndName required_columns_)
: Step({})
, array_join(std::move(array_join_))
, array_join_columns(array_join_columns_.begin(), array_join_columns_.end())
, result_columns(std::move(required_columns_))
{
for (auto & column : result_columns)
{
required_columns.emplace_back(NameAndTypePair(column.name, column.type));
if (array_join->columns.contains(column.name))
if (array_join_columns.contains(column.name))
{
const auto & array = getArrayJoinDataType(column.type);
column.type = array->getNestedType();
@ -1085,12 +1085,12 @@ void ExpressionActionsChain::ArrayJoinStep::finalize(const NameSet & required_ou
for (const auto & column : result_columns)
{
if (array_join->columns.contains(column.name) || required_output_.contains(column.name))
if (array_join_columns.contains(column.name) || required_output_.contains(column.name))
new_result_columns.emplace_back(column);
}
for (const auto & column : required_columns)
{
if (array_join->columns.contains(column.name) || required_output_.contains(column.name))
if (array_join_columns.contains(column.name) || required_output_.contains(column.name))
new_required_columns.emplace_back(column);
}

View File

@ -3,6 +3,7 @@
#include <Core/Block.h>
#include <Core/ColumnNumbers.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoin.h>
#include <Interpreters/ExpressionActionsSettings.h>
#include <variant>
@ -22,9 +23,6 @@ class TableJoin;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class ArrayJoinAction;
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
@ -223,11 +221,11 @@ struct ExpressionActionsChain : WithContext
struct ArrayJoinStep : public Step
{
ArrayJoinActionPtr array_join;
const NameSet array_join_columns;
NamesAndTypesList required_columns;
ColumnsWithTypeAndName result_columns;
ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_);
ArrayJoinStep(const Names & array_join_columns_, ColumnsWithTypeAndName required_columns_);
NamesAndTypesList getRequiredColumns() const override { return required_columns; }
ColumnsWithTypeAndName getResultColumns() const override { return result_columns; }

View File

@ -215,7 +215,7 @@ NamesAndTypesList ExpressionAnalyzer::getColumnsAfterArrayJoin(ActionsDAG & acti
auto array_join = addMultipleArrayJoinAction(actions, is_array_join_left);
auto sample_columns = actions.getResultColumns();
ArrayJoinAction::prepare(array_join->columns, sample_columns);
ArrayJoinAction::prepare(array_join.columns, sample_columns);
actions = ActionsDAG(sample_columns);
NamesAndTypesList new_columns_after_array_join;
@ -889,9 +889,11 @@ const ASTSelectQuery * SelectQueryExpressionAnalyzer::getAggregatingQuery() cons
}
/// "Big" ARRAY JOIN.
ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ActionsDAG & actions, bool array_join_is_left) const
ArrayJoin ExpressionAnalyzer::addMultipleArrayJoinAction(ActionsDAG & actions, bool array_join_is_left) const
{
NameSet result_columns;
Names result_columns;
result_columns.reserve(syntax->array_join_result_to_source.size());
for (const auto & result_source : syntax->array_join_result_to_source)
{
/// Assign new names to columns, if needed.
@ -902,20 +904,19 @@ ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ActionsDAG & a
}
/// Make ARRAY JOIN (replace arrays with their insides) for the columns in these new names.
result_columns.insert(result_source.first);
result_columns.push_back(result_source.first);
}
const auto & query_settings = getContext()->getSettingsRef();
return std::make_shared<ArrayJoinAction>(result_columns, array_join_is_left, query_settings.enable_unaligned_array_join, query_settings.max_block_size);
return {std::move(result_columns), array_join_is_left};
}
ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, ActionsAndProjectInputsFlagPtr & before_array_join, bool only_types)
std::optional<ArrayJoin> SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, ActionsAndProjectInputsFlagPtr & before_array_join, bool only_types)
{
const auto * select_query = getSelectQuery();
auto [array_join_expression_list, is_array_join_left] = select_query->arrayJoinExpressionList();
if (!array_join_expression_list)
return nullptr;
return {};
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
@ -924,7 +925,7 @@ ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActi
auto array_join = addMultipleArrayJoinAction(step.actions()->dag, is_array_join_left);
before_array_join = chain.getLastActions();
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ArrayJoinStep>(array_join, step.getResultColumns()));
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ArrayJoinStep>(array_join.columns, step.getResultColumns()));
chain.addStep();

View File

@ -174,7 +174,7 @@ protected:
/// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables.
void initGlobalSubqueriesAndExternalTables(bool do_global, bool is_explain);
ArrayJoinActionPtr addMultipleArrayJoinAction(ActionsDAG & actions, bool is_left) const;
ArrayJoin addMultipleArrayJoinAction(ActionsDAG & actions, bool is_left) const;
void getRootActions(const ASTPtr & ast, bool no_makeset_for_subqueries, ActionsDAG & actions, bool only_consts = false);
@ -234,7 +234,7 @@ struct ExpressionAnalysisResult
bool use_grouping_set_key = false;
ActionsAndProjectInputsFlagPtr before_array_join;
ArrayJoinActionPtr array_join;
std::optional<ArrayJoin> array_join;
ActionsAndProjectInputsFlagPtr before_join;
ActionsAndProjectInputsFlagPtr converting_join_columns;
JoinPtr join;
@ -388,7 +388,7 @@ private:
*/
/// Before aggregation:
ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, ActionsAndProjectInputsFlagPtr & before_array_join, bool only_types);
std::optional<ArrayJoin> appendArrayJoin(ExpressionActionsChain & chain, ActionsAndProjectInputsFlagPtr & before_array_join, bool only_types);
bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types);
JoinPtr appendJoin(ExpressionActionsChain & chain, ActionsAndProjectInputsFlagPtr & converting_join_columns);

View File

@ -1679,10 +1679,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
QueryPlanStepPtr array_join_step
= std::make_unique<ArrayJoinStep>(
query_plan.getCurrentDataStream(),
expressions.array_join->columns,
expressions.array_join->is_left,
expressions.array_join->is_unaligned,
expressions.array_join->max_block_size);
*expressions.array_join,
settings.enable_unaligned_array_join,
settings.max_block_size);
array_join_step->setStepDescription("ARRAY JOIN");
query_plan.addStep(std::move(array_join_step));

View File

@ -1654,11 +1654,12 @@ JoinTreeQueryPlan buildQueryPlanForArrayJoinNode(const QueryTreeNodePtr & array_
PlannerActionsVisitor actions_visitor(planner_context);
std::unordered_set<std::string> array_join_expressions_output_nodes;
NameSet array_join_column_names;
Names array_join_column_names;
array_join_column_names.reserve(array_join_node.getJoinExpressions().getNodes().size());
for (auto & array_join_expression : array_join_node.getJoinExpressions().getNodes())
{
const auto & array_join_column_identifier = planner_context->getColumnNodeIdentifierOrThrow(array_join_expression);
array_join_column_names.insert(array_join_column_identifier);
array_join_column_names.push_back(array_join_column_identifier);
auto & array_join_expression_column = array_join_expression->as<ColumnNode &>();
auto expression_dag_index_nodes = actions_visitor.visit(array_join_action_dag, array_join_expression_column.getExpressionOrThrow());
@ -1710,8 +1711,7 @@ JoinTreeQueryPlan buildQueryPlanForArrayJoinNode(const QueryTreeNodePtr & array_
const auto & settings = planner_context->getQueryContext()->getSettingsRef();
auto array_join_step = std::make_unique<ArrayJoinStep>(
plan.getCurrentDataStream(),
std::move(array_join_column_names),
array_join_node.isLeft(),
ArrayJoin{std::move(array_join_column_names), array_join_node.isLeft()},
settings.enable_unaligned_array_join,
settings.max_block_size);

View File

@ -24,13 +24,12 @@ static ITransformingStep::Traits getTraits()
};
}
ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, NameSet columns_, bool is_left_, bool is_unaligned_, size_t max_block_size_)
ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, ArrayJoin array_join_, bool is_unaligned_, size_t max_block_size_)
: ITransformingStep(
input_stream_,
ArrayJoinTransform::transformHeader(input_stream_.header, columns_),
ArrayJoinTransform::transformHeader(input_stream_.header, array_join_.columns),
getTraits())
, columns(std::move(columns_))
, is_left(is_left_)
, array_join(std::move(array_join_))
, is_unaligned(is_unaligned_)
, max_block_size(max_block_size_)
{
@ -39,16 +38,16 @@ ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, NameSet columns_,
void ArrayJoinStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(), ArrayJoinTransform::transformHeader(input_streams.front().header, columns), getDataStreamTraits());
input_streams.front(), ArrayJoinTransform::transformHeader(input_streams.front().header, array_join.columns), getDataStreamTraits());
}
void ArrayJoinStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto array_join = std::make_shared<ArrayJoinAction>(columns, is_left, is_unaligned, max_block_size);
auto array_join_actions = std::make_shared<ArrayJoinAction>(array_join.columns, array_join.is_left, is_unaligned, max_block_size);
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
return std::make_shared<ArrayJoinTransform>(header, array_join, on_totals);
return std::make_shared<ArrayJoinTransform>(header, array_join_actions, on_totals);
});
}
@ -57,8 +56,8 @@ void ArrayJoinStep::describeActions(FormatSettings & settings) const
String prefix(settings.offset, ' ');
bool first = true;
settings.out << prefix << (is_left ? "LEFT " : "") << "ARRAY JOIN ";
for (const auto & column : columns)
settings.out << prefix << (array_join.is_left ? "LEFT " : "") << "ARRAY JOIN ";
for (const auto & column : array_join.columns)
{
if (!first)
settings.out << ", ";
@ -72,10 +71,10 @@ void ArrayJoinStep::describeActions(FormatSettings & settings) const
void ArrayJoinStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add("Left", is_left);
map.add("Left", array_join.is_left);
auto columns_array = std::make_unique<JSONBuilder::JSONArray>();
for (const auto & column : columns)
for (const auto & column : array_join.columns)
columns_array->add(column);
map.add("Columns", std::move(columns_array));

View File

@ -1,5 +1,6 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Interpreters/ArrayJoin.h>
namespace DB
{
@ -10,7 +11,7 @@ using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
class ArrayJoinStep : public ITransformingStep
{
public:
ArrayJoinStep(const DataStream & input_stream_, NameSet columns_, bool is_left_, bool is_unaligned_, size_t max_block_size_);
ArrayJoinStep(const DataStream & input_stream_, ArrayJoin array_join_, bool is_unaligned_, size_t max_block_size_);
String getName() const override { return "ArrayJoin"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
@ -18,14 +19,13 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
const NameSet & getColumns() const { return columns; }
bool isLeft() const { return is_left; }
const Names & getColumns() const { return array_join.columns; }
bool isLeft() const { return array_join.is_left; }
private:
void updateOutputStream() override;
NameSet columns;
bool is_left = false;
ArrayJoin array_join;
bool is_unaligned = false;
size_t max_block_size = DEFAULT_BLOCK_SIZE;
};

View File

@ -521,11 +521,13 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
if (auto * array_join = typeid_cast<ArrayJoinStep *>(child.get()))
{
const auto & keys = array_join->getColumns();
std::unordered_set<std::string_view> keys_set(keys.begin(), keys.end());
const auto & array_join_header = array_join->getInputStreams().front().header;
Names allowed_inputs;
for (const auto & column : array_join_header)
if (!keys.contains(column.name))
if (!keys_set.contains(column.name))
allowed_inputs.push_back(column.name);
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))

View File

@ -237,6 +237,8 @@ void buildSortingDAG(QueryPlan::Node & node, std::optional<ActionsDAG> & dag, Fi
if (dag)
{
std::unordered_set<std::string_view> keys_set(array_joined_columns.begin(), array_joined_columns.end());
/// Remove array joined columns from outputs.
/// Types are changed after ARRAY JOIN, and we can't use this columns anyway.
ActionsDAG::NodeRawConstPtrs outputs;
@ -244,7 +246,7 @@ void buildSortingDAG(QueryPlan::Node & node, std::optional<ActionsDAG> & dag, Fi
for (const auto & output : dag->getOutputs())
{
if (!array_joined_columns.contains(output->result_name))
if (!keys_set.contains(output->result_name))
outputs.push_back(output);
}

View File

@ -10,7 +10,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
Block ArrayJoinTransform::transformHeader(Block header, const NameSet & array_join_columns)
template <typename Container>
Block transformHeaderImpl(Block header, const Container & array_join_columns)
{
auto columns = header.getColumnsWithTypeAndName();
ArrayJoinAction::prepare(array_join_columns, columns);
@ -19,11 +20,16 @@ Block ArrayJoinTransform::transformHeader(Block header, const NameSet & array_jo
return res;
}
Block ArrayJoinTransform::transformHeader(Block header, const Names & array_join_columns)
{
return transformHeaderImpl(std::move(header), array_join_columns);
}
ArrayJoinTransform::ArrayJoinTransform(
const Block & header_,
ArrayJoinActionPtr array_join_,
bool /*on_totals_*/)
: IInflatingTransform(header_, transformHeader(header_, array_join_->columns))
: IInflatingTransform(header_, transformHeaderImpl(header_, array_join_->columns))
, array_join(std::move(array_join_))
{
/// TODO

View File

@ -22,7 +22,7 @@ public:
String getName() const override { return "ArrayJoinTransform"; }
static Block transformHeader(Block header, const NameSet & array_join_columns);
static Block transformHeader(Block header, const Names & array_join_columns);
protected:
void consume(Chunk chunk) override;