Remove index by name from ActionsDAG

This commit is contained in:
Nikolai Kochetov 2021-03-02 20:51:54 +03:00
parent ffaf9da2db
commit 4775ea305e
9 changed files with 42 additions and 43 deletions

View File

@ -14,7 +14,7 @@ AddingDefaultBlockOutputStream::AddingDefaultBlockOutputStream(
: output(output_), header(header_)
{
auto dag = addMissingDefaults(header_, output->getHeader().getNamesAndTypesList(), columns_, context_);
adding_defaults_actions = std::make_shared<ExpressionActions>(std::move(dag));
adding_defaults_actions = std::make_shared<ExpressionActions>(std::move(dag), context_);
}
void AddingDefaultBlockOutputStream::write(const Block & block)

View File

@ -174,7 +174,7 @@ Block AddingDefaultsBlockInputStream::readImpl()
auto dag = evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
if (dag)
{
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
auto actions = std::make_shared<ExpressionActions>(std::move(dag), context);
actions->execute(evaluate_block);
}

View File

@ -312,7 +312,6 @@ void ActionsDAG::removeUnusedActions(const Names & required_names)
nodes_list.pop_back();
}
removeUnusedActions(required_nodes);
index.swap(required_nodes);
removeUnusedActions();
}
@ -358,11 +357,11 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs)
node->children.clear();
}
for (auto * child : node->children)
for (const auto * child : node->children)
{
if (visited_nodes.count(child) == 0)
{
stack.push(child);
stack.push(const_cast<Node *>(child));
visited_nodes.insert(child);
}
}
@ -386,7 +385,7 @@ void ActionsDAG::addAliases(const NamesWithAliases & aliases, bool project)
auto & nodes_list = names_map[item.first];
if (nodes_list.empty())
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", name, dumpNames());
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
const auto * child = nodes_list.front();
nodes_list.pop_front();
@ -506,8 +505,8 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
ActionsDAGPtr ActionsDAG::clone() const
{
auto actions = std::make_shared<ActionsDAG>();
actions.project_input = project_input;
actions.projected_output = projected_output;
actions->project_input = project_input;
actions->projected_output = projected_output;
std::unordered_map<const Node *, Node *> copy_map;
@ -530,15 +529,10 @@ ActionsDAGPtr ActionsDAG::clone() const
return actions;
}
void ActionsDAG::compileExpressions()
void ActionsDAG::compileExpressions(std::shared_ptr<CompiledExpressionCache> cache)
{
#if USE_EMBEDDED_COMPILER
if (settings.compile_expressions)
{
compileFunctions();
removeUnusedActions();
}
#endif
compileFunctions(cache);
removeUnusedActions();
}
std::string ActionsDAG::dumpDAG() const
@ -769,7 +763,7 @@ ActionsDAGPtr ActionsDAG::makeAddingColumnActions(ColumnWithTypeAndName column)
auto & function_node = adding_column_action->addFunction(func_builder_materialize, std::move(inputs), {});
auto & alias_node = adding_column_action->addAlias(function_node, std::move(column_name));
adding_column_action.index->push_back(&alias_node);
adding_column_action->index.push_back(&alias_node);
return adding_column_action;
}
@ -799,7 +793,7 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
auto it = first_result.find(node->result_name);
if (it == first_result.end() || it->second.empty())
{
if (first.settings.project_input)
if (first.project_input)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find column {} in ActionsDAG result", node->result_name);
@ -815,9 +809,9 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
}
/// Replace inputs from `second` to nodes from `first` result.
for (const auto & node : second.nodes)
for (auto & node : second.nodes)
{
for (const auto & child : node.children)
for (auto & child : node.children)
{
if (child->type == ActionType::INPUT)
{
@ -828,7 +822,7 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
}
}
for (const auto & node : second.index)
for (auto & node : second.index)
{
if (node->type == ActionType::INPUT)
{
@ -861,7 +855,7 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
first.nodes.splice(first.nodes.end(), std::move(second.nodes));
first.settings.projected_output = second.settings.projected_output;
first.projected_output = second.projected_output;
/// Drop unused inputs and, probably, some actions.
first.removeUnusedActions();
@ -1159,8 +1153,8 @@ namespace
struct ConjunctionNodes
{
NodeRawConstPtrs allowed;
NodeRawConstPtrs rejected;
ActionsDAG::NodeRawConstPtrs allowed;
ActionsDAG::NodeRawConstPtrs rejected;
};
/// Take a node which result is predicate.
@ -1170,8 +1164,8 @@ struct ConjunctionNodes
ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes)
{
ConjunctionNodes conjunction;
std::unordered_set<ActionsDAG::Node *> allowed;
std::unordered_set<ActionsDAG::Node *> rejected;
std::unordered_set<const ActionsDAG::Node *> allowed;
std::unordered_set<const ActionsDAG::Node *> rejected;
struct Frame
{
@ -1249,7 +1243,7 @@ ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordere
return conjunction;
}
ColumnsWithTypeAndName prepareFunctionArguments(const NodeRawConstPtrs & nodes)
ColumnsWithTypeAndName prepareFunctionArguments(const ActionsDAG::NodeRawConstPtrs & nodes)
{
ColumnsWithTypeAndName arguments;
arguments.reserve(nodes.size());
@ -1343,15 +1337,15 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
if (conjunction.size() > 1)
{
std::vector<Node *> args;
NodeRawConstPtrs args;
args.reserve(conjunction.size());
for (const auto * predicate : conjunction)
args.emplace_back(nodes_mapping[predicate]);
result_predicate = &actions->addFunction(func_builder_and, args, {});
result_predicate = &actions->addFunction(func_builder_and, std::move(args), {});
}
actions->index.insert(result_predicate);
actions->index.push_back(result_predicate);
return actions;
}

View File

@ -26,7 +26,6 @@ using FunctionOverloadResolverPtr = std::shared_ptr<IFunctionOverloadResolver>;
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
class Context;
class CompiledExpressionCache;
/// Directed acyclic graph of expressions.
@ -227,6 +226,7 @@ public:
bool removeUnusedResult(const std::string & column_name);
void projectInput() { project_input = true; }
bool projectedOutput() const { return projected_output; }
void removeUnusedActions(const Names & required_names);
bool hasArrayJoin() const;
@ -235,7 +235,7 @@ public:
//const ActionsSettings & getSettings() const { return settings; }
void compileExpressions();
void compileExpressions(std::shared_ptr<CompiledExpressionCache> cache);
ActionsDAGPtr clone() const;
@ -317,9 +317,9 @@ private:
void removeUnusedActions(bool allow_remove_inputs = true);
void addAliases(const NamesWithAliases & aliases, bool project);
void compileFunctions();
void compileFunctions(std::shared_ptr<CompiledExpressionCache> cache);
ActionsDAGPtr cloneActionsForConjunction(std::vector<Node *> conjunction);
ActionsDAGPtr cloneActionsForConjunction(NodeRawConstPtrs conjunction);
};

View File

@ -44,15 +44,18 @@ namespace ErrorCodes
ExpressionActions::~ExpressionActions() = default;
ExpressionActions::ExpressionActions(ActionsDAGPtr actions_dag_)
ExpressionActions::ExpressionActions(ActionsDAGPtr actions_dag_, const Context & context)
{
actions_dag = actions_dag_->clone();
actions_dag->compileExpressions();
const auto & settings = context.getSettingsRef();
if (settings.compile_expressions)
actions_dag->compileExpressions(context.getCompiledExpressionCache());
linearizeActions();
const auto & settings = actions_dag->getSettings();
max_temporary_non_const_columns = settings.max_temporary_non_const_columns;
if (settings.max_temporary_columns && num_columns > settings.max_temporary_columns)
throw Exception(ErrorCodes::TOO_MANY_TEMPORARY_COLUMNS,

View File

@ -29,6 +29,7 @@ using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Context;
/// Sequence of actions on the block.
/// Is used to calculate expressions.
@ -38,7 +39,6 @@ class ExpressionActions
{
public:
using Node = ActionsDAG::Node;
using Index = ActionsDAG::Index;
struct Argument
{
@ -78,10 +78,12 @@ private:
ColumnNumbers result_positions;
Block sample_block;
size_t max_temporary_non_const_columns = 0;
public:
ExpressionActions() = delete;
~ExpressionActions();
explicit ExpressionActions(ActionsDAGPtr actions_dag_);
explicit ExpressionActions(ActionsDAGPtr actions_dag_, const Context & context);
ExpressionActions(const ExpressionActions &) = default;
ExpressionActions & operator=(const ExpressionActions &) = default;
@ -184,7 +186,7 @@ struct ExpressionActionsChain
void finalize(const Names & required_output_) override
{
if (!actions_dag->getSettings().projected_output)
if (!actions_dag->projectedOutput())
actions_dag->removeUnusedActions(required_output_);
}

View File

@ -190,7 +190,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
if (dag)
{
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
auto actions = std::make_shared<ExpressionActions>(std::move(dag), storage.global_context);
actions->execute(additional_columns);
}

View File

@ -442,7 +442,7 @@ void MergeTreeData::checkPartitionKeyAndInitMinMax(const KeyDescription & new_pa
/// Add all columns used in the partition key to the min-max index.
const NamesAndTypesList & minmax_idx_columns_with_types = new_partition_key.expression->getRequiredColumnsWithTypes();
minmax_idx_expr = std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(minmax_idx_columns_with_types));
minmax_idx_expr = std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(minmax_idx_columns_with_types), global_context);
for (const NameAndTypePair & column : minmax_idx_columns_with_types)
{
minmax_idx_columns.emplace_back(column.name);

View File

@ -94,7 +94,7 @@ public:
pipe.getHeader().getColumnsWithTypeAndName(),
to_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag, context);
pipe.addSimpleTransform([&](const Block & header)
{