mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #39998 from kitaisreal/actions-dag-refactoring
ActionsDAG rename index to outputs
This commit is contained in:
commit
aff8149f5c
@ -57,7 +57,7 @@ void ActionsDAG::Node::toTree(JSONBuilder::JSONMap & map) const
|
||||
ActionsDAG::ActionsDAG(const NamesAndTypesList & inputs_)
|
||||
{
|
||||
for (const auto & input : inputs_)
|
||||
index.push_back(&addInput(input.name, input.type));
|
||||
outputs.push_back(&addInput(input.name, input.type));
|
||||
}
|
||||
|
||||
ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
|
||||
@ -74,10 +74,10 @@ ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
|
||||
/// without any respect to header structure. So, it is a way to drop materialized column and use
|
||||
/// constant value from header.
|
||||
/// We cannot remove such input right now cause inputs positions are important in some cases.
|
||||
index.push_back(&addColumn(input));
|
||||
outputs.push_back(&addColumn(input));
|
||||
}
|
||||
else
|
||||
index.push_back(&addInput(input.name, input.type));
|
||||
outputs.push_back(&addInput(input.name, input.type));
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,42 +237,42 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
|
||||
return addNode(std::move(node));
|
||||
}
|
||||
|
||||
const ActionsDAG::Node & ActionsDAG::findInIndex(const std::string & name) const
|
||||
const ActionsDAG::Node & ActionsDAG::findInOutputs(const std::string & name) const
|
||||
{
|
||||
if (const auto * node = tryFindInIndex(name))
|
||||
if (const auto * node = tryFindInOutputs(name))
|
||||
return *node;
|
||||
|
||||
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown identifier: '{}'", name);
|
||||
}
|
||||
|
||||
const ActionsDAG::Node * ActionsDAG::tryFindInIndex(const std::string & name) const
|
||||
const ActionsDAG::Node * ActionsDAG::tryFindInOutputs(const std::string & name) const
|
||||
{
|
||||
for (const auto & node : index)
|
||||
for (const auto & node : outputs)
|
||||
if (node->result_name == name)
|
||||
return node;
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void ActionsDAG::addOrReplaceInIndex(const Node & node)
|
||||
void ActionsDAG::addOrReplaceInOutputs(const Node & node)
|
||||
{
|
||||
for (auto & index_node : index)
|
||||
for (auto & output_node : outputs)
|
||||
{
|
||||
if (index_node->result_name == node.result_name)
|
||||
if (output_node->result_name == node.result_name)
|
||||
{
|
||||
index_node = &node;
|
||||
output_node = &node;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
index.push_back(&node);
|
||||
outputs.push_back(&node);
|
||||
}
|
||||
|
||||
NamesAndTypesList ActionsDAG::getRequiredColumns() const
|
||||
{
|
||||
NamesAndTypesList result;
|
||||
for (const auto & input : inputs)
|
||||
result.emplace_back(input->result_name, input->result_type);
|
||||
for (const auto & input_node : inputs)
|
||||
result.emplace_back(input_node->result_name, input_node->result_type);
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -282,8 +282,8 @@ Names ActionsDAG::getRequiredColumnsNames() const
|
||||
Names result;
|
||||
result.reserve(inputs.size());
|
||||
|
||||
for (const auto & input : inputs)
|
||||
result.emplace_back(input->result_name);
|
||||
for (const auto & input_node : inputs)
|
||||
result.emplace_back(input_node->result_name);
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -291,8 +291,9 @@ Names ActionsDAG::getRequiredColumnsNames() const
|
||||
ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
|
||||
{
|
||||
ColumnsWithTypeAndName result;
|
||||
result.reserve(index.size());
|
||||
for (const auto & node : index)
|
||||
result.reserve(outputs.size());
|
||||
|
||||
for (const auto & node : outputs)
|
||||
result.emplace_back(node->column, node->result_type, node->result_name);
|
||||
|
||||
return result;
|
||||
@ -301,7 +302,7 @@ ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
|
||||
NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
|
||||
{
|
||||
NamesAndTypesList result;
|
||||
for (const auto & node : index)
|
||||
for (const auto & node : outputs)
|
||||
result.emplace_back(node->result_name, node->result_type);
|
||||
|
||||
return result;
|
||||
@ -310,8 +311,9 @@ NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
|
||||
Names ActionsDAG::getNames() const
|
||||
{
|
||||
Names names;
|
||||
names.reserve(index.size());
|
||||
for (const auto & node : index)
|
||||
names.reserve(outputs.size());
|
||||
|
||||
for (const auto & node : outputs)
|
||||
names.emplace_back(node->result_name);
|
||||
|
||||
return names;
|
||||
@ -335,7 +337,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
|
||||
required_nodes.reserve(required_names.size());
|
||||
|
||||
NameSet added;
|
||||
for (const auto & node : index)
|
||||
for (const auto & node : outputs)
|
||||
{
|
||||
if (required_names.contains(node->result_name) && !added.contains(node->result_name))
|
||||
{
|
||||
@ -352,7 +354,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
|
||||
"Unknown column: {}, there are only columns {}", name, dumpNames());
|
||||
}
|
||||
|
||||
index.swap(required_nodes);
|
||||
outputs.swap(required_nodes);
|
||||
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
|
||||
}
|
||||
|
||||
@ -362,7 +364,7 @@ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_re
|
||||
required_nodes.reserve(required_names.size());
|
||||
|
||||
std::unordered_map<std::string_view, const Node *> names_map;
|
||||
for (const auto * node : index)
|
||||
for (const auto * node : outputs)
|
||||
names_map[node->result_name] = node;
|
||||
|
||||
for (const auto & name : required_names)
|
||||
@ -375,7 +377,7 @@ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_re
|
||||
required_nodes.push_back(it->second);
|
||||
}
|
||||
|
||||
index.swap(required_nodes);
|
||||
outputs.swap(required_nodes);
|
||||
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
|
||||
}
|
||||
|
||||
@ -384,7 +386,7 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_consta
|
||||
std::unordered_set<const Node *> visited_nodes;
|
||||
std::stack<Node *> stack;
|
||||
|
||||
for (const auto * node : index)
|
||||
for (const auto * node : outputs)
|
||||
{
|
||||
visited_nodes.insert(node);
|
||||
stack.push(const_cast<Node *>(node));
|
||||
@ -516,7 +518,7 @@ Block ActionsDAG::updateHeader(Block header) const
|
||||
}
|
||||
|
||||
ColumnsWithTypeAndName result_columns;
|
||||
result_columns.reserve(index.size());
|
||||
result_columns.reserve(outputs.size());
|
||||
|
||||
struct Frame
|
||||
{
|
||||
@ -525,12 +527,12 @@ Block ActionsDAG::updateHeader(Block header) const
|
||||
};
|
||||
|
||||
{
|
||||
for (const auto * output : index)
|
||||
for (const auto * output_node : outputs)
|
||||
{
|
||||
if (!node_to_column.contains(output))
|
||||
if (!node_to_column.contains(output_node))
|
||||
{
|
||||
std::stack<Frame> stack;
|
||||
stack.push({.node = output});
|
||||
stack.push({.node = output_node});
|
||||
|
||||
while (!stack.empty())
|
||||
{
|
||||
@ -567,8 +569,8 @@ Block ActionsDAG::updateHeader(Block header) const
|
||||
}
|
||||
}
|
||||
|
||||
if (node_to_column[output].column)
|
||||
result_columns.push_back(node_to_column[output]);
|
||||
if (node_to_column[output_node].column)
|
||||
result_columns.push_back(node_to_column[output_node]);
|
||||
}
|
||||
}
|
||||
|
||||
@ -592,35 +594,35 @@ NameSet ActionsDAG::foldActionsByProjection(
|
||||
const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name, bool add_missing_keys)
|
||||
{
|
||||
std::unordered_set<const Node *> visited_nodes;
|
||||
std::unordered_set<std::string_view> visited_index_names;
|
||||
std::unordered_set<std::string_view> visited_output_nodes_names;
|
||||
std::stack<Node *> stack;
|
||||
|
||||
/// Record all needed index nodes to start folding.
|
||||
for (const auto & node : index)
|
||||
/// Record all needed output nodes to start folding.
|
||||
for (const auto & output_node : outputs)
|
||||
{
|
||||
if (required_columns.find(node->result_name) != required_columns.end() || node->result_name == predicate_column_name)
|
||||
if (required_columns.find(output_node->result_name) != required_columns.end() || output_node->result_name == predicate_column_name)
|
||||
{
|
||||
visited_nodes.insert(node);
|
||||
visited_index_names.insert(node->result_name);
|
||||
stack.push(const_cast<Node *>(node));
|
||||
visited_nodes.insert(output_node);
|
||||
visited_output_nodes_names.insert(output_node->result_name);
|
||||
stack.push(const_cast<Node *>(output_node));
|
||||
}
|
||||
}
|
||||
|
||||
/// If some required columns are not in any index node, try searching from all projection key
|
||||
/// If some required columns are not in any output node, try searching from all projection key
|
||||
/// columns. If still missing, return empty set which means current projection fails to match
|
||||
/// (missing columns).
|
||||
if (add_missing_keys)
|
||||
{
|
||||
for (const auto & column : required_columns)
|
||||
{
|
||||
if (visited_index_names.find(column) == visited_index_names.end())
|
||||
if (visited_output_nodes_names.find(column) == visited_output_nodes_names.end())
|
||||
{
|
||||
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column))
|
||||
{
|
||||
const auto * node = &addInput(*column_with_type_name);
|
||||
visited_nodes.insert(node);
|
||||
index.push_back(node);
|
||||
visited_index_names.insert(column);
|
||||
outputs.push_back(node);
|
||||
visited_output_nodes_names.insert(column);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -662,20 +664,20 @@ NameSet ActionsDAG::foldActionsByProjection(
|
||||
|
||||
/// Clean up unused nodes after folding.
|
||||
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
|
||||
std::erase_if(index, [&](const Node * node) { return !visited_index_names.contains(node->result_name); });
|
||||
std::erase_if(outputs, [&](const Node * node) { return !visited_output_nodes_names.contains(node->result_name); });
|
||||
nodes.remove_if([&](const Node & node) { return !visited_nodes.contains(&node); });
|
||||
|
||||
/// Calculate the required columns after folding.
|
||||
NameSet next_required_columns;
|
||||
for (const auto & input : inputs)
|
||||
next_required_columns.insert(input->result_name);
|
||||
for (const auto & input_node : inputs)
|
||||
next_required_columns.insert(input_node->result_name);
|
||||
|
||||
return next_required_columns;
|
||||
}
|
||||
|
||||
void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map)
|
||||
{
|
||||
::sort(index.begin(), index.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
|
||||
::sort(outputs.begin(), outputs.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
|
||||
{
|
||||
return key_names_pos_map.find(lhs->result_name)->second < key_names_pos_map.find(rhs->result_name)->second;
|
||||
});
|
||||
@ -684,17 +686,21 @@ void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<st
|
||||
void ActionsDAG::addAggregatesViaProjection(const Block & aggregates)
|
||||
{
|
||||
for (const auto & aggregate : aggregates)
|
||||
index.push_back(&addInput(aggregate));
|
||||
outputs.push_back(&addInput(aggregate));
|
||||
}
|
||||
|
||||
void ActionsDAG::addAliases(const NamesWithAliases & aliases)
|
||||
{
|
||||
std::unordered_map<std::string_view, size_t> names_map;
|
||||
for (size_t i = 0; i < index.size(); ++i)
|
||||
names_map[index[i]->result_name] = i;
|
||||
size_t output_nodes_size = outputs.size();
|
||||
|
||||
for (size_t i = 0; i < output_nodes_size; ++i)
|
||||
names_map[outputs[i]->result_name] = i;
|
||||
|
||||
size_t aliases_size = aliases.size();
|
||||
|
||||
NodeRawConstPtrs required_nodes;
|
||||
required_nodes.reserve(aliases.size());
|
||||
required_nodes.reserve(aliases_size);
|
||||
|
||||
for (const auto & item : aliases)
|
||||
{
|
||||
@ -703,10 +709,10 @@ void ActionsDAG::addAliases(const NamesWithAliases & aliases)
|
||||
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
|
||||
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
|
||||
|
||||
required_nodes.push_back(index[it->second]);
|
||||
required_nodes.push_back(outputs[it->second]);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < aliases.size(); ++i)
|
||||
for (size_t i = 0; i < aliases_size; ++i)
|
||||
{
|
||||
const auto & item = aliases[i];
|
||||
const auto * child = required_nodes[i];
|
||||
@ -726,22 +732,24 @@ void ActionsDAG::addAliases(const NamesWithAliases & aliases)
|
||||
auto it = names_map.find(child->result_name);
|
||||
if (it == names_map.end())
|
||||
{
|
||||
names_map[child->result_name] = index.size();
|
||||
index.push_back(child);
|
||||
names_map[child->result_name] = outputs.size();
|
||||
outputs.push_back(child);
|
||||
}
|
||||
else
|
||||
index[it->second] = child;
|
||||
outputs[it->second] = child;
|
||||
}
|
||||
}
|
||||
|
||||
void ActionsDAG::project(const NamesWithAliases & projection)
|
||||
{
|
||||
std::unordered_map<std::string_view, const Node *> names_map;
|
||||
for (const auto * node : index)
|
||||
names_map.emplace(node->result_name, node);
|
||||
for (const auto * output_node : outputs)
|
||||
names_map.emplace(output_node->result_name, output_node);
|
||||
|
||||
index.clear();
|
||||
index.reserve(projection.size());
|
||||
outputs.clear();
|
||||
|
||||
size_t projection_size = projection.size();
|
||||
outputs.reserve(projection_size);
|
||||
|
||||
for (const auto & item : projection)
|
||||
{
|
||||
@ -750,13 +758,13 @@ void ActionsDAG::project(const NamesWithAliases & projection)
|
||||
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
|
||||
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
|
||||
|
||||
index.push_back(it->second);
|
||||
outputs.push_back(it->second);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < projection.size(); ++i)
|
||||
for (size_t i = 0; i < projection_size; ++i)
|
||||
{
|
||||
const auto & item = projection[i];
|
||||
auto & child = index[i];
|
||||
auto & child = outputs[i];
|
||||
|
||||
if (!item.second.empty() && item.first != item.second)
|
||||
{
|
||||
@ -778,8 +786,8 @@ void ActionsDAG::project(const NamesWithAliases & projection)
|
||||
|
||||
bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
|
||||
{
|
||||
for (const auto * node : index)
|
||||
if (node->result_name == column_name)
|
||||
for (const auto * output_node : outputs)
|
||||
if (output_node->result_name == column_name)
|
||||
return true;
|
||||
|
||||
for (auto it = nodes.rbegin(); it != nodes.rend(); ++it)
|
||||
@ -787,7 +795,7 @@ bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
|
||||
auto & node = *it;
|
||||
if (node.result_name == column_name)
|
||||
{
|
||||
index.push_back(&node);
|
||||
outputs.push_back(&node);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -797,19 +805,19 @@ bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
|
||||
|
||||
bool ActionsDAG::removeUnusedResult(const std::string & column_name)
|
||||
{
|
||||
/// Find column in index and remove.
|
||||
/// Find column in output nodes and remove.
|
||||
const Node * col;
|
||||
{
|
||||
auto it = index.begin();
|
||||
for (; it != index.end(); ++it)
|
||||
auto it = outputs.begin();
|
||||
for (; it != outputs.end(); ++it)
|
||||
if ((*it)->result_name == column_name)
|
||||
break;
|
||||
|
||||
if (it == index.end())
|
||||
if (it == outputs.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found result {} in ActionsDAG\n{}", column_name, dumpDAG());
|
||||
|
||||
col = *it;
|
||||
index.erase(it);
|
||||
outputs.erase(it);
|
||||
}
|
||||
|
||||
/// Check if column is in input.
|
||||
@ -827,9 +835,9 @@ bool ActionsDAG::removeUnusedResult(const std::string & column_name)
|
||||
if (col == child)
|
||||
return false;
|
||||
|
||||
/// Do not remove input if it was mentioned in index several times.
|
||||
for (const auto * node : index)
|
||||
if (col == node)
|
||||
/// Do not remove input if it was mentioned in output nodes several times.
|
||||
for (const auto * output_node : outputs)
|
||||
if (col == output_node)
|
||||
return false;
|
||||
|
||||
/// Remove from nodes and inputs.
|
||||
@ -864,11 +872,11 @@ ActionsDAGPtr ActionsDAG::clone() const
|
||||
for (auto & child : node.children)
|
||||
child = copy_map[child];
|
||||
|
||||
for (const auto & node : index)
|
||||
actions->index.push_back(copy_map[node]);
|
||||
for (const auto & output_node : outputs)
|
||||
actions->outputs.push_back(copy_map[output_node]);
|
||||
|
||||
for (const auto & node : inputs)
|
||||
actions->inputs.push_back(copy_map[node]);
|
||||
for (const auto & input_node : inputs)
|
||||
actions->inputs.push_back(copy_map[input_node]);
|
||||
|
||||
return actions;
|
||||
}
|
||||
@ -939,8 +947,8 @@ std::string ActionsDAG::dumpDAG() const
|
||||
out << "\n";
|
||||
}
|
||||
|
||||
out << "Index:";
|
||||
for (const auto * node : index)
|
||||
out << "Output nodes:";
|
||||
for (const auto * node : outputs)
|
||||
out << ' ' << map[node];
|
||||
out << '\n';
|
||||
|
||||
@ -984,8 +992,8 @@ void ActionsDAG::assertDeterministic() const
|
||||
|
||||
void ActionsDAG::addMaterializingOutputActions()
|
||||
{
|
||||
for (auto & node : index)
|
||||
node = &materializeNode(*node);
|
||||
for (auto & output_node : outputs)
|
||||
output_node = &materializeNode(*output_node);
|
||||
}
|
||||
|
||||
const ActionsDAG::Node & ActionsDAG::materializeNode(const Node & node)
|
||||
@ -1023,7 +1031,8 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
|
||||
std::map<std::string_view, std::list<size_t>> inputs;
|
||||
if (mode == MatchColumnsMode::Name)
|
||||
{
|
||||
for (size_t pos = 0; pos < actions_dag->inputs.size(); ++pos)
|
||||
size_t input_nodes_size = actions_dag->inputs.size();
|
||||
for (size_t pos = 0; pos < input_nodes_size; ++pos)
|
||||
inputs[actions_dag->inputs[pos]->result_name].push_back(pos);
|
||||
}
|
||||
|
||||
@ -1134,7 +1143,7 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
|
||||
}
|
||||
}
|
||||
|
||||
actions_dag->index.swap(projection);
|
||||
actions_dag->outputs.swap(projection);
|
||||
actions_dag->removeUnusedActions();
|
||||
actions_dag->projectInput();
|
||||
|
||||
@ -1153,7 +1162,7 @@ ActionsDAGPtr ActionsDAG::makeAddingColumnActions(ColumnWithTypeAndName column)
|
||||
const auto & function_node = adding_column_action->addFunction(func_builder_materialize, std::move(inputs), {});
|
||||
const auto & alias_node = adding_column_action->addAlias(function_node, std::move(column_name));
|
||||
|
||||
adding_column_action->index.push_back(&alias_node);
|
||||
adding_column_action->outputs.push_back(&alias_node);
|
||||
return adding_column_action;
|
||||
}
|
||||
|
||||
@ -1165,7 +1174,7 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
|
||||
|
||||
/// Will store merged result in `first`.
|
||||
|
||||
/// This map contains nodes which should be removed from `first` index, cause they are used as inputs for `second`.
|
||||
/// This map contains nodes which should be removed from `first` outputs, cause they are used as inputs for `second`.
|
||||
/// The second element is the number of removes (cause one node may be repeated several times in result).
|
||||
std::unordered_map<const Node *, size_t> removed_first_result;
|
||||
/// Map inputs of `second` to nodes of `first`.
|
||||
@ -1173,25 +1182,25 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
|
||||
|
||||
/// Update inputs list.
|
||||
{
|
||||
/// Index may have multiple columns with same name. They also may be used by `second`. Order is important.
|
||||
/// Outputs may have multiple columns with same name. They also may be used by `second`. Order is important.
|
||||
std::unordered_map<std::string_view, std::list<const Node *>> first_result;
|
||||
for (const auto & node : first.index)
|
||||
first_result[node->result_name].push_back(node);
|
||||
for (const auto & output_node : first.outputs)
|
||||
first_result[output_node->result_name].push_back(output_node);
|
||||
|
||||
for (const auto & node : second.inputs)
|
||||
for (const auto & input_node : second.inputs)
|
||||
{
|
||||
auto it = first_result.find(node->result_name);
|
||||
auto it = first_result.find(input_node->result_name);
|
||||
if (it == first_result.end() || it->second.empty())
|
||||
{
|
||||
if (first.project_input)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Cannot find column {} in ActionsDAG result", node->result_name);
|
||||
"Cannot find column {} in ActionsDAG result", input_node->result_name);
|
||||
|
||||
first.inputs.push_back(node);
|
||||
first.inputs.push_back(input_node);
|
||||
}
|
||||
else
|
||||
{
|
||||
inputs_map[node] = it->second.front();
|
||||
inputs_map[input_node] = it->second.front();
|
||||
removed_first_result[it->second.front()] += 1;
|
||||
it->second.pop_front();
|
||||
}
|
||||
@ -1212,35 +1221,35 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
|
||||
}
|
||||
}
|
||||
|
||||
for (auto & node : second.index)
|
||||
for (auto & output_node : second.outputs)
|
||||
{
|
||||
if (node->type == ActionType::INPUT)
|
||||
if (output_node->type == ActionType::INPUT)
|
||||
{
|
||||
auto it = inputs_map.find(node);
|
||||
auto it = inputs_map.find(output_node);
|
||||
if (it != inputs_map.end())
|
||||
node = it->second;
|
||||
output_node = it->second;
|
||||
}
|
||||
}
|
||||
|
||||
/// Update index.
|
||||
/// Update output nodes.
|
||||
if (second.project_input)
|
||||
{
|
||||
first.index.swap(second.index);
|
||||
first.outputs.swap(second.outputs);
|
||||
first.project_input = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Add not removed result from first actions.
|
||||
for (const auto * node : first.index)
|
||||
for (const auto * output_node : first.outputs)
|
||||
{
|
||||
auto it = removed_first_result.find(node);
|
||||
auto it = removed_first_result.find(output_node);
|
||||
if (it != removed_first_result.end() && it->second > 0)
|
||||
--it->second;
|
||||
else
|
||||
second.index.push_back(node);
|
||||
second.outputs.push_back(output_node);
|
||||
}
|
||||
|
||||
first.index.swap(second.index);
|
||||
first.outputs.swap(second.outputs);
|
||||
}
|
||||
|
||||
first.nodes.splice(first.nodes.end(), std::move(second.nodes));
|
||||
@ -1256,12 +1265,13 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
|
||||
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes) const
|
||||
{
|
||||
/// Split DAG into two parts.
|
||||
/// (first_nodes, first_index) is a part which will have split_list in result.
|
||||
/// (second_nodes, second_index) is a part which will have same index as current actions.
|
||||
Nodes second_nodes;
|
||||
/// (first_nodes, first_outputs) is a part which will have split_list in result.
|
||||
/// (second_nodes, second_outputs) is a part which will have same outputs as current actions.
|
||||
Nodes first_nodes;
|
||||
NodeRawConstPtrs second_index;
|
||||
NodeRawConstPtrs first_index;
|
||||
NodeRawConstPtrs first_outputs;
|
||||
|
||||
Nodes second_nodes;
|
||||
NodeRawConstPtrs second_outputs;
|
||||
|
||||
/// List of nodes from current actions which are not inputs, but will be in second part.
|
||||
NodeRawConstPtrs new_inputs;
|
||||
@ -1287,8 +1297,8 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
|
||||
std::stack<Frame> stack;
|
||||
std::unordered_map<const Node *, Data> data;
|
||||
|
||||
for (const auto & node : index)
|
||||
data[node].used_in_result = true;
|
||||
for (const auto & output_node : outputs)
|
||||
data[output_node].used_in_result = true;
|
||||
|
||||
/// DFS. Decide if node is needed by split.
|
||||
for (const auto & node : nodes)
|
||||
@ -1422,15 +1432,15 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto * node : index)
|
||||
second_index.push_back(data[node].to_second);
|
||||
for (const auto * output_node : outputs)
|
||||
second_outputs.push_back(data[output_node].to_second);
|
||||
|
||||
NodeRawConstPtrs second_inputs;
|
||||
NodeRawConstPtrs first_inputs;
|
||||
|
||||
for (const auto * input : inputs)
|
||||
for (const auto * input_node : inputs)
|
||||
{
|
||||
const auto & cur = data[input];
|
||||
const auto & cur = data[input_node];
|
||||
first_inputs.push_back(cur.to_first);
|
||||
}
|
||||
|
||||
@ -1438,17 +1448,17 @@ ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split
|
||||
{
|
||||
const auto & cur = data[input];
|
||||
second_inputs.push_back(cur.to_second);
|
||||
first_index.push_back(cur.to_first);
|
||||
first_outputs.push_back(cur.to_first);
|
||||
}
|
||||
|
||||
auto first_actions = std::make_shared<ActionsDAG>();
|
||||
first_actions->nodes.swap(first_nodes);
|
||||
first_actions->index.swap(first_index);
|
||||
first_actions->outputs.swap(first_outputs);
|
||||
first_actions->inputs.swap(first_inputs);
|
||||
|
||||
auto second_actions = std::make_shared<ActionsDAG>();
|
||||
second_actions->nodes.swap(second_nodes);
|
||||
second_actions->index.swap(second_index);
|
||||
second_actions->outputs.swap(second_outputs);
|
||||
second_actions->inputs.swap(second_inputs);
|
||||
|
||||
return {std::move(first_actions), std::move(second_actions)};
|
||||
@ -1524,11 +1534,13 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameS
|
||||
{
|
||||
std::unordered_set<const Node *> split_nodes;
|
||||
for (const auto & sort_column : sort_columns)
|
||||
if (const auto * node = tryFindInIndex(sort_column))
|
||||
if (const auto * node = tryFindInOutputs(sort_column))
|
||||
split_nodes.insert(node);
|
||||
else
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR, "Sorting column {} wasn't found in the ActionsDAG's index. DAG:\n{}", sort_column, dumpDAG());
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Sorting column {} wasn't found in the ActionsDAG's outputs. DAG:\n{}",
|
||||
sort_column,
|
||||
dumpDAG());
|
||||
|
||||
auto res = split(split_nodes);
|
||||
res.second->project_input = project_input;
|
||||
@ -1537,11 +1549,12 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameS
|
||||
|
||||
ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
|
||||
{
|
||||
const auto * node = tryFindInIndex(column_name);
|
||||
const auto * node = tryFindInOutputs(column_name);
|
||||
if (!node)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
|
||||
column_name, dumpDAG());
|
||||
"Outputs for ActionsDAG does not contain filter column name {}. DAG:\n{}",
|
||||
column_name,
|
||||
dumpDAG());
|
||||
|
||||
std::unordered_set<const Node *> split_nodes = {node};
|
||||
auto res = split(split_nodes);
|
||||
@ -1689,7 +1702,7 @@ ColumnsWithTypeAndName prepareFunctionArguments(const ActionsDAG::NodeRawConstPt
|
||||
/// Create actions which calculate conjunction of selected nodes.
|
||||
/// Assume conjunction nodes are predicates (and may be used as arguments of function AND).
|
||||
///
|
||||
/// Result actions add single column with conjunction result (it is always first in index).
|
||||
/// Result actions add single column with conjunction result (it is always first in outputs).
|
||||
/// No other columns are added or removed.
|
||||
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs)
|
||||
{
|
||||
@ -1763,7 +1776,7 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
|
||||
result_predicate = &actions->addFunction(func_builder_and, std::move(args), {});
|
||||
}
|
||||
|
||||
actions->index.push_back(result_predicate);
|
||||
actions->outputs.push_back(result_predicate);
|
||||
|
||||
for (const auto & col : all_inputs)
|
||||
{
|
||||
@ -1778,9 +1791,9 @@ ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunctio
|
||||
actions->inputs.push_back(input);
|
||||
}
|
||||
|
||||
/// We should not add result_predicate into the index for the second time.
|
||||
/// We should not add result_predicate into the outputs for the second time.
|
||||
if (input->result_name != result_predicate->result_name)
|
||||
actions->index.push_back(input);
|
||||
actions->outputs.push_back(input);
|
||||
}
|
||||
|
||||
return actions;
|
||||
@ -1792,10 +1805,12 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
|
||||
const Names & available_inputs,
|
||||
const ColumnsWithTypeAndName & all_inputs)
|
||||
{
|
||||
Node * predicate = const_cast<Node *>(tryFindInIndex(filter_name));
|
||||
Node * predicate = const_cast<Node *>(tryFindInOutputs(filter_name));
|
||||
if (!predicate)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR, "Index for ActionsDAG does not contain filter column name {}. DAG:\n{}", filter_name, dumpDAG());
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Output nodes for ActionsDAG do not contain filter column name {}. DAG:\n{}",
|
||||
filter_name,
|
||||
dumpDAG());
|
||||
|
||||
/// If condition is constant let's do nothing.
|
||||
/// It means there is nothing to push down or optimization was already applied.
|
||||
@ -1807,8 +1822,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
|
||||
/// Get input nodes from available_inputs names.
|
||||
{
|
||||
std::unordered_map<std::string_view, std::list<const Node *>> inputs_map;
|
||||
for (const auto & input : inputs)
|
||||
inputs_map[input->result_name].emplace_back(input);
|
||||
for (const auto & input_node : inputs)
|
||||
inputs_map[input_node->result_name].emplace_back(input_node);
|
||||
|
||||
for (const auto & name : available_inputs)
|
||||
{
|
||||
@ -1833,8 +1848,8 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
|
||||
/// The whole predicate was split.
|
||||
if (can_remove_filter)
|
||||
{
|
||||
/// If filter column is not needed, remove it from index.
|
||||
std::erase_if(index, [&](const Node * node) { return node == predicate; });
|
||||
/// If filter column is not needed, remove it from output nodes.
|
||||
std::erase_if(outputs, [&](const Node * node) { return node == predicate; });
|
||||
|
||||
/// At the very end of this method we'll call removeUnusedActions() with allow_remove_inputs=false,
|
||||
/// so we need to manually remove predicate if it is an input node.
|
||||
@ -1859,11 +1874,11 @@ ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
|
||||
{
|
||||
/// Special case. We cannot replace input to constant inplace.
|
||||
/// Because we cannot affect inputs list for actions.
|
||||
/// So we just add a new constant and update index.
|
||||
/// So we just add a new constant and update outputs.
|
||||
const auto * new_predicate = &addNode(node);
|
||||
for (auto & index_node : index)
|
||||
if (index_node == predicate)
|
||||
index_node = new_predicate;
|
||||
for (auto & output_node : outputs)
|
||||
if (output_node == predicate)
|
||||
output_node = new_predicate;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -96,8 +96,8 @@ public:
|
||||
|
||||
private:
|
||||
Nodes nodes;
|
||||
NodeRawConstPtrs index;
|
||||
NodeRawConstPtrs inputs;
|
||||
NodeRawConstPtrs outputs;
|
||||
|
||||
bool project_input = false;
|
||||
bool projected_output = false;
|
||||
@ -111,7 +111,12 @@ public:
|
||||
explicit ActionsDAG(const ColumnsWithTypeAndName & inputs_);
|
||||
|
||||
const Nodes & getNodes() const { return nodes; }
|
||||
const NodeRawConstPtrs & getIndex() const { return index; }
|
||||
const NodeRawConstPtrs & getOutputs() const { return outputs; }
|
||||
/** Output nodes can contain any column returned from DAG.
|
||||
* You may manually change it if needed.
|
||||
*/
|
||||
NodeRawConstPtrs & getOutputs() { return outputs; }
|
||||
|
||||
const NodeRawConstPtrs & getInputs() const { return inputs; }
|
||||
|
||||
NamesAndTypesList getRequiredColumns() const;
|
||||
@ -133,25 +138,26 @@ public:
|
||||
NodeRawConstPtrs children,
|
||||
std::string result_name);
|
||||
|
||||
/// Index can contain any column returned from DAG.
|
||||
/// You may manually change it if needed.
|
||||
NodeRawConstPtrs & getIndex() { return index; }
|
||||
/// Find first column by name in index. This search is linear.
|
||||
const Node & findInIndex(const std::string & name) const;
|
||||
/// Find first column by name in output nodes. This search is linear.
|
||||
const Node & findInOutputs(const std::string & name) const;
|
||||
|
||||
/// Same, but return nullptr if node not found.
|
||||
const Node * tryFindInIndex(const std::string & name) const;
|
||||
/// Find first node with the same name in index and replace it.
|
||||
/// If was not found, add node to index end.
|
||||
void addOrReplaceInIndex(const Node & node);
|
||||
const Node * tryFindInOutputs(const std::string & name) const;
|
||||
|
||||
/// Find first node with the same name in output nodes and replace it.
|
||||
/// If was not found, add node to outputs end.
|
||||
void addOrReplaceInOutputs(const Node & node);
|
||||
|
||||
/// Call addAlias several times.
|
||||
void addAliases(const NamesWithAliases & aliases);
|
||||
/// Add alias actions and remove unused columns from index. Also specify result columns order in index.
|
||||
|
||||
/// Add alias actions and remove unused columns from outputs. Also specify result columns order in outputs.
|
||||
void project(const NamesWithAliases & projection);
|
||||
|
||||
/// If column is not in index, try to find it in nodes and insert back into index.
|
||||
/// If column is not in outputs, try to find it in nodes and insert back into outputs.
|
||||
bool tryRestoreColumn(const std::string & column_name);
|
||||
/// Find column in result. Remove it from index.
|
||||
|
||||
/// Find column in result. Remove it from outputs.
|
||||
/// If columns is in inputs and has no dependent nodes, remove it from inputs too.
|
||||
/// Return true if column was removed from inputs.
|
||||
bool removeUnusedResult(const std::string & column_name);
|
||||
@ -160,7 +166,13 @@ public:
|
||||
bool isInputProjected() const { return project_input; }
|
||||
bool isOutputProjected() const { return projected_output; }
|
||||
|
||||
/// Remove actions that are not needed to compute output nodes
|
||||
void removeUnusedActions(bool allow_remove_inputs = true, bool allow_constant_folding = true);
|
||||
|
||||
/// Remove actions that are not needed to compute output nodes with required names
|
||||
void removeUnusedActions(const Names & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
|
||||
|
||||
/// Remove actions that are not needed to compute output nodes with required names
|
||||
void removeUnusedActions(const NameSet & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
|
||||
|
||||
/// Transform the current DAG in a way that leaf nodes get folded into their parents. It's done
|
||||
@ -196,10 +208,10 @@ public:
|
||||
const String & predicate_column_name = {},
|
||||
bool add_missing_keys = true);
|
||||
|
||||
/// Reorder the index nodes using given position mapping.
|
||||
/// Reorder the output nodes using given position mapping.
|
||||
void reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map);
|
||||
|
||||
/// Add aggregate columns to index nodes from projection
|
||||
/// Add aggregate columns to output nodes from projection
|
||||
void addAggregatesViaProjection(const Block & aggregates);
|
||||
|
||||
bool hasArrayJoin() const;
|
||||
@ -263,7 +275,7 @@ public:
|
||||
|
||||
/// Split ActionsDAG into two DAGs, where first part contains all nodes from split_nodes and their children.
|
||||
/// Execution of first then second parts on block is equivalent to execution of initial DAG.
|
||||
/// First DAG and initial DAG have equal inputs, second DAG and initial DAG has equal index (outputs).
|
||||
/// First DAG and initial DAG have equal inputs, second DAG and initial DAG has equal outputs.
|
||||
/// Second DAG inputs may contain less inputs then first DAG (but also include other columns).
|
||||
SplitResult split(std::unordered_set<const Node *> split_nodes) const;
|
||||
|
||||
@ -271,7 +283,7 @@ public:
|
||||
SplitResult splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const;
|
||||
|
||||
/// Splits actions into two parts. First part has minimal size sufficient for calculation of column_name.
|
||||
/// Index of initial actions must contain column_name.
|
||||
/// Outputs of initial actions must contain column_name.
|
||||
SplitResult splitActionsForFilter(const std::string & column_name) const;
|
||||
|
||||
/// Splits actions into two parts. The first part contains all the calculations required to calculate sort_columns.
|
||||
@ -304,8 +316,6 @@ public:
|
||||
private:
|
||||
Node & addNode(Node node);
|
||||
|
||||
void removeUnusedActions(bool allow_remove_inputs = true, bool allow_constant_folding = true);
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
void compileFunctions(size_t min_count_to_compile_expression, const std::unordered_set<const Node *> & lazy_executed_nodes = {});
|
||||
#endif
|
||||
|
@ -389,7 +389,7 @@ SetPtr makeExplicitSet(
|
||||
const ASTPtr & right_arg = args.children.at(1);
|
||||
|
||||
auto column_name = left_arg->getColumnName();
|
||||
const auto & dag_node = actions.findInIndex(column_name);
|
||||
const auto & dag_node = actions.findInOutputs(column_name);
|
||||
const DataTypePtr & left_arg_type = dag_node.result_type;
|
||||
|
||||
DataTypes set_element_types = {left_arg_type};
|
||||
@ -507,7 +507,7 @@ ActionsMatcher::Data::Data(
|
||||
, actions_stack(std::move(actions_dag), context_)
|
||||
, aggregation_keys_info(aggregation_keys_info_)
|
||||
, build_expression_with_window_functions(build_expression_with_window_functions_)
|
||||
, next_unique_suffix(actions_stack.getLastActions().getIndex().size() + 1)
|
||||
, next_unique_suffix(actions_stack.getLastActions().getOutputs().size() + 1)
|
||||
{
|
||||
}
|
||||
|
||||
@ -526,9 +526,9 @@ ScopeStack::ScopeStack(ActionsDAGPtr actions_dag, ContextPtr context_) : WithCon
|
||||
{
|
||||
auto & level = stack.emplace_back();
|
||||
level.actions_dag = std::move(actions_dag);
|
||||
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getIndex());
|
||||
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getOutputs());
|
||||
|
||||
for (const auto & node : level.actions_dag->getIndex())
|
||||
for (const auto & node : level.actions_dag->getOutputs())
|
||||
if (node->type == ActionsDAG::ActionType::INPUT)
|
||||
level.inputs.emplace(node->result_name);
|
||||
}
|
||||
@ -537,7 +537,7 @@ void ScopeStack::pushLevel(const NamesAndTypesList & input_columns)
|
||||
{
|
||||
auto & level = stack.emplace_back();
|
||||
level.actions_dag = std::make_shared<ActionsDAG>();
|
||||
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getIndex());
|
||||
level.index = std::make_unique<ScopeStack::Index>(level.actions_dag->getOutputs());
|
||||
const auto & prev = stack[stack.size() - 2];
|
||||
|
||||
for (const auto & input_column : input_columns)
|
||||
@ -547,7 +547,7 @@ void ScopeStack::pushLevel(const NamesAndTypesList & input_columns)
|
||||
level.inputs.emplace(input_column.name);
|
||||
}
|
||||
|
||||
for (const auto & node : prev.actions_dag->getIndex())
|
||||
for (const auto & node : prev.actions_dag->getOutputs())
|
||||
{
|
||||
if (!level.index->contains(node->result_name))
|
||||
{
|
||||
|
@ -301,7 +301,7 @@ static std::unordered_set<const ActionsDAG::Node *> processShortCircuitFunctions
|
||||
if (short_circuit_nodes.empty())
|
||||
return {};
|
||||
|
||||
auto reverse_info = getActionsDAGReverseInfo(nodes, actions_dag.getIndex());
|
||||
auto reverse_info = getActionsDAGReverseInfo(nodes, actions_dag.getOutputs());
|
||||
|
||||
/// For each node we fill LazyExecutionInfo.
|
||||
std::unordered_map<const ActionsDAG::Node *, LazyExecutionInfo> lazy_execution_infos;
|
||||
@ -335,10 +335,10 @@ void ExpressionActions::linearizeActions(const std::unordered_set<const ActionsD
|
||||
};
|
||||
|
||||
const auto & nodes = getNodes();
|
||||
const auto & index = actions_dag->getIndex();
|
||||
const auto & outputs = actions_dag->getOutputs();
|
||||
const auto & inputs = actions_dag->getInputs();
|
||||
|
||||
auto reverse_info = getActionsDAGReverseInfo(nodes, index);
|
||||
auto reverse_info = getActionsDAGReverseInfo(nodes, outputs);
|
||||
std::vector<Data> data;
|
||||
for (const auto & node : nodes)
|
||||
data.push_back({.node = &node});
|
||||
@ -428,9 +428,9 @@ void ExpressionActions::linearizeActions(const std::unordered_set<const ActionsD
|
||||
}
|
||||
}
|
||||
|
||||
result_positions.reserve(index.size());
|
||||
result_positions.reserve(outputs.size());
|
||||
|
||||
for (const auto & node : index)
|
||||
for (const auto & node : outputs)
|
||||
{
|
||||
auto pos = data[reverse_info.reverse_index[node]].position;
|
||||
|
||||
|
@ -304,7 +304,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
|
||||
|
||||
ssize_t group_size = group_elements_ast.size();
|
||||
const auto & column_name = group_elements_ast[j]->getColumnName();
|
||||
const auto * node = temp_actions->tryFindInIndex(column_name);
|
||||
const auto * node = temp_actions->tryFindInOutputs(column_name);
|
||||
if (!node)
|
||||
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
|
||||
|
||||
@ -358,7 +358,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
|
||||
getRootActionsNoMakeSet(group_asts[i], temp_actions, false);
|
||||
|
||||
const auto & column_name = group_asts[i]->getColumnName();
|
||||
const auto * node = temp_actions->tryFindInIndex(column_name);
|
||||
const auto * node = temp_actions->tryFindInOutputs(column_name);
|
||||
if (!node)
|
||||
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
|
||||
|
||||
@ -524,7 +524,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
|
||||
auto temp_actions = std::make_shared<ActionsDAG>(columns_after_join);
|
||||
getRootActions(left_in_operand, true, temp_actions);
|
||||
|
||||
if (temp_actions->tryFindInIndex(left_in_operand->getColumnName()))
|
||||
if (temp_actions->tryFindInOutputs(left_in_operand->getColumnName()))
|
||||
makeExplicitSet(func, *temp_actions, true, getContext(), settings.size_limits_for_set, prepared_sets);
|
||||
}
|
||||
}
|
||||
@ -634,7 +634,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
|
||||
for (size_t i = 0; i < arguments.size(); ++i)
|
||||
{
|
||||
const std::string & name = arguments[i]->getColumnName();
|
||||
const auto * dag_node = actions->tryFindInIndex(name);
|
||||
const auto * dag_node = actions->tryFindInOutputs(name);
|
||||
if (!dag_node)
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
|
||||
@ -841,7 +841,7 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
|
||||
for (size_t i = 0; i < arguments.size(); ++i)
|
||||
{
|
||||
const std::string & name = arguments[i]->getColumnName();
|
||||
const auto * node = actions->tryFindInIndex(name);
|
||||
const auto * node = actions->tryFindInOutputs(name);
|
||||
|
||||
if (!node)
|
||||
{
|
||||
@ -937,8 +937,8 @@ ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ActionsDAGPtr
|
||||
/// Assign new names to columns, if needed.
|
||||
if (result_source.first != result_source.second)
|
||||
{
|
||||
const auto & node = actions->findInIndex(result_source.second);
|
||||
actions->getIndex().push_back(&actions->addAlias(node, result_source.first));
|
||||
const auto & node = actions->findInOutputs(result_source.second);
|
||||
actions->getOutputs().push_back(&actions->addAlias(node, result_source.first));
|
||||
}
|
||||
|
||||
/// Make ARRAY JOIN (replace arrays with their insides) for the columns in these new names.
|
||||
@ -1097,7 +1097,7 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
|
||||
{
|
||||
auto pos = original_right_columns.getPositionByName(name_with_alias.first);
|
||||
const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], name_with_alias.second);
|
||||
rename_dag->getIndex()[pos] = &alias;
|
||||
rename_dag->getOutputs()[pos] = &alias;
|
||||
}
|
||||
}
|
||||
rename_dag->projectInput();
|
||||
@ -1212,7 +1212,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
String prewhere_column_name = select_query->prewhere()->getColumnName();
|
||||
step.addRequiredOutput(prewhere_column_name);
|
||||
|
||||
const auto & node = step.actions()->findInIndex(prewhere_column_name);
|
||||
const auto & node = step.actions()->findInOutputs(prewhere_column_name);
|
||||
auto filter_type = node.result_type;
|
||||
if (!filter_type->canBeUsedInBooleanContext())
|
||||
throw Exception("Invalid type for filter in PREWHERE: " + filter_type->getName(),
|
||||
@ -1295,7 +1295,7 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain,
|
||||
auto where_column_name = select_query->where()->getColumnName();
|
||||
step.addRequiredOutput(where_column_name);
|
||||
|
||||
const auto & node = step.actions()->findInIndex(where_column_name);
|
||||
const auto & node = step.actions()->findInOutputs(where_column_name);
|
||||
auto filter_type = node.result_type;
|
||||
if (!filter_type->canBeUsedInBooleanContext())
|
||||
throw Exception("Invalid type for filter in WHERE: " + filter_type->getName(),
|
||||
|
@ -551,10 +551,10 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression, const
|
||||
node_to_data[child].all_parents_compilable &= node_is_valid_for_compilation;
|
||||
}
|
||||
|
||||
for (const auto & node : index)
|
||||
for (const auto & output_node : outputs)
|
||||
{
|
||||
/// Force result nodes to compile
|
||||
node_to_data[node].all_parents_compilable = false;
|
||||
/// Force output nodes to compile
|
||||
node_to_data[output_node].all_parents_compilable = false;
|
||||
}
|
||||
|
||||
std::vector<Node *> nodes_to_compile;
|
||||
|
@ -106,14 +106,14 @@ static ActionsDAGPtr makeAdditionalPostFilter(ASTPtr & ast, ContextPtr context,
|
||||
auto syntax_result = TreeRewriter(context).analyze(ast, header.getNamesAndTypesList());
|
||||
String result_column_name = ast->getColumnName();
|
||||
auto dag = ExpressionAnalyzer(ast, syntax_result, context).getActionsDAG(false, false);
|
||||
const ActionsDAG::Node * result_node = &dag->findInIndex(result_column_name);
|
||||
auto & index = dag->getIndex();
|
||||
index.clear();
|
||||
index.reserve(dag->getInputs().size() + 1);
|
||||
const ActionsDAG::Node * result_node = &dag->findInOutputs(result_column_name);
|
||||
auto & outputs = dag->getOutputs();
|
||||
outputs.clear();
|
||||
outputs.reserve(dag->getInputs().size() + 1);
|
||||
for (const auto * node : dag->getInputs())
|
||||
index.push_back(node);
|
||||
outputs.push_back(node);
|
||||
|
||||
index.push_back(result_node);
|
||||
outputs.push_back(result_node);
|
||||
|
||||
return dag;
|
||||
}
|
||||
@ -128,7 +128,7 @@ void IInterpreterUnionOrSelectQuery::addAdditionalPostFilter(QueryPlan & plan) c
|
||||
return;
|
||||
|
||||
auto dag = makeAdditionalPostFilter(ast, context, plan.getCurrentDataStream().header);
|
||||
std::string filter_name = dag->getIndex().back()->result_name;
|
||||
std::string filter_name = dag->getOutputs().back()->result_name;
|
||||
auto filter_step = std::make_unique<FilterStep>(
|
||||
plan.getCurrentDataStream(), std::move(dag), std::move(filter_name), true);
|
||||
filter_step->setStepDescription("Additional result filter");
|
||||
|
@ -162,7 +162,7 @@ FilterDAGInfoPtr generateFilterActions(
|
||||
filter_info->actions->projectInput(false);
|
||||
|
||||
for (const auto * node : filter_info->actions->getInputs())
|
||||
filter_info->actions->getIndex().push_back(node);
|
||||
filter_info->actions->getOutputs().push_back(node);
|
||||
|
||||
auto required_columns_from_filter = filter_info->actions->getRequiredColumns();
|
||||
|
||||
|
@ -859,9 +859,9 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
|
||||
for (const auto & kv : stage.column_to_updated)
|
||||
{
|
||||
auto column_name = kv.second->getColumnName();
|
||||
const auto & dag_node = actions->findInIndex(column_name);
|
||||
const auto & dag_node = actions->findInOutputs(column_name);
|
||||
const auto & alias = actions->addAlias(dag_node, kv.first);
|
||||
actions->addOrReplaceInIndex(alias);
|
||||
actions->addOrReplaceInOutputs(alias);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,7 @@ ActionsDAGPtr addMissingDefaults(
|
||||
bool null_as_default)
|
||||
{
|
||||
auto actions = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
|
||||
auto & index = actions->getIndex();
|
||||
auto & index = actions->getOutputs();
|
||||
|
||||
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
|
||||
/// First, remember the offset columns for all arrays in the block.
|
||||
|
@ -239,15 +239,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
|
||||
|
||||
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
|
||||
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
|
||||
ActionsDAG::NodeRawConstPtrs index;
|
||||
index.reserve(output_header.columns() + 1);
|
||||
ActionsDAG::NodeRawConstPtrs outputs;
|
||||
outputs.reserve(output_header.columns() + 1);
|
||||
|
||||
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
|
||||
const auto * grouping_node = &dag->addColumn(
|
||||
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
|
||||
|
||||
grouping_node = &dag->materializeNode(*grouping_node);
|
||||
index.push_back(grouping_node);
|
||||
outputs.push_back(grouping_node);
|
||||
|
||||
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
|
||||
|
||||
@ -264,19 +264,19 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
|
||||
auto column = ColumnConst::create(std::move(column_with_default), 0);
|
||||
const auto * node = &dag->addColumn({ColumnPtr(std::move(column)), col.type, col.name});
|
||||
node = &dag->materializeNode(*node);
|
||||
index.push_back(node);
|
||||
outputs.push_back(node);
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)];
|
||||
const auto * column_node = dag->getOutputs()[header.getPositionByName(col.name)];
|
||||
if (group_by_use_nulls && column_node->result_type->canBeInsideNullable())
|
||||
index.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name));
|
||||
outputs.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name));
|
||||
else
|
||||
index.push_back(column_node);
|
||||
outputs.push_back(column_node);
|
||||
}
|
||||
}
|
||||
|
||||
dag->getIndex().swap(index);
|
||||
dag->getOutputs().swap(outputs);
|
||||
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
|
||||
auto transform = std::make_shared<ExpressionTransform>(header, expression);
|
||||
|
||||
|
@ -40,17 +40,17 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_,
|
||||
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
|
||||
{
|
||||
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
|
||||
auto & index = dag->getIndex();
|
||||
auto & outputs = dag->getOutputs();
|
||||
|
||||
if (use_nulls)
|
||||
{
|
||||
auto to_nullable = FunctionFactory::instance().get("toNullable", nullptr);
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
const auto * node = dag->getIndex()[header.getPositionByName(key)];
|
||||
const auto * node = dag->getOutputs()[header.getPositionByName(key)];
|
||||
if (node->result_type->canBeInsideNullable())
|
||||
{
|
||||
dag->addOrReplaceInIndex(dag->addFunction(to_nullable, { node }, node->result_name));
|
||||
dag->addOrReplaceInOutputs(dag->addFunction(to_nullable, { node }, node->result_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -60,7 +60,7 @@ ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, b
|
||||
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
|
||||
|
||||
grouping_node = &dag->materializeNode(*grouping_node);
|
||||
index.insert(index.begin(), grouping_node);
|
||||
outputs.insert(outputs.begin(), grouping_node);
|
||||
|
||||
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
|
||||
return std::make_shared<ExpressionTransform>(header, expression);
|
||||
|
@ -84,7 +84,7 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
|
||||
const auto & expression = filter->getExpression();
|
||||
const auto & filter_column_name = filter->getFilterColumnName();
|
||||
|
||||
const auto * filter_node = expression->tryFindInIndex(filter_column_name);
|
||||
const auto * filter_node = expression->tryFindInOutputs(filter_column_name);
|
||||
if (!filter_node && !filter->removesFilterColumn())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Filter column {} was removed from ActionsDAG but it is needed in result. DAG:\n{}",
|
||||
@ -102,7 +102,8 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
|
||||
/// Expression/Filter -> Aggregating -> Filter -> Something
|
||||
|
||||
/// New filter column is the first one.
|
||||
String split_filter_column_name = split_filter->getIndex().front()->result_name;
|
||||
String split_filter_column_name = split_filter->getOutputs().front()->result_name;
|
||||
|
||||
node.step = std::make_unique<FilterStep>(
|
||||
node.children.at(0)->step->getOutputStream(), split_filter, std::move(split_filter_column_name), can_remove_filter);
|
||||
|
||||
@ -284,7 +285,7 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
|
||||
*
|
||||
* New filter column is the first one.
|
||||
*/
|
||||
const String & split_filter_column_name = split_filter->getIndex().front()->result_name;
|
||||
const String & split_filter_column_name = split_filter->getOutputs().front()->result_name;
|
||||
bool can_remove_filter = source_columns.end() == std::find(source_columns.begin(), source_columns.end(), split_filter_column_name);
|
||||
const size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
|
||||
if (updated_steps > 0)
|
||||
|
@ -895,13 +895,13 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
|
||||
ActionDAGNodes nodes;
|
||||
if (prewhere_info)
|
||||
{
|
||||
const auto & node = prewhere_info->prewhere_actions->findInIndex(prewhere_info->prewhere_column_name);
|
||||
const auto & node = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name);
|
||||
nodes.nodes.push_back(&node);
|
||||
}
|
||||
|
||||
if (added_filter)
|
||||
{
|
||||
const auto & node = added_filter->findInIndex(added_filter_column_name);
|
||||
const auto & node = added_filter->findInOutputs(added_filter_column_name);
|
||||
nodes.nodes.push_back(&node);
|
||||
}
|
||||
|
||||
|
@ -748,7 +748,7 @@ static const ActionsDAG::Node & cloneASTWithInversionPushDown(
|
||||
if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction()))
|
||||
{
|
||||
const auto & index_hint_dag = index_hint->getActions();
|
||||
children = index_hint_dag->getIndex();
|
||||
children = index_hint_dag->getOutputs();
|
||||
|
||||
for (auto & arg : children)
|
||||
arg = &cloneASTWithInversionPushDown(*arg, inverted_dag, to_inverted, context, need_inversion);
|
||||
@ -824,7 +824,7 @@ static ActionsDAGPtr cloneASTWithInversionPushDown(ActionsDAG::NodeRawConstPtrs
|
||||
nodes = {&res->addFunction(function_builder, std::move(nodes), "")};
|
||||
}
|
||||
|
||||
res->getIndex().swap(nodes);
|
||||
res->getOutputs().swap(nodes);
|
||||
|
||||
return res;
|
||||
}
|
||||
@ -948,7 +948,7 @@ KeyCondition::KeyCondition(
|
||||
// std::cerr << "========== inverted dag: " << inverted_dag->dumpDAG() << std::endl;
|
||||
|
||||
Block empty;
|
||||
for (const auto * node : inverted_dag->getIndex())
|
||||
for (const auto * node : inverted_dag->getOutputs())
|
||||
traverseAST(Tree(node), context, empty);
|
||||
}
|
||||
else
|
||||
|
Loading…
Reference in New Issue
Block a user