ActionsDAG rename index to outputs

This commit is contained in:
Maksim Kita 2022-08-08 17:54:51 +02:00
parent 4c3153b2ba
commit c030fd05e7
13 changed files with 234 additions and 208 deletions

View File

@ -57,7 +57,7 @@ void ActionsDAG::Node::toTree(JSONBuilder::JSONMap & map) const
/// Construct a DAG from a list of named, typed columns.
/// Every entry of `inputs_` is registered via addInput() and a pointer to the created node
/// is appended to the DAG's output list, in the order the entries are given.
ActionsDAG::ActionsDAG(const NamesAndTypesList & inputs_)
{
for (const auto & input : inputs_)
index.push_back(&addInput(input.name, input.type));
outputs.push_back(&addInput(input.name, input.type));
}
ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
@ -74,10 +74,10 @@ ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
/// without any respect to header structure. So, it is a way to drop materialized column and use
/// constant value from header.
/// We cannot remove such input right now because input positions are important in some cases.
index.push_back(&addColumn(input));
outputs.push_back(&addColumn(input));
}
else
index.push_back(&addInput(input.name, input.type));
outputs.push_back(&addInput(input.name, input.type));
}
}
@ -237,42 +237,42 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
return addNode(std::move(node));
}
/// Return the first output node whose result_name equals `name`.
/// Delegates to the non-throwing lookup; if that yields nullptr, throws an Exception
/// with code UNKNOWN_IDENTIFIER that mentions the requested name.
const ActionsDAG::Node & ActionsDAG::findInIndex(const std::string & name) const
const ActionsDAG::Node & ActionsDAG::findInOutputs(const std::string & name) const
{
if (const auto * node = tryFindInIndex(name))
if (const auto * node = tryFindInOutputs(name))
return *node;
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown identifier: '{}'", name);
}
/// Linear scan over the output nodes in order.
/// Returns a pointer to the first node whose result_name equals `name`,
/// or nullptr when no output node carries that name.
const ActionsDAG::Node * ActionsDAG::tryFindInIndex(const std::string & name) const
const ActionsDAG::Node * ActionsDAG::tryFindInOutputs(const std::string & name) const
{
for (const auto & node : index)
for (const auto & node : outputs)
if (node->result_name == name)
return node;
return nullptr;
}
/// Make `node` an output of the DAG under its own result_name.
/// The first output entry with the same result_name is overwritten in place (its position
/// in the output list is kept); only that first match is touched.
/// If no entry has that name, a pointer to `node` is appended at the end.
/// Only the address of `node` is stored, no copy is made.
void ActionsDAG::addOrReplaceInIndex(const Node & node)
void ActionsDAG::addOrReplaceInOutputs(const Node & node)
{
for (auto & index_node : index)
for (auto & output_node : outputs)
{
if (index_node->result_name == node.result_name)
if (output_node->result_name == node.result_name)
{
index_node = &node;
output_node = &node;
/// Stop after the first match: later entries with the same name are left as is.
return;
}
}
/// No output with this name yet.
index.push_back(&node);
outputs.push_back(&node);
}
/// Build a list with the (result_name, result_type) pair of every node in `inputs`,
/// in the order the input nodes are stored.
NamesAndTypesList ActionsDAG::getRequiredColumns() const
{
NamesAndTypesList result;
for (const auto & input : inputs)
result.emplace_back(input->result_name, input->result_type);
for (const auto & input_node : inputs)
result.emplace_back(input_node->result_name, input_node->result_type);
return result;
}
@ -282,8 +282,8 @@ Names ActionsDAG::getRequiredColumnsNames() const
Names result;
result.reserve(inputs.size());
for (const auto & input : inputs)
result.emplace_back(input->result_name);
for (const auto & input_node : inputs)
result.emplace_back(input_node->result_name);
return result;
}
@ -291,8 +291,9 @@ Names ActionsDAG::getRequiredColumnsNames() const
ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
{
ColumnsWithTypeAndName result;
result.reserve(index.size());
for (const auto & node : index)
result.reserve(outputs.size());
for (const auto & node : outputs)
result.emplace_back(node->column, node->result_type, node->result_name);
return result;
@ -301,7 +302,7 @@ ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
{
NamesAndTypesList result;
for (const auto & node : index)
for (const auto & node : outputs)
result.emplace_back(node->result_name, node->result_type);
return result;
@ -310,8 +311,9 @@ NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
Names ActionsDAG::getNames() const
{
Names names;
names.reserve(index.size());
for (const auto & node : index)
names.reserve(outputs.size());
for (const auto & node : outputs)
names.emplace_back(node->result_name);
return names;
@ -335,7 +337,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
required_nodes.reserve(required_names.size());
NameSet added;
for (const auto & node : index)
for (const auto & node : outputs)
{
if (required_names.contains(node->result_name) && !added.contains(node->result_name))
{
@ -352,7 +354,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
"Unknown column: {}, there are only columns {}", name, dumpNames());
}
index.swap(required_nodes);
outputs.swap(required_nodes);
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
}
@ -362,7 +364,7 @@ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_re
required_nodes.reserve(required_names.size());
std::unordered_map<std::string_view, const Node *> names_map;
for (const auto * node : index)
for (const auto * node : outputs)
names_map[node->result_name] = node;
for (const auto & name : required_names)
@ -375,7 +377,7 @@ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_re
required_nodes.push_back(it->second);
}
index.swap(required_nodes);
outputs.swap(required_nodes);
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
}
@ -384,7 +386,7 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_consta
std::unordered_set<const Node *> visited_nodes;
std::stack<Node *> stack;
for (const auto * node : index)
for (const auto * node : outputs)
{
visited_nodes.insert(node);
stack.push(const_cast<Node *>(node));
@ -516,7 +518,7 @@ Block ActionsDAG::updateHeader(Block header) const
}
ColumnsWithTypeAndName result_columns;
result_columns.reserve(index.size());
result_columns.reserve(outputs.size());
struct Frame
{
@ -525,12 +527,12 @@ Block ActionsDAG::updateHeader(Block header) const
};
{
for (const auto * output : index)
for (const auto * output_node : outputs)
{
if (!node_to_column.contains(output))
if (!node_to_column.contains(output_node))
{
std::stack<Frame> stack;
stack.push({.node = output});
stack.push({.node = output_node});
while (!stack.empty())
{
@ -567,8 +569,8 @@ Block ActionsDAG::updateHeader(Block header) const
}
}
if (node_to_column[output].column)
result_columns.push_back(node_to_column[output]);
if (node_to_column[output_node].column)
result_columns.push_back(node_to_column[output_node]);
}
}
@ -592,35 +594,35 @@ NameSet ActionsDAG::foldActionsByProjection(
const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name, bool add_missing_keys)
{
std::unordered_set<const Node *> visited_nodes;
std::unordered_set<std::string_view> visited_index_names;
std::unordered_set<std::string_view> visited_output_nodes_names;
std::stack<Node *> stack;
/// Record all needed index nodes to start folding.
for (const auto & node : index)
/// Record all needed output nodes to start folding.
for (const auto & output_node : outputs)
{
if (required_columns.find(node->result_name) != required_columns.end() || node->result_name == predicate_column_name)
if (required_columns.find(output_node->result_name) != required_columns.end() || output_node->result_name == predicate_column_name)
{
visited_nodes.insert(node);
visited_index_names.insert(node->result_name);
stack.push(const_cast<Node *>(node));
visited_nodes.insert(output_node);
visited_output_nodes_names.insert(output_node->result_name);
stack.push(const_cast<Node *>(output_node));
}
}
/// If some required columns are not in any index node, try searching from all projection key
/// If some required columns are not in any output node, try searching from all projection key
/// columns. If still missing, return empty set which means current projection fails to match
/// (missing columns).
if (add_missing_keys)
{
for (const auto & column : required_columns)
{
if (visited_index_names.find(column) == visited_index_names.end())
if (visited_output_nodes_names.find(column) == visited_output_nodes_names.end())
{
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column))
{
const auto * node = &addInput(*column_with_type_name);
visited_nodes.insert(node);
index.push_back(node);
visited_index_names.insert(column);
outputs.push_back(node);
visited_output_nodes_names.insert(column);
}
else
{
@ -662,20 +664,20 @@ NameSet ActionsDAG::foldActionsByProjection(
/// Clean up unused nodes after folding.
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
std::erase_if(index, [&](const Node * node) { return !visited_index_names.contains(node->result_name); });
std::erase_if(outputs, [&](const Node * node) { return !visited_output_nodes_names.contains(node->result_name); });
nodes.remove_if([&](const Node & node) { return !visited_nodes.contains(&node); });
/// Calculate the required columns after folding.
NameSet next_required_columns;
for (const auto & input : inputs)
next_required_columns.insert(input->result_name);
for (const auto & input_node : inputs)
next_required_columns.insert(input_node->result_name);
return next_required_columns;
}
void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map)
{
::sort(index.begin(), index.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
::sort(outputs.begin(), outputs.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
{
return key_names_pos_map.find(lhs->result_name)->second < key_names_pos_map.find(rhs->result_name)->second;
});
@ -684,17 +686,21 @@ void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<st
/// For every column of the `aggregates` block, add an input node to the DAG via addInput()
/// and append a pointer to that node to the output list.
void ActionsDAG::addAggregatesViaProjection(const Block & aggregates)
{
for (const auto & aggregate : aggregates)
index.push_back(&addInput(aggregate));
outputs.push_back(&addInput(aggregate));
}
void ActionsDAG::addAliases(const NamesWithAliases & aliases)
{
std::unordered_map<std::string_view, size_t> names_map;
for (size_t i = 0; i < index.size(); ++i)
names_map[index[i]->result_name] = i;
size_t output_nodes_size = outputs.size();
for (size_t i = 0; i < output_nodes_size; ++i)
names_map[outputs[i]->result_name] = i;
size_t aliases_size = aliases.size();
NodeRawConstPtrs required_nodes;
required_nodes.reserve(aliases.size());
required_nodes.reserve(aliases_size);
for (const auto & item : aliases)
{
@ -703,10 +709,10 @@ void ActionsDAG::addAliases(const NamesWithAliases & aliases)
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
required_nodes.push_back(index[it->second]);
required_nodes.push_back(outputs[it->second]);
}
for (size_t i = 0; i < aliases.size(); ++i)
for (size_t i = 0; i < aliases_size; ++i)
{
const auto & item = aliases[i];
const auto * child = required_nodes[i];
@ -726,22 +732,24 @@ void ActionsDAG::addAliases(const NamesWithAliases & aliases)
auto it = names_map.find(child->result_name);
if (it == names_map.end())
{
names_map[child->result_name] = index.size();
index.push_back(child);
names_map[child->result_name] = outputs.size();
outputs.push_back(child);
}
else
index[it->second] = child;
outputs[it->second] = child;
}
}
void ActionsDAG::project(const NamesWithAliases & projection)
{
std::unordered_map<std::string_view, const Node *> names_map;
for (const auto * node : index)
names_map.emplace(node->result_name, node);
for (const auto * output_node : outputs)
names_map.emplace(output_node->result_name, output_node);
index.clear();
index.reserve(projection.size());
outputs.clear();
size_t projection_size = projection.size();
outputs.reserve(projection_size);
for (const auto & item : projection)
{
@ -750,13 +758,13 @@ void ActionsDAG::project(const NamesWithAliases & projection)
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
index.push_back(it->second);
outputs.push_back(it->second);
}
for (size_t i = 0; i < projection.size(); ++i)
for (size_t i = 0; i < projection_size; ++i)
{
const auto & item = projection[i];
auto & child = index[i];
auto & child = outputs[i];
if (!item.second.empty() && item.first != item.second)
{
@ -778,8 +786,8 @@ void ActionsDAG::project(const NamesWithAliases & projection)
bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
{
for (const auto * node : index)
if (node->result_name == column_name)
for (const auto * output_node : outputs)
if (output_node->result_name == column_name)
return true;
for (auto it = nodes.rbegin(); it != nodes.rend(); ++it)
@ -787,7 +795,7 @@ bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
auto & node = *it;
if (node.result_name == column_name)
{
index.push_back(&node);
outputs.push_back(&node);
return true;
}
}
@ -797,19 +805,19 @@ bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
bool ActionsDAG::removeUnusedResult(const std::string & column_name)
{
/// Find column in index and remove.
/// Find column in output nodes and remove.
const Node * col;
{
auto it = index.begin();
for (; it != index.end(); ++it)
auto it = outputs.begin();
for (; it != outputs.end(); ++it)
if ((*it)->result_name == column_name)
break;
if (it == index.end())
if (it == outputs.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found result {} in ActionsDAG\n{}", column_name, dumpDAG());
col = *it;
index.erase(it);
outputs.erase(it);
}
/// Check if column is in input.
@ -827,9 +835,9 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
if (col == child)
return false;
/// Do not remove input if it was mentioned in index several times.
for (const auto * node : index)
if (col == node)
/// Do not remove input if it was mentioned in output nodes several times.
for (const auto * output_node : outputs)
if (col == output_node)
return false;
/// Remove from nodes and inputs.
@ -864,11 +872,11 @@ ActionsDAGPtr ActionsDAG::clone() const
for (auto & child : node.children)
child = copy_map[child];
for (const auto & node : index)
actions->index.push_back(copy_map[node]);
for (const auto & output_node : outputs)
actions->outputs.push_back(copy_map[output_node]);
for (const auto & node : inputs)
actions->inputs.push_back(copy_map[node]);
for (const auto & input_node : inputs)
actions->inputs.push_back(copy_map[input_node]);
return actions;
}
@ -939,8 +947,8 @@ std::string ActionsDAG::dumpDAG() const
out << "\n";
}
out << "Index:";
for (const auto * node : index)
out << "Output nodes:";
for (const auto * node : outputs)
out << ' ' << map[node];
out << '\n';
@ -984,8 +992,8 @@ void ActionsDAG::assertDeterministic() const
/// Replace every entry of the output list, in place, with the node returned by
/// materializeNode() for that entry. The number and order of outputs do not change.
void ActionsDAG::addMaterializingOutputActions()
{
for (auto & node : index)
node = &materializeNode(*node);
for (auto & output_node : outputs)
output_node = &materializeNode(*output_node);
}
const ActionsDAG::Node & ActionsDAG::materializeNode(const Node & node)
@ -1023,7 +1031,8 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
std::map<std::string_view, std::list<size_t>> inputs;
if (mode == MatchColumnsMode::Name)
{
for (size_t pos = 0; pos < actions_dag->inputs.size(); ++pos)
size_t input_nodes_size = actions_dag->inputs.size();
for (size_t pos = 0; pos < input_nodes_size; ++pos)
inputs[actions_dag->inputs[pos]->result_name].push_back(pos);
}
@ -1134,7 +1143,7 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
}
}
actions_dag->index.swap(projection);
actions_dag->outputs.swap(projection);
actions_dag->removeUnusedActions();
actions_dag->projectInput();
@ -1153,7 +1162,7 @@ ActionsDAGPtr ActionsDAG::makeAddingColumnActions(ColumnWithTypeAndName column)
const auto & function_node = adding_column_action->addFunction(func_builder_materialize, std::move(inputs), {});
const auto & alias_node = adding_column_action->addAlias(function_node, std::move(column_name));
adding_column_action->index.push_back(&alias_node);
adding_column_action->outputs.push_back(&alias_node);
return adding_column_action;
}
@ -1165,7 +1174,7 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
/// Will store merged result in `first`.
/// This map contains nodes which should be removed from `first` index, cause they are used as inputs for `second`.
/// This map contains nodes which should be removed from `first` outputs, because they are used as inputs for `second`.
/// The second element is the number of removals (because one node may be repeated several times in result).
std::unordered_map<const Node *, size_t> removed_first_result;
/// Map inputs of `second` to nodes of `first`.
@ -1173,25 +1182,25 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
/// Update inputs list.
{
/// Index may have multiple columns with same name. They also may be used by `second`. Order is important.
/// Outputs may have multiple columns with same name. They also may be used by `second`. Order is important.
std::unordered_map<std::string_view, std::list<const Node *>> first_result;
for (const auto & node : first.index)
first_result[node->result_name].push_back(node);
for (const auto & output_node : first.outputs)
first_result[output_node->result_name].push_back(output_node);
for (const auto & node : second.inputs)
for (const auto & input_node : second.inputs)
{
auto it = first_result.find(node->result_name);
auto it = first_result.find(input_node->result_name);
if (it == first_result.end() || it->second.empty())
{
if (first.project_input)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find column {} in ActionsDAG result", node->result_name);
"Cannot find column {} in ActionsDAG result", input_node->result_name);
first.inputs.push_back(node);
first.inputs.push_back(input_node);
}
else
{
inputs_map[node] = it->second.front();
inputs_map[input_node] = it->second.front();
removed_first_result[it->second.front()] += 1;
it->second.pop_front();
}
@ -1212,35 +1221,35 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
}
}
for (auto & node : second.index)
for (auto & output_node : second.outputs)
{
if (node->type == ActionType::INPUT)
if (output_node->type == ActionType::INPUT)
{
auto it = inputs_map.find(node);
auto it = inputs_map.find(output_node);
if (it != inputs_map.end())
node = it->second;
output_node = it->second;
}
}
/// Update index.
/// Update output nodes.
if (second.project_input)
{
first.index.swap(second.index);
first.outputs.swap(second.outputs);
first.project_input = true;
}
else
{
/// Add not removed result from first actions.
for (const auto * node : first.index)
for (const auto * output_node : first.outputs)
{
auto it = removed_first_result.find(node);
auto it = removed_first_result.find(output_node);
if (it != removed_first_result.end() && it->second > 0)
--it->second;
else
second.index.push_back(node);
second.outputs.push_back(output_node);
}
first.index.swap(second.index);
first.outputs.swap(second.outputs);
}
first.nodes.splice(first.nodes.end(), std::move(second.nodes));
@ -1256,12 +1265,13 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes) const
{
/// Split DAG into two parts.
/// (first_nodes, first_index) is a part which will have split_list in result.
/// (second_nodes, second_index) is a part which will have same index as current actions.
Nodes second_nodes;
/// (first_nodes, first_outputs) is a part which will have split_list in result.
/// (second_nodes, second_outputs) is a part which will have same outputs as current actions.
Nodes first_nodes;
NodeRawConstPtrs second_index;
NodeRawConstPtrs first_index;
NodeRawConstPtrs first_outputs;
Nodes second_nodes;
NodeRawConstPtrs second_outputs;
/// List of nodes from current actions which are not inputs, but will be in second part.
NodeRawConstPtrs new_inputs;
@ -1287,8 +1297,8 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
std::stack<Frame> stack;
std::unordered_map<const Node *, Data> data;
for (const auto & node : index)
data[node].used_in_result = true;
for (const auto & output_node : outputs)
data[output_node].used_in_result = true;
/// DFS. Decide if node is needed by split.
for (const auto & node : nodes)
@ -1422,15 +1432,15 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
}
}
for (const auto * node : index)
second_index.push_back(data[node].to_second);
for (const auto * output_node : outputs)
second_outputs.push_back(data[output_node].to_second);
NodeRawConstPtrs second_inputs;
NodeRawConstPtrs first_inputs;
for (const auto * input : inputs)
for (const auto * input_node : inputs)
{
const auto & cur = data[input];
const auto & cur = data[input_node];
first_inputs.push_back(cur.to_first);
}
@ -1438,17 +1448,17 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
{
const auto & cur = data[input];
second_inputs.push_back(cur.to_second);
first_index.push_back(cur.to_first);
first_outputs.push_back(cur.to_first);
}
auto first_actions = std::make_shared<ActionsDAG>();
first_actions->nodes.swap(first_nodes);
first_actions->index.swap(first_index);
first_actions->outputs.swap(first_outputs);
first_actions->inputs.swap(first_inputs);
auto second_actions = std::make_shared<ActionsDAG>();
second_actions->nodes.swap(second_nodes);
second_actions->index.swap(second_index);
second_actions->outputs.swap(second_outputs);
second_actions->inputs.swap(second_inputs);
return {std::move(first_actions), std::move(second_actions)};
@ -1524,11 +1534,13 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameS
{
std::unordered_set<const Node *> split_nodes;
for (const auto & sort_column : sort_columns)
if (const auto * node = tryFindInIndex(sort_column))
if (const auto * node = tryFindInOutputs(sort_column))
split_nodes.insert(node);
else
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Sorting column {} wasn't found in the ActionsDAG's index. DAG:\n{}", sort_column, dumpDAG());
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Sorting column {} wasn't found in the ActionsDAG's outputs. DAG:\n{}",
sort_column,
dumpDAG());
auto res = split(split_nodes);
res.second->project_input = project_input;
@ -1537,11 +1549,12 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameS
ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
{
const auto * node = tryFindInIndex(column_name);
const auto * node = tryFindInOutputs(column_name);
if (!node)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
column_name, dumpDAG());
"Outputs for ActionsDAG does not contain filter column name {}. DAG:\n{}",
column_name,
dumpDAG());
std::unordered_set<const Node *> split_nodes = {node};
auto res = split(split_nodes);
@ -1689,7 +1702,7 @@ ColumnsWithTypeAndName prepareFunctionArguments(const ActionsDAG::NodeRawConstPt
/// Create actions which calculate conjunction of selected nodes.
/// Assume conjunction nodes are predicates (and may be used as arguments of function AND).
///
/// Result actions add single column with conjunction result (it is always first in index).
/// Result actions add single column with conjunction result (it is always first in outputs).
/// No other columns are added or removed.
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs)
{
@ -1763,7 +1776,7 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
result_predicate = &actions->addFunction(func_builder_and, std::move(args), {});
}
actions->index.push_back(result_predicate);
actions->outputs.push_back(result_predicate);
for (const auto & col : all_inputs)
{
@ -1778,9 +1791,9 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
actions->inputs.push_back(input);
}
/// We should not add result_predicate into the index for the second time.
/// We should not add result_predicate into the outputs for the second time.
if (input->result_name != result_predicate->result_name)
actions->index.push_back(input);
actions->outputs.push_back(input);
}
return actions;
@ -1792,10 +1805,12 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
const Names & available_inputs,
const ColumnsWithTypeAndName & all_inputs)
{
Node * predicate = const_cast<Node *>(tryFindInIndex(filter_name));
Node * predicate = const_cast<Node *>(tryFindInOutputs(filter_name));
if (!predicate)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Index for ActionsDAG does not contain filter column name {}. DAG:\n{}", filter_name, dumpDAG());
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Output nodes for ActionsDAG do not contain filter column name {}. DAG:\n{}",
filter_name,
dumpDAG());
/// If condition is constant let's do nothing.
/// It means there is nothing to push down or optimization was already applied.
@ -1807,8 +1822,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
/// Get input nodes from available_inputs names.
{
std::unordered_map<std::string_view, std::list<const Node *>> inputs_map;
for (const auto & input : inputs)
inputs_map[input->result_name].emplace_back(input);
for (const auto & input_node : inputs)
inputs_map[input_node->result_name].emplace_back(input_node);
for (const auto & name : available_inputs)
{
@ -1833,8 +1848,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
/// The whole predicate was split.
if (can_remove_filter)
{
/// If filter column is not needed, remove it from index.
std::erase_if(index, [&](const Node * node) { return node == predicate; });
/// If filter column is not needed, remove it from output nodes.
std::erase_if(outputs, [&](const Node * node) { return node == predicate; });
/// At the very end of this method we'll call removeUnusedActions() with allow_remove_inputs=false,
/// so we need to manually remove predicate if it is an input node.
@ -1859,11 +1874,11 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
{
/// Special case. We cannot replace input to constant inplace.
/// Because we cannot affect inputs list for actions.
/// So we just add a new constant and update index.
/// So we just add a new constant and update outputs.
const auto * new_predicate = &addNode(node);
for (auto & index_node : index)
if (index_node == predicate)
index_node = new_predicate;
for (auto & output_node : outputs)
if (output_node == predicate)
output_node = new_predicate;
}
}
}

View File

@ -96,8 +96,8 @@ public:
private:
Nodes nodes;
NodeRawConstPtrs index;
NodeRawConstPtrs inputs;
NodeRawConstPtrs outputs;
bool project_input = false;
bool projected_output = false;
@ -111,7 +111,12 @@ public:
explicit ActionsDAG(const ColumnsWithTypeAndName & inputs_);
const Nodes & getNodes() const { return nodes; }
const NodeRawConstPtrs & getIndex() const { return index; }
const NodeRawConstPtrs & getOutputs() const { return outputs; }
/** Output nodes can contain any column returned from DAG.
* You may manually change it if needed.
*/
NodeRawConstPtrs & getOutputs() { return outputs; }
const NodeRawConstPtrs & getInputs() const { return inputs; }
NamesAndTypesList getRequiredColumns() const;
@ -133,25 +138,26 @@ public:
NodeRawConstPtrs children,
std::string result_name);
/// Index can contain any column returned from DAG.
/// You may manually change it if needed.
NodeRawConstPtrs & getIndex() { return index; }
/// Find first column by name in index. This search is linear.
const Node & findInIndex(const std::string & name) const;
/// Find first column by name in output nodes. This search is linear.
const Node & findInOutputs(const std::string & name) const;
/// Same, but return nullptr if node not found.
const Node * tryFindInIndex(const std::string & name) const;
/// Find first node with the same name in index and replace it.
/// If was not found, add node to index end.
void addOrReplaceInIndex(const Node & node);
const Node * tryFindInOutputs(const std::string & name) const;
/// Find first node with the same name in output nodes and replace it.
/// If was not found, add node to outputs end.
void addOrReplaceInOutputs(const Node & node);
/// Call addAlias several times.
void addAliases(const NamesWithAliases & aliases);
/// Add alias actions and remove unused columns from index. Also specify result columns order in index.
/// Add alias actions and remove unused columns from outputs. Also specify result columns order in outputs.
void project(const NamesWithAliases & projection);
/// If column is not in index, try to find it in nodes and insert back into index.
/// If column is not in outputs, try to find it in nodes and insert back into outputs.
bool tryRestoreColumn(const std::string & column_name);
/// Find column in result. Remove it from index.
/// Find column in result. Remove it from outputs.
/// If column is in inputs and has no dependent nodes, remove it from inputs too.
/// Return true if column was removed from inputs.
bool removeUnusedResult(const std::string & column_name);
@ -160,7 +166,13 @@ public:
bool isInputProjected() const { return project_input; }
bool isOutputProjected() const { return projected_output; }
/// Remove actions that are not needed to compute output nodes
void removeUnusedActions(bool allow_remove_inputs = true, bool allow_constant_folding = true);
/// Remove actions that are not needed to compute output nodes with required names
void removeUnusedActions(const Names & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
/// Remove actions that are not needed to compute output nodes with required names
void removeUnusedActions(const NameSet & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
/// Transform the current DAG in a way that leaf nodes get folded into their parents. It's done
@ -196,10 +208,10 @@ public:
const String & predicate_column_name = {},
bool add_missing_keys = true);
/// Reorder the index nodes using given position mapping.
/// Reorder the output nodes using given position mapping.
void reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map);
/// Add aggregate columns to index nodes from projection
/// Add aggregate columns to output nodes from projection
void addAggregatesViaProjection(const Block & aggregates);
bool hasArrayJoin() const;
@ -263,7 +275,7 @@ public:
/// Split ActionsDAG into two DAGs, where first part contains all nodes from split_nodes and their children.
/// Execution of first then second parts on block is equivalent to execution of initial DAG.
/// First DAG and initial DAG have equal inputs, second DAG and initial DAG has equal index (outputs).
/// First DAG and initial DAG have equal inputs, second DAG and initial DAG have equal outputs.
/// Second DAG inputs may contain fewer inputs than first DAG (but also include other columns).
SplitResult split(std::unordered_set<const Node *> split_nodes) const;
@ -271,7 +283,7 @@ public:
SplitResult splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const;
/// Splits actions into two parts. First part has minimal size sufficient for calculation of column_name.
/// Index of initial actions must contain column_name.
/// Outputs of initial actions must contain column_name.
SplitResult splitActionsForFilter(const std::string & column_name) const;
/// Splits actions into two parts. The first part contains all the calculations required to calculate sort_columns.
@ -304,8 +316,6 @@ public:
private:
Node & addNode(Node node);
void removeUnusedActions(bool allow_remove_inputs = true, bool allow_constant_folding = true);
#if USE_EMBEDDED_COMPILER
void compileFunctions(size_t min_count_to_compile_expression, const std::unordered_set<const Node *> & lazy_executed_nodes = {});
#endif

View File

@ -389,7 +389,7 @@ SetPtr makeExplicitSet(
const ASTPtr & right_arg = args.children.at(1);
auto column_name = left_arg->getColumnName();
const auto & dag_node = actions.findInIndex(column_name);
const auto & dag_node = actions.findInOutputs(column_name);
const DataTypePtr & left_arg_type = dag_node.result_type;
DataTypes set_element_types = {left_arg_type};
@ -507,7 +507,7 @@ ActionsMatcher::Data::Data(
, actions_stack(std::move(actions_dag), context_)
, aggregation_keys_info(aggregation_keys_info_)
, build_expression_with_window_functions(build_expression_with_window_functions_)
, next_unique_suffix(actions_stack.getLastActions().getIndex().size() + 1)
, next_unique_suffix(actions_stack.getLastActions().getOutputs().size() + 1)
{
}
@ -526,9 +526,9 @@ ScopeStack::ScopeStack(ActionsDAGPtr actions_dag, ContextPtr context_) : WithCon
{
auto & level = stack.emplace_back();
level.actions_dag = std::move(actions_dag);
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getIndex());
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getOutputs());
for (const auto & node : level.actions_dag->getIndex())
for (const auto & node : level.actions_dag->getOutputs())
if (node->type == ActionsDAG::ActionType::INPUT)
level.inputs.emplace(node->result_name);
}
@ -537,7 +537,7 @@ void ScopeStack::pushLevel(const NamesAndTypesList & input_columns)
{
auto & level = stack.emplace_back();
level.actions_dag = std::make_shared<ActionsDAG>();
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getIndex());
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getOutputs());
const auto & prev = stack[stack.size() - 2];
for (const auto & input_column : input_columns)
@ -547,7 +547,7 @@ void ScopeStack::pushLevel(const NamesAndTypesList & input_columns)
level.inputs.emplace(input_column.name);
}
for (const auto & node : prev.actions_dag->getIndex())
for (const auto & node : prev.actions_dag->getOutputs())
{
if (!level.index->contains(node->result_name))
{

View File

@ -301,7 +301,7 @@ static std::unordered_set<const ActionsDAG::Node *> processShortCircuitFunctions
if (short_circuit_nodes.empty())
return {};
auto reverse_info = getActionsDAGReverseInfo(nodes, actions_dag.getIndex());
auto reverse_info = getActionsDAGReverseInfo(nodes, actions_dag.getOutputs());
/// For each node we fill LazyExecutionInfo.
std::unordered_map<const ActionsDAG::Node *, LazyExecutionInfo> lazy_execution_infos;
@ -335,10 +335,10 @@ void ExpressionActions::linearizeActions(const std::unordered_set<const ActionsD
};
const auto & nodes = getNodes();
const auto & index = actions_dag->getIndex();
const auto & outputs = actions_dag->getOutputs();
const auto & inputs = actions_dag->getInputs();
auto reverse_info = getActionsDAGReverseInfo(nodes, index);
auto reverse_info = getActionsDAGReverseInfo(nodes, outputs);
std::vector<Data> data;
for (const auto & node : nodes)
data.push_back({.node = &node});
@ -428,9 +428,9 @@ void ExpressionActions::linearizeActions(const std::unordered_set<const ActionsD
}
}
result_positions.reserve(index.size());
result_positions.reserve(outputs.size());
for (const auto & node : index)
for (const auto & node : outputs)
{
auto pos = data[reverse_info.reverse_index[node]].position;

View File

@ -304,7 +304,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
ssize_t group_size = group_elements_ast.size();
const auto & column_name = group_elements_ast[j]->getColumnName();
const auto * node = temp_actions->tryFindInIndex(column_name);
const auto * node = temp_actions->tryFindInOutputs(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
@ -358,7 +358,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
getRootActionsNoMakeSet(group_asts[i], temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
const auto * node = temp_actions->tryFindInIndex(column_name);
const auto * node = temp_actions->tryFindInOutputs(column_name);
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
@ -524,7 +524,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
auto temp_actions = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(left_in_operand, true, temp_actions);
if (temp_actions->tryFindInIndex(left_in_operand->getColumnName()))
if (temp_actions->tryFindInOutputs(left_in_operand->getColumnName()))
makeExplicitSet(func, *temp_actions, true, getContext(), settings.size_limits_for_set, prepared_sets);
}
}
@ -634,7 +634,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
for (size_t i = 0; i < arguments.size(); ++i)
{
const std::string & name = arguments[i]->getColumnName();
const auto * dag_node = actions->tryFindInIndex(name);
const auto * dag_node = actions->tryFindInOutputs(name);
if (!dag_node)
{
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
@ -841,7 +841,7 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
for (size_t i = 0; i < arguments.size(); ++i)
{
const std::string & name = arguments[i]->getColumnName();
const auto * node = actions->tryFindInIndex(name);
const auto * node = actions->tryFindInOutputs(name);
if (!node)
{
@ -937,8 +937,8 @@ ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ActionsDAGPtr
/// Assign new names to columns, if needed.
if (result_source.first != result_source.second)
{
const auto & node = actions->findInIndex(result_source.second);
actions->getIndex().push_back(&actions->addAlias(node, result_source.first));
const auto & node = actions->findInOutputs(result_source.second);
actions->getOutputs().push_back(&actions->addAlias(node, result_source.first));
}
/// Make ARRAY JOIN (replace arrays with their insides) for the columns in these new names.
@ -1097,7 +1097,7 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
{
auto pos = original_right_columns.getPositionByName(name_with_alias.first);
const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], name_with_alias.second);
rename_dag->getIndex()[pos] = &alias;
rename_dag->getOutputs()[pos] = &alias;
}
}
@ -1212,7 +1212,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
String prewhere_column_name = select_query->prewhere()->getColumnName();
step.addRequiredOutput(prewhere_column_name);
const auto & node = step.actions()->findInIndex(prewhere_column_name);
const auto & node = step.actions()->findInOutputs(prewhere_column_name);
auto filter_type = node.result_type;
if (!filter_type->canBeUsedInBooleanContext())
throw Exception("Invalid type for filter in PREWHERE: " + filter_type->getName(),
@ -1295,7 +1295,7 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain,
auto where_column_name = select_query->where()->getColumnName();
step.addRequiredOutput(where_column_name);
const auto & node = step.actions()->findInIndex(where_column_name);
const auto & node = step.actions()->findInOutputs(where_column_name);
auto filter_type = node.result_type;
if (!filter_type->canBeUsedInBooleanContext())
throw Exception("Invalid type for filter in WHERE: " + filter_type->getName(),

View File

@ -551,10 +551,10 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression, const
node_to_data[child].all_parents_compilable &= node_is_valid_for_compilation;
}
for (const auto & node : index)
for (const auto & output_node : outputs)
{
/// Force result nodes to compile
node_to_data[node].all_parents_compilable = false;
/// Force output nodes to compile
node_to_data[output_node].all_parents_compilable = false;
}
std::vector<Node *> nodes_to_compile;

View File

@ -106,14 +106,14 @@ static ActionsDAGPtr makeAdditionalPostFilter(ASTPtr & ast, ContextPtr context,
auto syntax_result = TreeRewriter(context).analyze(ast, header.getNamesAndTypesList());
String result_column_name = ast->getColumnName();
auto dag = ExpressionAnalyzer(ast, syntax_result, context).getActionsDAG(false, false);
const ActionsDAG::Node * result_node = &dag->findInIndex(result_column_name);
auto & index = dag->getIndex();
index.clear();
index.reserve(dag->getInputs().size() + 1);
const ActionsDAG::Node * result_node = &dag->findInOutputs(result_column_name);
auto & outputs = dag->getOutputs();
outputs.clear();
outputs.reserve(dag->getInputs().size() + 1);
for (const auto * node : dag->getInputs())
index.push_back(node);
outputs.push_back(node);
index.push_back(result_node);
outputs.push_back(result_node);
return dag;
}
@ -128,7 +128,7 @@ void IInterpreterUnionOrSelectQuery::addAdditionalPostFilter(QueryPlan & plan) c
return;
auto dag = makeAdditionalPostFilter(ast, context, plan.getCurrentDataStream().header);
std::string filter_name = dag->getIndex().back()->result_name;
std::string filter_name = dag->getOutputs().back()->result_name;
auto filter_step = std::make_unique<FilterStep>(
plan.getCurrentDataStream(), std::move(dag), std::move(filter_name), true);
filter_step->setStepDescription("Additional result filter");

View File

@ -162,7 +162,7 @@ FilterDAGInfoPtr generateFilterActions(
filter_info->actions->projectInput(false);
for (const auto * node : filter_info->actions->getInputs())
filter_info->actions->getIndex().push_back(node);
filter_info->actions->getOutputs().push_back(node);
auto required_columns_from_filter = filter_info->actions->getRequiredColumns();

View File

@ -859,9 +859,9 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
for (const auto & kv : stage.column_to_updated)
{
auto column_name = kv.second->getColumnName();
const auto & dag_node = actions->findInIndex(column_name);
const auto & dag_node = actions->findInOutputs(column_name);
const auto & alias = actions->addAlias(dag_node, kv.first);
actions->addOrReplaceInIndex(alias);
actions->addOrReplaceInOutputs(alias);
}
}

View File

@ -23,7 +23,7 @@ ActionsDAGPtr addMissingDefaults(
bool null_as_default)
{
auto actions = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
auto & index = actions->getIndex();
auto & index = actions->getOutputs();
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
/// First, remember the offset columns for all arrays in the block.

View File

@ -239,15 +239,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs index;
index.reserve(output_header.columns() + 1);
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag->addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
index.push_back(grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
@ -264,19 +264,19 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag->materializeNode(*node);
index.push_back(node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)];
const auto * column_node = dag->getOutputs()[header.getPositionByName(col.name)];
if (group_by_use_nulls && column_node->result_type->canBeInsideNullable())
index.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name));
outputs.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name));
else
index.push_back(column_node);
outputs.push_back(column_node);
}
}
dag->getIndex().swap(index);
dag->getOutputs().swap(outputs);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);

View File

@ -40,17 +40,17 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_,
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
{
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
auto & index = dag->getIndex();
auto & outputs = dag->getOutputs();
if (use_nulls)
{
auto to_nullable = FunctionFactory::instance().get("toNullable", nullptr);
for (const auto & key : keys)
{
const auto * node = dag->getIndex()[header.getPositionByName(key)];
const auto * node = dag->getOutputs()[header.getPositionByName(key)];
if (node->result_type->canBeInsideNullable())
{
dag->addOrReplaceInIndex(dag->addFunction(to_nullable, { node }, node->result_name));
dag->addOrReplaceInOutputs(dag->addFunction(to_nullable, { node }, node->result_name));
}
}
}
@ -60,7 +60,7 @@ ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, b
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
index.insert(index.begin(), grouping_node);
outputs.insert(outputs.begin(), grouping_node);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
return std::make_shared<ExpressionTransform>(header, expression);

View File

@ -84,7 +84,7 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
const auto & expression = filter->getExpression();
const auto & filter_column_name = filter->getFilterColumnName();
const auto * filter_node = expression->tryFindInIndex(filter_column_name);
const auto * filter_node = expression->tryFindInOutputs(filter_column_name);
if (!filter_node && !filter->removesFilterColumn())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}",
@ -102,7 +102,8 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is the first one.
String split_filter_column_name = split_filter->getIndex().front()->result_name;
String split_filter_column_name = split_filter->getOutputs().front()->result_name;
node.step = std::make_unique<FilterStep>(
node.children.at(0)->step->getOutputStream(), split_filter, std::move(split_filter_column_name), can_remove_filter);