Remove flags from ActionsDAG (part 2).

This commit is contained in:
Nikolai Kochetov 2024-06-12 16:53:32 +00:00
parent 85289219f1
commit ae42f8635c
32 changed files with 264 additions and 214 deletions

View File

@ -786,9 +786,6 @@ Block ActionsDAG::updateHeader(const Block & header) const
for (auto & col : result_columns)
res.insert(std::move(col));
if (isInputProjected())
return res;
res.reserve(header.columns() - pos_to_remove.size());
for (size_t i = 0; i < header.columns(); i++)
{
@ -1152,6 +1149,33 @@ void ActionsDAG::project(const NamesWithAliases & projection)
removeUnusedActions();
}
/// Add an input for every column of `sample_block` that is not matched to an existing input.
///
/// Matching is done by name and respects duplicated names: each existing input consumes the
/// first not-yet-consumed column of the block that has the same name. Every remaining column
/// gets a new input. New inputs are appended in the order the columns appear in `sample_block`,
/// so the resulting input order does not depend on hash table iteration order.
///
/// Throws NOT_FOUND_COLUMN_IN_BLOCK if an existing input has no free column with its name.
void ActionsDAG::appendInputsForUnusedColumns(const Block & sample_block)
{
    struct NameUsage
    {
        size_t available = 0; /// Number of columns with this name in sample_block.
        size_t consumed = 0;  /// Number of those columns taken by existing inputs.
    };

    /// Keys are views into the names stored inside sample_block, which outlives this map.
    std::unordered_map<std::string_view, NameUsage> usage_by_name;

    const size_t num_columns = sample_block.columns();
    for (size_t pos = 0; pos < num_columns; ++pos)
        ++usage_by_name[sample_block.getByPosition(pos).name].available;

    /// Every existing input must be backed by its own column of the block.
    for (const auto * input : inputs)
    {
        auto it = usage_by_name.find(input->result_name);
        if (it == usage_by_name.end() || it->second.consumed == it->second.available)
            throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
                "Not found column {} in block {}", input->result_name, sample_block.dumpStructure());

        ++it->second.consumed;
    }

    /// Walk the block in its own order: the first `consumed` occurrences of a name belong to
    /// existing inputs, every later occurrence is unused and gets a fresh input.
    for (size_t pos = 0; pos < num_columns; ++pos)
    {
        const auto & col = sample_block.getByPosition(pos);

        auto & usage = usage_by_name.at(col.name);
        if (usage.consumed > 0)
        {
            --usage.consumed;
            continue;
        }

        addInput(col.name, col.type);
    }
}
bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
{
for (const auto * output_node : outputs)
@ -1225,8 +1249,6 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
ActionsDAGPtr ActionsDAG::clone() const
{
auto actions = std::make_shared<ActionsDAG>();
actions->project_input = project_input;
actions->projected_output = projected_output;
std::unordered_map<const Node *, Node *> copy_map;
@ -1320,9 +1342,6 @@ std::string ActionsDAG::dumpDAG() const
out << ' ' << map[node];
out << '\n';
out << "Project input: " << project_input << '\n';
out << "Projected output: " << projected_output << '\n';
return out.str();
}
@ -1581,10 +1600,6 @@ void ActionsDAG::mergeInplace(ActionsDAG && second)
auto it = first_result.find(input_node->result_name);
if (it == first_result.end() || it->second.empty())
{
if (first.project_input)
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Cannot find column {} in ActionsDAG result", input_node->result_name);
first.inputs.push_back(input_node);
}
else
@ -1620,13 +1635,6 @@ void ActionsDAG::mergeInplace(ActionsDAG && second)
}
}
/// Update output nodes.
if (second.project_input)
{
first.outputs.swap(second.outputs);
first.project_input = true;
}
else
{
/// Add not removed result from first actions.
for (const auto * output_node : first.outputs)
@ -1642,8 +1650,6 @@ void ActionsDAG::mergeInplace(ActionsDAG && second)
}
first.nodes.splice(first.nodes.end(), std::move(second.nodes));
first.projected_output = second.projected_output;
}
void ActionsDAG::mergeNodes(ActionsDAG && second, NodeRawConstPtrs * out_outputs)
@ -2039,7 +2045,6 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet &
}
auto res = split(split_nodes);
res.second->project_input = project_input;
return res;
}
@ -2083,7 +2088,6 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameS
dumpDAG());
auto res = split(split_nodes);
res.second->project_input = project_input;
return res;
}
@ -2155,7 +2159,6 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & co
std::unordered_set<const Node *> split_nodes = {node};
auto res = split(split_nodes);
res.second->project_input = project_input;
return res;
}

View File

@ -166,9 +166,12 @@ public:
/// Call addAlias several times.
void addAliases(const NamesWithAliases & aliases);
/// Add alias actions and remove unused columns from outputs. Also specify result columns order in outputs.
/// Add alias actions. Also specify result columns order in outputs.
void project(const NamesWithAliases & projection);
/// Add input for every column from sample_block which is not mapped to existing input.
void appendInputsForUnusedColumns(const Block & sample_block);
/// If column is not in outputs, try to find it in nodes and insert back into outputs.
bool tryRestoreColumn(const std::string & column_name);

View File

@ -392,6 +392,9 @@ Block createBlockForSet(
}
/// Special member functions of ScopeStack::Level are defaulted here, in the source file,
/// instead of being implicitly generated where the struct is declared.
/// NOTE(review): presumably needed because a member type is incomplete in the header — confirm.
ScopeStack::Level::Level() = default;
ScopeStack::Level::~Level() = default;
ScopeStack::Level::Level(Level &&) = default;
FutureSetPtr makeExplicitSet(
const ASTFunction * node, const ActionsDAG & actions, ContextPtr context, PreparedSets & prepared_sets)
@ -529,7 +532,9 @@ std::vector<std::string_view> ActionsMatcher::Data::getAllColumnNames() const
ScopeStack::ScopeStack(ActionsDAG actions_dag, ContextPtr context_) : WithContext(context_)
{
auto & level = stack.emplace_back(ScopeStack::Level{std::move(actions_dag), {}, {}});
ScopeStack::Level tmp;
tmp.actions_dag = std::move(actions_dag);
auto & level = stack.emplace_back(std::move(tmp));
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag.getOutputs());
for (const auto & node : level.actions_dag.getOutputs())
@ -1268,7 +1273,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
lambda_dag.removeUnusedActions(Names(1, result_name));
auto lambda_actions = std::make_shared<ExpressionActions>(
std::make_shared<ActionsDAG>(lambda_dag),
std::make_shared<ActionsDAG>(std::move(lambda_dag)),
ExpressionActionsSettings::fromContext(data.getContext(), CompileExpressions::yes));
DataTypePtr result_type = lambda_actions->getSampleBlock().getByName(result_name).type;

View File

@ -47,6 +47,10 @@ struct ScopeStack : WithContext
ActionsDAG actions_dag;
IndexPtr index;
NameSet inputs;
~Level();
Level();
Level(Level &&);
};
using Levels = std::vector<Level>;

View File

@ -49,8 +49,9 @@ namespace ErrorCodes
static std::unordered_set<const ActionsDAG::Node *> processShortCircuitFunctions(const ActionsDAG & actions_dag, ShortCircuitFunctionEvaluation short_circuit_function_evaluation);
ExpressionActions::ExpressionActions(ActionsDAGPtr actions_dag_, const ExpressionActionsSettings & settings_)
: settings(settings_)
ExpressionActions::ExpressionActions(ActionsDAGPtr actions_dag_, const ExpressionActionsSettings & settings_, bool project_inputs_)
: project_inputs(project_inputs_)
, settings(settings_)
{
actions_dag = actions_dag_->clone();
@ -757,6 +758,10 @@ void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run,
}
}
if (project_inputs)
{
block.clear();
}
if (allow_duplicates_in_input)
{
/// This case is the same as when the input is projected

View File

@ -79,11 +79,13 @@ private:
ColumnNumbers result_positions;
Block sample_block;
bool project_inputs = false;
ExpressionActionsSettings settings;
public:
ExpressionActions() = delete;
explicit ExpressionActions(ActionsDAGPtr actions_dag_, const ExpressionActionsSettings & settings_ = {});
explicit ExpressionActions(ActionsDAGPtr actions_dag_, const ExpressionActionsSettings & settings_ = {}, bool project_inputs_ = false);
ExpressionActions(const ExpressionActions &) = default;
ExpressionActions & operator=(const ExpressionActions &) = default;

View File

@ -658,8 +658,8 @@ void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
with_alias->getColumnName(), 1 /* direction */,
1 /* nulls_direction */));
ActionsDAG actions_dag(aggregated_columns);
getRootActions(column_ast, false, actions_dag);
auto actions_dag = std::make_shared<ActionsDAG>(aggregated_columns);
getRootActions(column_ast, false, *actions_dag);
desc.partition_by_actions.push_back(std::move(actions_dag));
}
}
@ -679,8 +679,8 @@ void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
order_by_element.direction,
order_by_element.nulls_direction));
ActionsDAG actions_dag(aggregated_columns);
getRootActions(column_ast, false, actions_dag);
auto actions_dag = std::make_shared<ActionsDAG>(aggregated_columns);
getRootActions(column_ast, false, *actions_dag);
desc.order_by_actions.push_back(std::move(actions_dag));
}
}
@ -1068,6 +1068,7 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
rename_dag->getOutputs()[pos] = &alias;
}
}
rename_dag->appendInputsForUnusedColumns(joined_plan->getCurrentDataStream().header);
auto rename_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), std::move(rename_dag));
rename_step->setStepDescription("Rename joined columns");
joined_plan->addStep(std::move(rename_step));
@ -1219,7 +1220,7 @@ ActionsAndFlagsPtr SelectQueryExpressionAnalyzer::appendPrewhere(
}
{
ActionsDAG actions;
auto actions = std::make_shared<ActionsAndFlags>();
auto required_columns = prewhere_actions->actions.getRequiredColumns();
NameSet prewhere_input_names;
@ -1265,7 +1266,7 @@ ActionsAndFlagsPtr SelectQueryExpressionAnalyzer::appendPrewhere(
}
}
actions = ActionsDAG(required_columns);
actions->actions = ActionsDAG(required_columns);
}
else
{
@ -1280,7 +1281,7 @@ ActionsAndFlagsPtr SelectQueryExpressionAnalyzer::appendPrewhere(
}
}
actions = ActionsDAG(columns);
actions->actions = ActionsDAG(columns);
}
chain.steps.emplace_back(
@ -1351,8 +1352,8 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain
{
for (auto & child : asts)
{
ActionsDAG actions_dag(columns_after_join);
getRootActions(child, only_types, actions_dag);
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(child, only_types, *actions_dag);
group_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes)));
}
@ -1730,9 +1731,8 @@ void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const
step.addRequiredOutput(expr->getColumnName());
}
ActionsAndFlagsPtr ExpressionAnalyzer::getActionsDAG(bool add_aliases, bool project_result)
ActionsDAGPtr ExpressionAnalyzer::getActionsDAG(bool add_aliases, bool remove_unused_result)
{
ActionsAndFlagsPtr res;
ActionsDAG actions_dag(aggregated_columns);
NamesWithAliases result_columns;
Names result_names;
@ -1759,16 +1759,15 @@ ActionsAndFlagsPtr ExpressionAnalyzer::getActionsDAG(bool add_aliases, bool proj
if (add_aliases)
{
if (project_result)
if (remove_unused_result)
{
actions_dag.project(result_columns);
res->project_input = res->projected_output = true;
}
else
actions_dag.addAliases(result_columns);
}
if (!(add_aliases && project_result))
if (!(add_aliases && remove_unused_result))
{
NameSet name_set(result_names.begin(), result_names.end());
/// We will not delete the original columns.
@ -1784,14 +1783,13 @@ ActionsAndFlagsPtr ExpressionAnalyzer::getActionsDAG(bool add_aliases, bool proj
actions_dag.removeUnusedActions(name_set);
}
res->actions = std::move(actions_dag);
return res;
return std::make_unique<ActionsDAG>(std::move(actions_dag));
}
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result, CompileExpressions compile_expressions)
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool remove_unused_result, CompileExpressions compile_expressions)
{
return std::make_shared<ExpressionActions>(
getActionsDAG(add_aliases, project_result), ExpressionActionsSettings::fromContext(getContext(), compile_expressions));
getActionsDAG(add_aliases, remove_unused_result), ExpressionActionsSettings::fromContext(getContext(), compile_expressions), remove_unused_result);
}
ActionsDAGPtr ExpressionAnalyzer::getConstActionsDAG(const ColumnsWithTypeAndName & constant_inputs)
@ -1925,7 +1923,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
{
/// Prewhere is always the first one.
prewhere_step_num = 0;
prewhere_info = std::make_shared<PrewhereInfo>(actions, query.prewhere()->getColumnName());
auto dag = std::make_shared<ActionsDAG>(std::move(actions->actions));
prewhere_info = std::make_shared<PrewhereInfo>(std::move(dag), query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
{
@ -1967,7 +1966,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (sanitizeBlock(before_where_sample))
{
ExpressionActions(
std::make_shared<ActionsDAG>(before_where->actions.clone()),
before_where->actions.clone(),
ExpressionActionsSettings::fromSettings(context->getSettingsRef())).execute(before_where_sample);
auto & column_elem

View File

@ -115,10 +115,10 @@ public:
/// If `ast` is not a SELECT query, just gets all the actions to evaluate the expression.
/// If add_aliases, only the calculated values in the desired order and add aliases.
/// If also project_result, than only aliases remain in the output block.
/// If also remove_unused_result, then only aliases remain in the output block.
/// Otherwise, only temporary columns will be deleted from the block.
ActionsAndFlagsPtr getActionsDAG(bool add_aliases, bool project_result);
ExpressionActionsPtr getActions(bool add_aliases, bool project_result = true, CompileExpressions compile_expressions = CompileExpressions::no);
ActionsDAGPtr getActionsDAG(bool add_aliases, bool remove_unused_result = true);
ExpressionActionsPtr getActions(bool add_aliases, bool remove_unused_result = true, CompileExpressions compile_expressions = CompileExpressions::no);
/// Get actions to evaluate a constant expression. The function adds constants and applies functions that depend only on constants.
/// Does not execute subqueries.

View File

@ -386,6 +386,8 @@ Chain InterpreterInsertQuery::buildPreSinkChain(
auto adding_missing_defaults_actions = std::make_shared<ExpressionActions>(adding_missing_defaults_dag);
std::cerr << adding_missing_defaults_actions->dumpActions() << std::endl;
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out.addSource(std::make_shared<ConvertingTransform>(query_sample_block, adding_missing_defaults_actions));

View File

@ -124,6 +124,7 @@ namespace ErrorCodes
extern const int UNKNOWN_IDENTIFIER;
extern const int BAD_ARGUMENTS;
extern const int SUPPORT_IS_DISABLED;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
}
/// Assumes `storage` is set and the table filter (row-level security) is not empty.
@ -175,11 +176,10 @@ FilterDAGInfoPtr generateFilterActions(
/// Using separate expression analyzer to prevent any possible alias injection
auto syntax_result = TreeRewriter(context).analyzeSelect(query_ast, TreeRewriterResult({}, storage, storage_snapshot));
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, context, metadata_snapshot, {}, false, {}, prepared_sets);
filter_info->actions = analyzer.simpleSelectActions();
filter_info->actions = std::make_unique<ActionsDAG>(std::move(analyzer.simpleSelectActions()->actions));
filter_info->column_name = expr_list->children.at(0)->getColumnName();
filter_info->actions->removeUnusedActions(NameSet{filter_info->column_name});
filter_info->actions->projectInput(false);
for (const auto * node : filter_info->actions->getInputs())
filter_info->actions->getOutputs().push_back(node);
@ -1078,15 +1078,15 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
// with this code. See
// https://github.com/ClickHouse/ClickHouse/issues/19857 for details.
if (analysis_result.before_window)
return analysis_result.before_window->getResultColumns();
return analysis_result.before_window->actions.getResultColumns();
// NOTE: should not handle before_limit_by specially since
// WithMergeableState does not process LIMIT BY
return analysis_result.before_order_by->getResultColumns();
return analysis_result.before_order_by->actions.getResultColumns();
}
Block header = analysis_result.before_aggregation->getResultColumns();
Block header = analysis_result.before_aggregation->actions.getResultColumns();
Block res;
@ -1124,18 +1124,18 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
// It's different from selected_columns, see the comment above for
// WithMergeableState stage.
if (analysis_result.before_window)
return analysis_result.before_window->getResultColumns();
return analysis_result.before_window->actions.getResultColumns();
// In case of query on remote shards executed up to
// WithMergeableStateAfterAggregation*, they can process LIMIT BY,
// since the initiator will not apply LIMIT BY again.
if (analysis_result.before_limit_by)
return analysis_result.before_limit_by->getResultColumns();
return analysis_result.before_limit_by->actions.getResultColumns();
return analysis_result.before_order_by->getResultColumns();
return analysis_result.before_order_by->actions.getResultColumns();
}
return analysis_result.final_projection->getResultColumns();
return analysis_result.final_projection->actions.getResultColumns();
}
@ -1636,12 +1636,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
add_filter_step(parallel_replicas_custom_filter_info, "Parallel replica custom key filter");
if (expressions.before_array_join)
{
QueryPlanStepPtr before_array_join_step
= std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expressions.before_array_join);
before_array_join_step->setStepDescription("Before ARRAY JOIN");
query_plan.addStep(std::move(before_array_join_step));
}
executeExpression(query_plan, expressions.before_array_join, "Before ARRAY JOIN");
if (expressions.array_join)
{
@ -1653,23 +1648,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
}
if (expressions.before_join)
{
QueryPlanStepPtr before_join_step = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
expressions.before_join);
before_join_step->setStepDescription("Before JOIN");
query_plan.addStep(std::move(before_join_step));
}
executeExpression(query_plan, expressions.before_join, "Before JOIN");
/// Optional step to convert key columns to common supertype.
if (expressions.converting_join_columns)
{
QueryPlanStepPtr convert_join_step = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
expressions.converting_join_columns);
convert_join_step->setStepDescription("Convert JOIN columns");
query_plan.addStep(std::move(convert_join_step));
}
executeExpression(query_plan, expressions.converting_join_columns, "Convert JOIN columns");
if (expressions.hasJoin())
{
@ -2113,7 +2096,6 @@ void InterpreterSelectQuery::applyFiltersToPrewhereInAnalysis(ExpressionAnalysis
{
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
analysis.prewhere_info = std::make_shared<PrewhereInfo>(analysis.filter_info->actions, analysis.filter_info->column_name);
analysis.prewhere_info->prewhere_actions->projectInput(false);
analysis.prewhere_info->remove_prewhere_column = analysis.filter_info->do_remove_column;
analysis.prewhere_info->need_filter = true;
analysis.filter_info = nullptr;
@ -2124,7 +2106,6 @@ void InterpreterSelectQuery::applyFiltersToPrewhereInAnalysis(ExpressionAnalysis
/// Add row level security actions to prewhere.
analysis.prewhere_info->row_level_filter = analysis.filter_info->actions;
analysis.prewhere_info->row_level_column_name = analysis.filter_info->column_name;
analysis.prewhere_info->row_level_filter->projectInput(false);
analysis.filter_info = nullptr;
}
}
@ -2333,7 +2314,7 @@ std::optional<UInt64> InterpreterSelectQuery::getTrivialCount(UInt64 max_paralle
}
if (analysis_result.hasWhere())
{
filter_nodes.push_back(&analysis_result.before_where->findInOutputs(analysis_result.where_column_name));
filter_nodes.push_back(&analysis_result.before_where->actions.findInOutputs(analysis_result.where_column_name));
}
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes);
@ -2442,7 +2423,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
auto column = ColumnAggregateFunction::create(func);
column->insertFrom(place);
Block header = analysis_result.before_aggregation->getResultColumns();
Block header = analysis_result.before_aggregation->actions.getResultColumns();
size_t arguments_size = desc.argument_names.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
@ -2604,7 +2585,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// Possible filters: row-security, additional filter, replica filter (before array join), where (after array join)
query_info.has_filters_and_no_array_join_before_filter = row_policy_filter || additional_filter_info
|| parallel_replicas_custom_filter_info
|| (analysis_result.hasWhere() && !analysis_result.before_where->hasArrayJoin() && !analysis_result.array_join);
|| (analysis_result.hasWhere() && !analysis_result.before_where->actions.hasArrayJoin() && !analysis_result.array_join);
storage->read(query_plan, required_columns, storage_snapshot, query_info, context, processing_stage, max_block_size, max_streams);
if (context->hasQueryContext() && !options.is_internal)
@ -2646,10 +2627,14 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
}
}
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter)
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, bool remove_filter)
{
auto dag = expression->actions.clone();
if (expression->project_input)
dag->appendInputsForUnusedColumns(query_plan.getCurrentDataStream().header);
auto where_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(), expression, getSelectQuery().where()->getColumnName(), remove_filter);
query_plan.getCurrentDataStream(), std::move(dag), getSelectQuery().where()->getColumnName(), remove_filter);
where_step->setStepDescription("WHERE");
query_plan.addStep(std::move(where_step));
@ -2723,11 +2708,9 @@ static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryE
return result;
}
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
auto expression_before_aggregation = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));
executeExpression(query_plan, expression, "Before GROUP BY");
AggregateDescriptions aggregates = query_analyzer->aggregates();
const Settings & settings = context->getSettingsRef();
@ -2818,10 +2801,14 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
}
void InterpreterSelectQuery::executeHaving(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter)
void InterpreterSelectQuery::executeHaving(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, bool remove_filter)
{
auto dag = expression->actions.clone();
if (expression->project_input)
dag->appendInputsForUnusedColumns(query_plan.getCurrentDataStream().header);
auto having_step
= std::make_unique<FilterStep>(query_plan.getCurrentDataStream(), expression, getSelectQuery().having()->getColumnName(), remove_filter);
= std::make_unique<FilterStep>(query_plan.getCurrentDataStream(), std::move(dag), getSelectQuery().having()->getColumnName(), remove_filter);
having_step->setStepDescription("HAVING");
query_plan.addStep(std::move(having_step));
@ -2829,15 +2816,19 @@ void InterpreterSelectQuery::executeHaving(QueryPlan & query_plan, const Actions
void InterpreterSelectQuery::executeTotalsAndHaving(
QueryPlan & query_plan, bool has_having, const ActionsDAGPtr & expression, bool remove_filter, bool overflow_row, bool final)
QueryPlan & query_plan, bool has_having, const ActionsAndFlagsPtr & expression, bool remove_filter, bool overflow_row, bool final)
{
auto dag = expression->actions.clone();
if (expression->project_input)
dag->appendInputsForUnusedColumns(query_plan.getCurrentDataStream().header);
const Settings & settings = context->getSettingsRef();
auto totals_having_step = std::make_unique<TotalsHavingStep>(
query_plan.getCurrentDataStream(),
query_analyzer->aggregates(),
overflow_row,
expression,
std::move(dag),
has_having ? getSelectQuery().having()->getColumnName() : "",
remove_filter,
settings.totals_mode,
@ -2870,12 +2861,16 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
query_plan.addStep(std::move(step));
}
void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description)
void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, const std::string & description)
{
if (!expression)
return;
auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
auto dag = expression->actions.clone();
if (expression->project_input)
dag->appendInputsForUnusedColumns(query_plan.getCurrentDataStream().header);
auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(dag));
expression_step->setStepDescription(description);
query_plan.addStep(std::move(expression_step));
@ -3045,11 +3040,9 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const st
}
void InterpreterSelectQuery::executeProjection(QueryPlan & query_plan, const ActionsDAGPtr & expression)
void InterpreterSelectQuery::executeProjection(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression)
{
auto projection_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
projection_step->setStepDescription("Projection");
query_plan.addStep(std::move(projection_step));
executeExpression(query_plan, expression, "Projection");
}

View File

@ -174,13 +174,13 @@ private:
/// Different stages of query execution.
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan);
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
void executeWhere(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, bool remove_filter);
void executeAggregation(
QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final, bool has_grouping_sets);
void executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ActionsDAGPtr & expression, bool remove_filter, bool overflow_row, bool final);
void executeHaving(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
static void executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description);
void executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ActionsAndFlagsPtr & expression, bool remove_filter, bool overflow_row, bool final);
void executeHaving(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, bool remove_filter);
static void executeExpression(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression, const std::string & description);
/// FIXME should go through ActionsDAG to behave as a proper function
void executeWindow(QueryPlan & query_plan);
void executeOrder(QueryPlan & query_plan, InputOrderInfoPtr sorting_info);
@ -191,7 +191,7 @@ private:
void executeLimitBy(QueryPlan & query_plan);
void executeLimit(QueryPlan & query_plan);
void executeOffset(QueryPlan & query_plan);
static void executeProjection(QueryPlan & query_plan, const ActionsDAGPtr & expression);
static void executeProjection(QueryPlan & query_plan, const ActionsAndFlagsPtr & expression);
void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct);
void executeExtremes(QueryPlan & query_plan);
void executeSubqueriesInSetsAndJoins(QueryPlan & query_plan);

View File

@ -1137,9 +1137,9 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
for (const auto & kv : stage.column_to_updated)
{
auto column_name = kv.second->getColumnName();
const auto & dag_node = actions->findInOutputs(column_name);
const auto & alias = actions->addAlias(dag_node, kv.first);
actions->addOrReplaceInOutputs(alias);
const auto & dag_node = actions->actions.findInOutputs(column_name);
const auto & alias = actions->actions.addAlias(dag_node, kv.first);
actions->actions.addOrReplaceInOutputs(alias);
}
}
@ -1202,7 +1202,7 @@ void MutationsInterpreter::Source::read(
{
ActionsDAG::NodeRawConstPtrs nodes(num_filters);
for (size_t i = 0; i < num_filters; ++i)
nodes[i] = &steps[i]->actions()->findInOutputs(names[i]);
nodes[i] = &steps[i]->actions()->actions.findInOutputs(names[i]);
filter = ActionsDAG::buildFilterActionsDAG(nodes);
}
@ -1273,18 +1273,24 @@ QueryPipelineBuilder MutationsInterpreter::addStreamsForLaterStages(const std::v
for (size_t i = 0; i < stage.expressions_chain.steps.size(); ++i)
{
const auto & step = stage.expressions_chain.steps[i];
if (step->actions()->hasArrayJoin())
if (step->actions()->actions.hasArrayJoin())
throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION, "arrayJoin is not allowed in mutations");
if (i < stage.filter_column_names.size())
{
auto dag = step->actions()->actions.clone();
if (step->actions()->project_input)
dag->appendInputsForUnusedColumns(plan.getCurrentDataStream().header);
/// Execute DELETEs.
plan.addStep(std::make_unique<FilterStep>(plan.getCurrentDataStream(), step->actions(), stage.filter_column_names[i], false));
plan.addStep(std::make_unique<FilterStep>(plan.getCurrentDataStream(), dag, stage.filter_column_names[i], false));
}
else
{
auto dag = step->actions()->actions.clone();
if (step->actions()->project_input)
dag->appendInputsForUnusedColumns(plan.getCurrentDataStream().header);
/// Execute UPDATE or final projection.
plan.addStep(std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), step->actions()));
plan.addStep(std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), dag));
}
}

View File

@ -93,8 +93,8 @@ struct WindowDescription
// then by ORDER BY. This field holds this combined sort order.
SortDescription full_sort_description;
std::vector<ActionsDAG> partition_by_actions;
std::vector<ActionsDAG> order_by_actions;
std::vector<ActionsDAGPtr> partition_by_actions;
std::vector<ActionsDAGPtr> order_by_actions;
WindowFrame frame;

View File

@ -11,7 +11,7 @@
namespace DB
{
ActionsChainStep::ActionsChainStep(ActionsDAGPtr actions_,
ActionsChainStep::ActionsChainStep(ActionsAndFlagsPtr actions_,
bool use_actions_nodes_as_output_columns_,
ColumnsWithTypeAndName additional_output_columns_)
: actions(std::move(actions_))
@ -28,12 +28,12 @@ void ActionsChainStep::finalizeInputAndOutputColumns(const NameSet & child_input
auto child_input_columns_copy = child_input_columns;
std::unordered_set<std::string_view> output_nodes_names;
output_nodes_names.reserve(actions->getOutputs().size());
output_nodes_names.reserve(actions->actions.getOutputs().size());
for (auto & output_node : actions->getOutputs())
for (auto & output_node : actions->actions.getOutputs())
output_nodes_names.insert(output_node->result_name);
for (const auto & node : actions->getNodes())
for (const auto & node : actions->actions.getNodes())
{
auto it = child_input_columns_copy.find(node.result_name);
if (it == child_input_columns_copy.end())
@ -45,20 +45,20 @@ void ActionsChainStep::finalizeInputAndOutputColumns(const NameSet & child_input
if (output_nodes_names.contains(node.result_name))
continue;
actions->getOutputs().push_back(&node);
actions->actions.getOutputs().push_back(&node);
output_nodes_names.insert(node.result_name);
}
actions->removeUnusedActions();
actions->actions.removeUnusedActions();
/// TODO: Analyzer fix ActionsDAG input and constant nodes with same name
actions->projectInput();
actions->project_input = true;
initialize();
}
void ActionsChainStep::dump(WriteBuffer & buffer) const
{
buffer << "DAG" << '\n';
buffer << actions->dumpDAG();
buffer << actions->actions.dumpDAG();
if (!available_output_columns.empty())
{
@ -84,7 +84,7 @@ String ActionsChainStep::dump() const
void ActionsChainStep::initialize()
{
auto required_columns_names = actions->getRequiredColumnsNames();
auto required_columns_names = actions->actions.getRequiredColumnsNames();
input_columns_names = NameSet(required_columns_names.begin(), required_columns_names.end());
available_output_columns.clear();
@ -93,7 +93,7 @@ void ActionsChainStep::initialize()
{
std::unordered_set<std::string_view> available_output_columns_names;
for (const auto & node : actions->getNodes())
for (const auto & node : actions->actions.getNodes())
{
if (available_output_columns_names.contains(node.result_name))
continue;

View File

@ -48,18 +48,18 @@ public:
* If use_actions_nodes_as_output_columns = true output columns are initialized using actions dag nodes.
* If additional output columns are specified they are added to output columns.
*/
explicit ActionsChainStep(ActionsDAGPtr actions_,
explicit ActionsChainStep(ActionsAndFlagsPtr actions_,
bool use_actions_nodes_as_output_columns = true,
ColumnsWithTypeAndName additional_output_columns_ = {});
/// Returns a mutable reference to the actions pointer stored in this step.
ActionsDAGPtr & getActions()
ActionsAndFlagsPtr & getActions()
{
return actions;
}
/// Returns a read-only reference to the actions pointer stored in this step.
const ActionsDAGPtr & getActions() const
const ActionsAndFlagsPtr & getActions() const
{
return actions;
}
@ -98,7 +98,7 @@ public:
private:
void initialize();
ActionsDAGPtr actions;
ActionsAndFlagsPtr actions;
bool use_actions_nodes_as_output_columns = true;

View File

@ -90,7 +90,7 @@ public:
ActionsDAGPtr alias_column_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor actions_visitor(planner_context, false);
auto outputs = actions_visitor.visit(alias_column_actions_dag, column_node->getExpression());
auto outputs = actions_visitor.visit(*alias_column_actions_dag, column_node->getExpression());
if (outputs.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected single output in actions dag for alias column {}. Actual {}", column_node->dumpTree(), outputs.size());
@ -340,7 +340,7 @@ void collectTableExpressionData(QueryTreeNodePtr & query_node, PlannerContextPtr
QueryTreeNodePtr query_tree_node = query_node_typed.getPrewhere();
PlannerActionsVisitor visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = visitor.visit(prewhere_actions_dag, query_tree_node);
auto expression_nodes = visitor.visit(*prewhere_actions_dag, query_tree_node);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::ILLEGAL_PREWHERE,
"Invalid PREWHERE. Expected single boolean expression. In query {}",

View File

@ -99,6 +99,7 @@ namespace ErrorCodes
extern const int TOO_DEEP_SUBQUERIES;
extern const int NOT_IMPLEMENTED;
extern const int SUPPORT_IS_DISABLED;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
}
namespace
@ -329,12 +330,16 @@ public:
};
/// Adds an ExpressionStep with description `step_description` on top of the current data stream of `query_plan`
/// and records the actions used by that step in `result_actions_to_execute`.
void addExpressionStep(QueryPlan & query_plan,
const ActionsDAGPtr & expression_actions,
const ActionsAndFlagsPtr & expression_actions,
const std::string & step_description,
std::vector<ActionsDAGPtr> & result_actions_to_execute)
{
result_actions_to_execute.push_back(expression_actions);
auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression_actions);
/// The step gets its own clone; the extra inputs below are appended to the clone,
/// not to the DAG stored in `expression_actions`.
auto actions = expression_actions->actions.clone();
/// With `project_input` set, every column of the current header that is not yet an input of the DAG
/// is appended as an input (throws NOT_FOUND_COLUMN_IN_BLOCK if an existing input is absent from the header).
if (expression_actions->project_input)
actions->appendInputsForUnusedColumns( query_plan.getCurrentDataStream().header);
result_actions_to_execute.push_back(actions);
auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), actions);
expression_step->setStepDescription(step_description);
query_plan.addStep(std::move(expression_step));
}
@ -344,9 +349,13 @@ void addFilterStep(QueryPlan & query_plan,
const std::string & step_description,
std::vector<ActionsDAGPtr> & result_actions_to_execute)
{
result_actions_to_execute.push_back(filter_analysis_result.filter_actions);
auto actions = filter_analysis_result.filter_actions->actions.clone();
if (filter_analysis_result.filter_actions->project_input)
actions->appendInputsForUnusedColumns(query_plan.getCurrentDataStream().header);
result_actions_to_execute.push_back(actions);
auto where_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
filter_analysis_result.filter_actions,
actions,
filter_analysis_result.filter_column_name,
filter_analysis_result.remove_filter_column);
where_step->setStepDescription(step_description);
@ -545,14 +554,21 @@ void addTotalsHavingStep(QueryPlan & query_plan,
const auto & having_analysis_result = expression_analysis_result.getHaving();
bool need_finalize = !query_node.isGroupByWithRollup() && !query_node.isGroupByWithCube();
ActionsDAGPtr actions;
if (having_analysis_result.filter_actions)
result_actions_to_execute.push_back(having_analysis_result.filter_actions);
{
actions = having_analysis_result.filter_actions->actions.clone();
if (having_analysis_result.filter_actions->project_input)
actions->appendInputsForUnusedColumns(query_plan.getCurrentDataStream().header);
result_actions_to_execute.push_back(actions);
}
auto totals_having_step = std::make_unique<TotalsHavingStep>(
query_plan.getCurrentDataStream(),
aggregation_analysis_result.aggregate_descriptions,
query_analysis_result.aggregate_overflow_row,
having_analysis_result.filter_actions,
actions,
having_analysis_result.filter_column_name,
having_analysis_result.remove_filter_column,
settings.totals_mode,
@ -728,12 +744,12 @@ void addWithFillStepIfNeeded(QueryPlan & query_plan,
auto & interpolate_node_typed = interpolate_node->as<InterpolateNode &>();
PlannerActionsVisitor planner_actions_visitor(planner_context);
auto expression_to_interpolate_expression_nodes = planner_actions_visitor.visit(interpolate_actions_dag,
auto expression_to_interpolate_expression_nodes = planner_actions_visitor.visit(*interpolate_actions_dag,
interpolate_node_typed.getExpression());
if (expression_to_interpolate_expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression to interpolate expected to have single action node");
auto interpolate_expression_nodes = planner_actions_visitor.visit(interpolate_actions_dag,
auto interpolate_expression_nodes = planner_actions_visitor.visit(*interpolate_actions_dag,
interpolate_node_typed.getInterpolateExpression());
if (interpolate_expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Interpolate expression expected to have single action node");

View File

@ -886,7 +886,7 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
for (const auto & argument : function_node.getArguments())
{
auto index_hint_argument_expression_dag_nodes = actions_visitor.visit(index_hint_actions_dag, argument);
auto index_hint_argument_expression_dag_nodes = actions_visitor.visit(*index_hint_actions_dag, argument);
for (auto & expression_dag_node : index_hint_argument_expression_dag_nodes)
{
@ -1013,10 +1013,13 @@ PlannerActionsVisitor::PlannerActionsVisitor(const PlannerContextPtr & planner_c
, use_column_identifier_as_action_node_name(use_column_identifier_as_action_node_name_)
{}
/// Visits `expression_node` with PlannerActionsVisitorImpl, adding the required actions to `actions_dag`,
/// and returns the DAG nodes produced by the implementation visitor.
ActionsDAG::NodeRawConstPtrs PlannerActionsVisitor::visit(ActionsDAGPtr actions_dag, QueryTreeNodePtr expression_node)
ActionsDAG::NodeRawConstPtrs PlannerActionsVisitor::visit(ActionsDAG & actions_dag, QueryTreeNodePtr expression_node)
{
PlannerActionsVisitorImpl actions_visitor_impl(actions_dag, planner_context, use_column_identifier_as_action_node_name);
return actions_visitor_impl.visit(expression_node);
/// The implementation visitor is constructed from a shared pointer, so the caller's DAG is moved
/// into a temporary shared_ptr for the duration of the visit and moved back afterwards.
/// NOTE(review): if the inner visit() throws, the move-back below is skipped and `actions_dag`
/// is left in a moved-from state — confirm callers tolerate that or restore it in a scope guard.
/// NOTE(review): presumably node addresses survive both moves, since the returned raw pointers
/// refer into the DAG — TODO confirm against ActionsDAG's node storage.
auto ptr = std::make_shared<ActionsDAG>(std::move(actions_dag));
PlannerActionsVisitorImpl actions_visitor_impl(ptr, planner_context, use_column_identifier_as_action_node_name);
auto res = actions_visitor_impl.visit(expression_node);
actions_dag = std::move(*ptr);
return res;
}
String calculateActionNodeName(const QueryTreeNodePtr & node,

View File

@ -37,7 +37,7 @@ public:
* Necessary actions are not added in actions dag output.
* Returns query tree expression node actions dag nodes.
*/
ActionsDAG::NodeRawConstPtrs visit(ActionsDAGPtr actions_dag, QueryTreeNodePtr expression_node);
ActionsDAG::NodeRawConstPtrs visit(ActionsDAG & actions_dag, QueryTreeNodePtr expression_node);
private:
const PlannerContextPtr planner_context;

View File

@ -44,8 +44,9 @@ FilterAnalysisResult analyzeFilter(const QueryTreeNodePtr & filter_expression_no
{
FilterAnalysisResult result;
result.filter_actions = buildActionsDAGFromExpressionNode(filter_expression_node, input_columns, planner_context);
result.filter_column_name = result.filter_actions->getOutputs().at(0)->result_name;
result.filter_actions = std::make_shared<ActionsAndFlags>();
result.filter_actions->actions = buildActionsDAGFromExpressionNode(filter_expression_node, input_columns, planner_context);
result.filter_column_name = result.filter_actions->actions.getOutputs().at(0)->result_name;
actions_chain.addStep(std::make_unique<ActionsChainStep>(result.filter_actions));
return result;
@ -111,8 +112,9 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
Names aggregation_keys;
ActionsDAGPtr before_aggregation_actions = std::make_shared<ActionsDAG>(input_columns);
before_aggregation_actions->getOutputs().clear();
ActionsAndFlagsPtr before_aggregation_actions = std::make_shared<ActionsAndFlags>();
before_aggregation_actions->actions = ActionsDAG(input_columns);
before_aggregation_actions->actions.getOutputs().clear();
std::unordered_set<std::string_view> before_aggregation_actions_output_node_names;
@ -147,7 +149,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
if (constant_key && !aggregates_descriptions.empty() && (!check_constants_for_group_by_key || canRemoveConstantFromGroupByKey(*constant_key)))
continue;
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, grouping_set_key_node);
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions->actions, grouping_set_key_node);
aggregation_keys.reserve(expression_dag_nodes.size());
for (auto & expression_dag_node : expression_dag_nodes)
@ -160,7 +162,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
auto column_after_aggregation = group_by_use_nulls && expression_dag_node->column != nullptr ? makeNullableSafe(expression_dag_node->column) : expression_dag_node->column;
available_columns_after_aggregation.emplace_back(std::move(column_after_aggregation), expression_type_after_aggregation, expression_dag_node->result_name);
aggregation_keys.push_back(expression_dag_node->result_name);
before_aggregation_actions->getOutputs().push_back(expression_dag_node);
before_aggregation_actions->actions.getOutputs().push_back(expression_dag_node);
before_aggregation_actions_output_node_names.insert(expression_dag_node->result_name);
}
}
@ -199,7 +201,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
if (constant_key && !aggregates_descriptions.empty() && (!check_constants_for_group_by_key || canRemoveConstantFromGroupByKey(*constant_key)))
continue;
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, group_by_key_node);
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions->actions, group_by_key_node);
aggregation_keys.reserve(expression_dag_nodes.size());
for (auto & expression_dag_node : expression_dag_nodes)
@ -211,7 +213,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
auto column_after_aggregation = group_by_use_nulls && expression_dag_node->column != nullptr ? makeNullableSafe(expression_dag_node->column) : expression_dag_node->column;
available_columns_after_aggregation.emplace_back(std::move(column_after_aggregation), expression_type_after_aggregation, expression_dag_node->result_name);
aggregation_keys.push_back(expression_dag_node->result_name);
before_aggregation_actions->getOutputs().push_back(expression_dag_node);
before_aggregation_actions->actions.getOutputs().push_back(expression_dag_node);
before_aggregation_actions_output_node_names.insert(expression_dag_node->result_name);
}
}
@ -225,13 +227,13 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
auto & aggregate_function_node_typed = aggregate_function_node->as<FunctionNode &>();
for (const auto & aggregate_function_node_argument : aggregate_function_node_typed.getArguments().getNodes())
{
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, aggregate_function_node_argument);
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions->actions, aggregate_function_node_argument);
for (auto & expression_dag_node : expression_dag_nodes)
{
if (before_aggregation_actions_output_node_names.contains(expression_dag_node->result_name))
continue;
before_aggregation_actions->getOutputs().push_back(expression_dag_node);
before_aggregation_actions->actions.getOutputs().push_back(expression_dag_node);
before_aggregation_actions_output_node_names.insert(expression_dag_node->result_name);
}
}
@ -278,8 +280,9 @@ std::optional<WindowAnalysisResult> analyzeWindow(const QueryTreeNodePtr & query
PlannerActionsVisitor actions_visitor(planner_context);
ActionsDAGPtr before_window_actions = std::make_shared<ActionsDAG>(input_columns);
before_window_actions->getOutputs().clear();
ActionsAndFlagsPtr before_window_actions = std::make_shared<ActionsAndFlags>();
before_window_actions->actions = ActionsDAG(input_columns);
before_window_actions->actions.getOutputs().clear();
std::unordered_set<std::string_view> before_window_actions_output_node_names;
@ -288,25 +291,25 @@ std::optional<WindowAnalysisResult> analyzeWindow(const QueryTreeNodePtr & query
auto & window_function_node_typed = window_function_node->as<FunctionNode &>();
auto & window_node = window_function_node_typed.getWindowNode()->as<WindowNode &>();
auto expression_dag_nodes = actions_visitor.visit(before_window_actions, window_function_node_typed.getArgumentsNode());
auto expression_dag_nodes = actions_visitor.visit(before_window_actions->actions, window_function_node_typed.getArgumentsNode());
for (auto & expression_dag_node : expression_dag_nodes)
{
if (before_window_actions_output_node_names.contains(expression_dag_node->result_name))
continue;
before_window_actions->getOutputs().push_back(expression_dag_node);
before_window_actions->actions.getOutputs().push_back(expression_dag_node);
before_window_actions_output_node_names.insert(expression_dag_node->result_name);
}
expression_dag_nodes = actions_visitor.visit(before_window_actions, window_node.getPartitionByNode());
expression_dag_nodes = actions_visitor.visit(before_window_actions->actions, window_node.getPartitionByNode());
for (auto & expression_dag_node : expression_dag_nodes)
{
if (before_window_actions_output_node_names.contains(expression_dag_node->result_name))
continue;
before_window_actions->getOutputs().push_back(expression_dag_node);
before_window_actions->actions.getOutputs().push_back(expression_dag_node);
before_window_actions_output_node_names.insert(expression_dag_node->result_name);
}
@ -317,14 +320,14 @@ std::optional<WindowAnalysisResult> analyzeWindow(const QueryTreeNodePtr & query
for (auto & sort_node : order_by_node_list.getNodes())
{
auto & sort_node_typed = sort_node->as<SortNode &>();
expression_dag_nodes = actions_visitor.visit(before_window_actions, sort_node_typed.getExpression());
expression_dag_nodes = actions_visitor.visit(before_window_actions->actions, sort_node_typed.getExpression());
for (auto & expression_dag_node : expression_dag_nodes)
{
if (before_window_actions_output_node_names.contains(expression_dag_node->result_name))
continue;
before_window_actions->getOutputs().push_back(expression_dag_node);
before_window_actions->actions.getOutputs().push_back(expression_dag_node);
before_window_actions_output_node_names.insert(expression_dag_node->result_name);
}
}
@ -357,7 +360,8 @@ ProjectionAnalysisResult analyzeProjection(const QueryNode & query_node,
const PlannerContextPtr & planner_context,
ActionsChain & actions_chain)
{
auto projection_actions = buildActionsDAGFromExpressionNode(query_node.getProjectionNode(), input_columns, planner_context);
auto projection_actions = std::make_shared<ActionsAndFlags>();
projection_actions->actions = buildActionsDAGFromExpressionNode(query_node.getProjectionNode(), input_columns, planner_context);
auto projection_columns = query_node.getProjectionColumns();
size_t projection_columns_size = projection_columns.size();
@ -366,7 +370,7 @@ ProjectionAnalysisResult analyzeProjection(const QueryNode & query_node,
NamesWithAliases projection_column_names_with_display_aliases;
projection_column_names_with_display_aliases.reserve(projection_columns_size);
auto & projection_actions_outputs = projection_actions->getOutputs();
auto & projection_actions_outputs = projection_actions->actions.getOutputs();
size_t projection_outputs_size = projection_actions_outputs.size();
if (projection_columns_size != projection_outputs_size)
@ -404,8 +408,9 @@ SortAnalysisResult analyzeSort(const QueryNode & query_node,
const PlannerContextPtr & planner_context,
ActionsChain & actions_chain)
{
ActionsDAGPtr before_sort_actions = std::make_shared<ActionsDAG>(input_columns);
auto & before_sort_actions_outputs = before_sort_actions->getOutputs();
auto before_sort_actions = std::make_shared<ActionsAndFlags>();
before_sort_actions->actions = ActionsDAG(input_columns);
auto & before_sort_actions_outputs = before_sort_actions->actions.getOutputs();
before_sort_actions_outputs.clear();
PlannerActionsVisitor actions_visitor(planner_context);
@ -419,7 +424,7 @@ SortAnalysisResult analyzeSort(const QueryNode & query_node,
for (const auto & sort_node : order_by_node_list.getNodes())
{
auto & sort_node_typed = sort_node->as<SortNode &>();
auto expression_dag_nodes = actions_visitor.visit(before_sort_actions, sort_node_typed.getExpression());
auto expression_dag_nodes = actions_visitor.visit(before_sort_actions->actions, sort_node_typed.getExpression());
has_with_fill |= sort_node_typed.withFill();
for (auto & action_dag_node : expression_dag_nodes)
@ -435,7 +440,7 @@ SortAnalysisResult analyzeSort(const QueryNode & query_node,
if (has_with_fill)
{
for (auto & output_node : before_sort_actions_outputs)
output_node = &before_sort_actions->materializeNode(*output_node);
output_node = &before_sort_actions->actions.materializeNode(*output_node);
}
/// We add only INPUT columns necessary for INTERPOLATE expression in before ORDER BY actions DAG
@ -444,7 +449,7 @@ SortAnalysisResult analyzeSort(const QueryNode & query_node,
auto & interpolate_list_node = query_node.getInterpolate()->as<ListNode &>();
PlannerActionsVisitor interpolate_actions_visitor(planner_context);
auto interpolate_actions_dag = std::make_shared<ActionsDAG>();
ActionsDAG interpolate_actions_dag;
for (auto & interpolate_node : interpolate_list_node.getNodes())
{
@ -453,10 +458,10 @@ SortAnalysisResult analyzeSort(const QueryNode & query_node,
}
std::unordered_map<std::string_view, const ActionsDAG::Node *> before_sort_actions_inputs_name_to_node;
for (const auto & node : before_sort_actions->getInputs())
for (const auto & node : before_sort_actions->actions.getInputs())
before_sort_actions_inputs_name_to_node.emplace(node->result_name, node);
for (const auto & node : interpolate_actions_dag->getNodes())
for (const auto & node : interpolate_actions_dag.getNodes())
{
if (before_sort_actions_dag_output_node_names.contains(node.result_name) ||
node.type != ActionsDAG::ActionType::INPUT)
@ -466,7 +471,7 @@ SortAnalysisResult analyzeSort(const QueryNode & query_node,
if (input_node_it == before_sort_actions_inputs_name_to_node.end())
{
auto input_column = ColumnWithTypeAndName{node.column, node.result_type, node.result_name};
const auto * input_node = &before_sort_actions->addInput(std::move(input_column));
const auto * input_node = &before_sort_actions->actions.addInput(std::move(input_column));
auto [it, _] = before_sort_actions_inputs_name_to_node.emplace(node.result_name, input_node);
input_node_it = it;
}
@ -491,22 +496,23 @@ LimitByAnalysisResult analyzeLimitBy(const QueryNode & query_node,
const NameSet & required_output_nodes_names,
ActionsChain & actions_chain)
{
auto before_limit_by_actions = buildActionsDAGFromExpressionNode(query_node.getLimitByNode(), input_columns, planner_context);
auto before_limit_by_actions = std::make_shared<ActionsAndFlags>();
before_limit_by_actions->actions = buildActionsDAGFromExpressionNode(query_node.getLimitByNode(), input_columns, planner_context);
NameSet limit_by_column_names_set;
Names limit_by_column_names;
limit_by_column_names.reserve(before_limit_by_actions->getOutputs().size());
for (auto & output_node : before_limit_by_actions->getOutputs())
limit_by_column_names.reserve(before_limit_by_actions->actions.getOutputs().size());
for (auto & output_node : before_limit_by_actions->actions.getOutputs())
{
limit_by_column_names_set.insert(output_node->result_name);
limit_by_column_names.push_back(output_node->result_name);
}
for (const auto & node : before_limit_by_actions->getNodes())
for (const auto & node : before_limit_by_actions->actions.getNodes())
{
if (required_output_nodes_names.contains(node.result_name) &&
!limit_by_column_names_set.contains(node.result_name))
before_limit_by_actions->getOutputs().push_back(&node);
before_limit_by_actions->actions.getOutputs().push_back(&node);
}
auto actions_step_before_limit_by = std::make_unique<ActionsChainStep>(before_limit_by_actions);
@ -591,7 +597,7 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
if (sort_analysis_result_optional.has_value() && planner_query_processing_info.isFirstStage() && planner_query_processing_info.getToStage() != QueryProcessingStage::Complete)
{
const auto & before_order_by_actions = sort_analysis_result_optional->before_order_by_actions;
for (const auto & output_node : before_order_by_actions->getOutputs())
for (const auto & output_node : before_order_by_actions->actions.getOutputs())
required_output_nodes_names.insert(output_node->result_name);
}
@ -647,8 +653,10 @@ PlannerExpressionsAnalysisResult buildExpressionAnalysisResult(const QueryTreeNo
}
}
auto project_names_actions = std::make_shared<ActionsDAG>(project_names_input);
project_names_actions->project(projection_analysis_result.projection_column_names_with_display_aliases);
auto project_names_actions = std::make_shared<ActionsAndFlags>();
project_names_actions->actions = ActionsDAG(project_names_input);
project_names_actions->actions.project(projection_analysis_result.projection_column_names_with_display_aliases);
project_names_actions->project_input = true;
actions_chain.addStep(std::make_unique<ActionsChainStep>(project_names_actions));
actions_chain.finalize();

View File

@ -17,22 +17,22 @@ namespace DB
/// Result type returned by analyzeProjection().
/// NOTE(review): presumably `projection_actions` holds the DAG built from the query projection node and
/// `project_names_actions` the DAG that renames columns to their display aliases — verify against the
/// code that fills this struct.
struct ProjectionAnalysisResult
{
ActionsDAGPtr projection_actions;
ActionsAndFlagsPtr projection_actions;
Names projection_column_names;
NamesWithAliases projection_column_names_with_display_aliases;
ActionsDAGPtr project_names_actions;
ActionsAndFlagsPtr project_names_actions;
};
/// Result type returned by analyzeFilter().
struct FilterAnalysisResult
{
ActionsDAGPtr filter_actions;
/// Actions built from the filter expression node.
ActionsAndFlagsPtr filter_actions;
/// Result name of the first output of the filter actions.
std::string filter_column_name;
/// Passed through to the filter step constructors together with `filter_column_name`.
bool remove_filter_column = false;
};
struct AggregationAnalysisResult
{
ActionsDAGPtr before_aggregation_actions;
ActionsAndFlagsPtr before_aggregation_actions;
Names aggregation_keys;
AggregateDescriptions aggregate_descriptions;
GroupingSetsParamsList grouping_sets_parameters_list;
@ -41,19 +41,19 @@ struct AggregationAnalysisResult
/// Result type returned by analyzeWindow().
/// NOTE(review): presumably `before_window_actions` is the DAG whose outputs are the window function
/// arguments and the PARTITION BY / ORDER BY expressions — verify against the code that fills this struct.
struct WindowAnalysisResult
{
ActionsDAGPtr before_window_actions;
ActionsAndFlagsPtr before_window_actions;
std::vector<WindowDescription> window_descriptions;
};
/// Result type returned by analyzeSort().
/// NOTE(review): presumably `has_with_fill` is true when at least one ORDER BY element uses WITH FILL —
/// verify against the code that fills this struct.
struct SortAnalysisResult
{
ActionsDAGPtr before_order_by_actions;
ActionsAndFlagsPtr before_order_by_actions;
bool has_with_fill = false;
};
/// Result type returned by analyzeLimitBy().
/// NOTE(review): presumably `limit_by_column_names` are the result names of the LIMIT BY expression
/// outputs — verify against the code that fills this struct.
struct LimitByAnalysisResult
{
ActionsDAGPtr before_limit_by_actions;
ActionsAndFlagsPtr before_limit_by_actions;
Names limit_by_column_names;
};

View File

@ -79,6 +79,7 @@ namespace ErrorCodes
extern const int TOO_MANY_COLUMNS;
extern const int UNSUPPORTED_METHOD;
extern const int BAD_ARGUMENTS;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
}
namespace
@ -1072,7 +1073,7 @@ void joinCastPlanColumnsToNullable(QueryPlan & plan_to_add_cast, PlannerContextP
}
}
cast_actions_dag->projectInput();
cast_actions_dag->appendInputsForUnusedColumns( plan_to_add_cast.getCurrentDataStream().header);
auto cast_join_columns_step = std::make_unique<ExpressionStep>(plan_to_add_cast.getCurrentDataStream(), std::move(cast_actions_dag));
cast_join_columns_step->setStepDescription("Cast JOIN columns to Nullable");
plan_to_add_cast.addStep(std::move(cast_join_columns_step));
@ -1118,12 +1119,12 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
join_table_expression,
planner_context);
join_clauses_and_actions.left_join_expressions_actions->projectInput();
join_clauses_and_actions.left_join_expressions_actions->appendInputsForUnusedColumns(left_plan.getCurrentDataStream().header);
auto left_join_expressions_actions_step = std::make_unique<ExpressionStep>(left_plan.getCurrentDataStream(), join_clauses_and_actions.left_join_expressions_actions);
left_join_expressions_actions_step->setStepDescription("JOIN actions");
left_plan.addStep(std::move(left_join_expressions_actions_step));
join_clauses_and_actions.right_join_expressions_actions->projectInput();
join_clauses_and_actions.right_join_expressions_actions->appendInputsForUnusedColumns(right_plan.getCurrentDataStream().header);
auto right_join_expressions_actions_step = std::make_unique<ExpressionStep>(right_plan.getCurrentDataStream(), join_clauses_and_actions.right_join_expressions_actions);
right_join_expressions_actions_step->setStepDescription("JOIN actions");
right_plan.addStep(std::move(right_join_expressions_actions_step));
@ -1175,7 +1176,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
output_node = &cast_actions_dag->addCast(*output_node, cast_type, output_node->result_name);
}
cast_actions_dag->projectInput();
cast_actions_dag->appendInputsForUnusedColumns(plan_to_add_cast.getCurrentDataStream().header);
auto cast_join_columns_step
= std::make_unique<ExpressionStep>(plan_to_add_cast.getCurrentDataStream(), std::move(cast_actions_dag));
cast_join_columns_step->setStepDescription("Cast JOIN USING columns");
@ -1570,7 +1571,7 @@ JoinTreeQueryPlan buildQueryPlanForArrayJoinNode(const QueryTreeNodePtr & array_
array_join_column_names.insert(array_join_column_identifier);
auto & array_join_expression_column = array_join_expression->as<ColumnNode &>();
auto expression_dag_index_nodes = actions_visitor.visit(array_join_action_dag, array_join_expression_column.getExpressionOrThrow());
auto expression_dag_index_nodes = actions_visitor.visit(*array_join_action_dag, array_join_expression_column.getExpressionOrThrow());
for (auto & expression_dag_index_node : expression_dag_index_nodes)
{
@ -1580,7 +1581,7 @@ JoinTreeQueryPlan buildQueryPlanForArrayJoinNode(const QueryTreeNodePtr & array_
}
}
array_join_action_dag->projectInput();
array_join_action_dag->appendInputsForUnusedColumns(plan.getCurrentDataStream().header);
join_tree_query_plan.actions_dags.push_back(array_join_action_dag);

View File

@ -183,7 +183,7 @@ const ActionsDAG::Node * appendExpression(
const JoinNode & join_node)
{
PlannerActionsVisitor join_expression_visitor(planner_context);
auto join_expression_dag_node_raw_pointers = join_expression_visitor.visit(dag, expression);
auto join_expression_dag_node_raw_pointers = join_expression_visitor.visit(*dag, expression);
if (join_expression_dag_node_raw_pointers.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"JOIN {} ON clause contains multiple expressions",
@ -603,7 +603,7 @@ JoinClausesAndActions buildJoinClausesAndActions(
{
auto mixed_join_expressions_actions = std::make_shared<ActionsDAG>(mixed_table_expression_columns);
PlannerActionsVisitor join_expression_visitor(planner_context);
auto join_expression_dag_node_raw_pointers = join_expression_visitor.visit(mixed_join_expressions_actions, join_expression);
auto join_expression_dag_node_raw_pointers = join_expression_visitor.visit(*mixed_join_expressions_actions, join_expression);
if (join_expression_dag_node_raw_pointers.size() != 1)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "JOIN {} ON clause contains multiple expressions", join_node.formatASTForErrorMessage());

View File

@ -213,14 +213,14 @@ StorageLimits buildStorageLimits(const Context & context, const SelectQueryOptio
return {limits, leaf_limits};
}
/// Creates an ActionsDAG over `input_columns`, visits `expression_node` with PlannerActionsVisitor
/// and makes the nodes returned by the visitor the outputs of the DAG.
ActionsDAGPtr buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node,
ActionsDAG buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node,
const ColumnsWithTypeAndName & input_columns,
const PlannerContextPtr & planner_context)
{
ActionsDAGPtr action_dag = std::make_shared<ActionsDAG>(input_columns);
ActionsDAG action_dag(input_columns);
PlannerActionsVisitor actions_visitor(planner_context);
auto expression_dag_index_nodes = actions_visitor.visit(action_dag, expression_node);
/// Replace whatever outputs the DAG had after construction with the visited expression nodes.
action_dag->getOutputs() = std::move(expression_dag_index_nodes);
action_dag.getOutputs() = std::move(expression_dag_index_nodes);
return action_dag;
}
@ -443,7 +443,7 @@ FilterDAGInfo buildFilterInfo(QueryTreeNodePtr filter_query_tree,
auto filter_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
auto expression_nodes = actions_visitor.visit(*filter_actions_dag, filter_query_tree);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Filter actions must return single output node. Actual {}",

View File

@ -47,7 +47,7 @@ StorageLimits buildStorageLimits(const Context & context, const SelectQueryOptio
* Inputs are not used for actions dag outputs.
* Only root query tree expression node is used as actions dag output.
*/
ActionsDAGPtr buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node,
ActionsDAG buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node,
const ColumnsWithTypeAndName & input_columns,
const PlannerContextPtr & planner_context);

View File

@ -176,8 +176,6 @@ static void appendExpression(ActionsDAGPtr & dag, const ActionsDAGPtr & expressi
dag->mergeInplace(std::move(*expression->clone()));
else
dag = expression->clone();
dag->projectInput(false);
}
/// This function builds a common DAG which is a merge of DAGs from Filter and Expression steps chain.

View File

@ -77,7 +77,7 @@ static AggregateProjectionInfo getAggregatingProjectionInfo(
AggregateProjectionInfo info;
info.context = interpreter.getContext();
info.before_aggregation = analysis_result.before_aggregation;
info.before_aggregation = analysis_result.before_aggregation->actions.clone();
info.keys = query_analyzer->aggregationKeys().getNames();
info.aggregates = query_analyzer->aggregates();

View File

@ -850,8 +850,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_
/// Creates an ActionsDAG from the names and types of the columns in `header`.
static ActionsDAGPtr createProjection(const Block & header)
{
auto projection = std::make_shared<ActionsDAG>(header.getNamesAndTypesList());
projection->removeUnusedActions(header.getNames());
projection->projectInput();
/// NOTE(review): the call below is left commented out without an explanation —
/// either delete it or state why it is kept.
// projection->removeUnusedActions(header.getNames());
return projection;
}
@ -2010,6 +2009,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
Block cur_header = pipe.getHeader();
bool project_inputs = result_projection != nullptr;
auto append_actions = [&result_projection](ActionsDAGPtr actions)
{
if (!result_projection)
@ -2035,6 +2035,9 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
if (result_projection)
{
if (project_inputs)
result_projection->appendInputsForUnusedColumns(pipe.getHeader());
auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
pipe.addSimpleTransform([&](const Block & header)
{

View File

@ -178,7 +178,7 @@ void AddingDefaultsTransform::transform(Chunk & chunk)
auto dag = evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
if (dag)
{
auto actions = std::make_shared<ExpressionActions>(std::move(dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
auto actions = std::make_shared<ExpressionActions>(std::move(dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes), true);
actions->execute(evaluate_block);
}

View File

@ -7050,7 +7050,7 @@ ActionDAGNodes MergeTreeData::getFiltersForPrimaryKeyAnalysis(const InterpreterS
filter_nodes.nodes.push_back(&additional_filter_info->actions->findInOutputs(additional_filter_info->column_name));
if (before_where)
filter_nodes.nodes.push_back(&before_where->findInOutputs(where_column_name));
filter_nodes.nodes.push_back(&before_where->actions.findInOutputs(where_column_name));
return filter_nodes;
}

View File

@ -273,7 +273,6 @@ void StorageMaterializedView::read(
* They may be added in case of distributed query with JOIN.
* In that case underlying table returns joined columns as well.
*/
converting_actions->projectInput(false);
auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
converting_step->setStepDescription("Convert target table structure to MaterializedView structure");
query_plan.addStep(std::move(converting_step));

View File

@ -964,7 +964,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
}
PlannerActionsVisitor actions_visitor(modified_query_info.planner_context, false /*use_column_identifier_as_action_node_name*/);
actions_visitor.visit(filter_actions_dag, column_node);
actions_visitor.visit(*filter_actions_dag, column_node);
}
column_names_as_aliases = filter_actions_dag->getRequiredColumnsNames();
if (column_names_as_aliases.empty())
@ -1513,7 +1513,7 @@ void ReadFromMerge::convertAndFilterSourceStream(
query_analysis_pass.run(query_tree, local_context);
PlannerActionsVisitor actions_visitor(modified_query_info.planner_context, false /*use_column_identifier_as_action_node_name*/);
const auto & nodes = actions_visitor.visit(actions_dag, query_tree);
const auto & nodes = actions_visitor.visit(*actions_dag, query_tree);
if (nodes.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected to have 1 output but got {}", nodes.size());