ClickHouse/src/Interpreters/ActionsDAG.cpp

2551 lines
82 KiB
C++
Raw Normal View History

2020-11-16 14:57:56 +00:00
#include <Interpreters/ActionsDAG.h>
2023-02-01 13:33:32 +00:00
#include <Analyzer/FunctionNode.h>
2020-11-16 14:57:56 +00:00
#include <DataTypes/DataTypeArray.h>
2020-11-17 14:51:05 +00:00
#include <DataTypes/DataTypeString.h>
2020-11-16 14:57:56 +00:00
#include <Functions/IFunction.h>
2020-11-17 14:51:05 +00:00
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsConversion.h>
2020-11-17 19:43:26 +00:00
#include <Functions/materialize.h>
2021-02-10 16:26:49 +00:00
#include <Functions/FunctionsLogical.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/indexHint.h>
2020-11-16 14:57:56 +00:00
#include <Interpreters/Context.h>
2022-11-15 01:42:46 +00:00
#include <Interpreters/ArrayJoinAction.h>
2020-11-16 14:57:56 +00:00
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Core/SortDescription.h>
2023-01-10 11:52:29 +00:00
#include <Planner/PlannerActionsVisitor.h>
2020-11-16 14:57:56 +00:00
#include <stack>
2022-01-30 19:49:48 +00:00
#include <base/sort.h>
#include <Common/JSONBuilder.h>
2020-11-16 14:57:56 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_IDENTIFIER;
extern const int TYPE_MISMATCH;
2020-11-17 14:51:05 +00:00
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int THERE_IS_NO_COLUMN;
2020-11-17 16:24:25 +00:00
extern const int ILLEGAL_COLUMN;
2021-05-13 13:38:18 +00:00
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int BAD_ARGUMENTS;
2020-11-16 14:57:56 +00:00
}
2023-01-03 16:50:06 +00:00
namespace
{

/// Converts the children of a function node into the argument list used to build/prepare the function.
/// Each argument takes the child's column (possibly null), result type and result name.
/// The returned flag is true only when every child carries a non-null constant column,
/// i.e. when the call is a candidate for execution at DAG-construction time.
std::pair<ColumnsWithTypeAndName, bool> getFunctionArguments(const ActionsDAG::NodeRawConstPtrs & children)
{
    ColumnsWithTypeAndName arguments(children.size());
    bool all_const = true;

    size_t pos = 0;
    for (const auto * child_node : children)
    {
        auto & arg = arguments[pos++];
        arg.column = child_node->column;
        arg.type = child_node->result_type;
        arg.name = child_node->result_name;

        const bool is_const = arg.column && isColumnConst(*arg.column);
        all_const = all_const && is_const;
    }

    return {std::move(arguments), all_const};
}

}
/// Writes a description of this node into `map`.
/// "Node Type" is always written. The other fields are written only when they are set.
/// The fixed insertion order below determines the order of keys in the map.
void ActionsDAG::Node::toTree(JSONBuilder::JSONMap & map) const
{
    map.add("Node Type", magic_enum::enum_name(type));

    if (result_type)
        map.add("Result Type", result_type->getName());

    const bool has_name = !result_name.empty();
    if (has_name)
        map.add("Result Name", result_name);

    if (column)
        map.add("Column", column->getName());

    if (function_base)
        map.add("Function", function_base->getName());

    const bool is_function = type == ActionType::FUNCTION;
    if (is_function)
        map.add("Compiled", is_function_compiled);
}
2020-11-16 14:57:56 +00:00
2020-11-17 12:39:41 +00:00
/// Creates a DAG with one INPUT node per entry of `inputs_`.
/// Every input is also exposed as an output, in the same order.
ActionsDAG::ActionsDAG(const NamesAndTypesList & inputs_)
{
    for (const auto & name_and_type : inputs_)
    {
        const Node & input_node = addInput(name_and_type.name, name_and_type.type);
        outputs.push_back(&input_node);
    }
}
2020-11-17 12:39:41 +00:00
/// Creates a DAG with one INPUT node per column of `inputs_`.
/// Non-constant columns become plain inputs exposed as outputs.
/// A constant column becomes an INPUT (which keeps the column) and also a separate COLUMN node.
/// The COLUMN node, not the input, is exposed as the output.
ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
{
    for (const auto & input : inputs_)
    {
        const bool is_constant = input.column && isColumnConst(*input.column);
        if (!is_constant)
        {
            outputs.push_back(&addInput(input.name, input.type));
            continue;
        }

        addInput(input);
        /// The column is added as well. This allows removing an input that is actually
        /// constant (after projection). Some transforms in the query pipeline may also
        /// materialize constants regardless of the header structure; this way the
        /// materialized column can be dropped and the constant value from the header used.
        /// The input itself cannot be removed right now, because input positions matter in some cases.
        outputs.push_back(&addColumn(input));
    }
}
2021-03-02 17:08:59 +00:00
/// Moves `node` into the DAG's node storage and returns a reference to the stored node.
/// INPUT nodes are also registered in `inputs`, in insertion order.
ActionsDAG::Node & ActionsDAG::addNode(Node node)
{
    Node & added = nodes.emplace_back(std::move(node));

    const bool is_input = added.type == ActionType::INPUT;
    if (is_input)
        inputs.emplace_back(&added);

    return added;
}
2021-03-02 17:08:59 +00:00
/// Adds an INPUT node with the given name and type and no column.
const ActionsDAG::Node & ActionsDAG::addInput(std::string name, DataTypePtr type)
{
    Node input_node;
    input_node.type = ActionType::INPUT;
    input_node.result_name = std::move(name);
    input_node.result_type = std::move(type);
    return addNode(std::move(input_node));
}
2021-03-02 17:08:59 +00:00
/// Adds an INPUT node described by `column`.
/// Name, type and column (which may be null) are all taken from `column`.
const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column)
{
    Node input_node;
    input_node.type = ActionType::INPUT;
    input_node.result_name = std::move(column.name);
    input_node.result_type = std::move(column.type);
    input_node.column = std::move(column.column);
    return addNode(std::move(input_node));
}
2021-03-02 17:08:59 +00:00
/// Adds a COLUMN node holding `column.column`.
/// Throws LOGICAL_ERROR if the column pointer is null, because a COLUMN node must carry a value.
const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column)
{
    if (!column.column)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add column {} because it is nullptr", column.name);

    Node column_node;
    column_node.type = ActionType::COLUMN;
    column_node.column = std::move(column.column);
    column_node.result_name = std::move(column.name);
    column_node.result_type = std::move(column.type);
    return addNode(std::move(column_node));
}
2021-03-02 17:08:59 +00:00
/// Adds an ALIAS node named `alias` whose single child is `child`.
/// The alias copies the child's result type and column.
const ActionsDAG::Node & ActionsDAG::addAlias(const Node & child, std::string alias)
{
    Node alias_node;
    alias_node.type = ActionType::ALIAS;
    alias_node.children.emplace_back(&child);
    alias_node.result_name = std::move(alias);
    alias_node.result_type = child.result_type;
    alias_node.column = child.column;
    return addNode(std::move(alias_node));
}
2021-03-02 17:08:59 +00:00
/// Adds an ARRAY_JOIN node over `child`.
/// The result type is the nested type of the array type returned by getArrayJoinDataType.
/// Throws TYPE_MISMATCH when getArrayJoinDataType rejects the child's type.
/// An empty `result_name` defaults to "arrayJoin(<child name>)".
const ActionsDAG::Node & ActionsDAG::addArrayJoin(const Node & child, std::string result_name)
{
    const auto & array_type = getArrayJoinDataType(child.result_type);
    if (!array_type)
        throw Exception(ErrorCodes::TYPE_MISMATCH, "ARRAY JOIN requires array argument");

    Node join_node;
    join_node.type = ActionType::ARRAY_JOIN;
    join_node.children.emplace_back(&child);
    join_node.result_type = array_type->getNestedType();
    join_node.result_name = result_name.empty()
        ? "arrayJoin(" + child.result_name + ")"
        : std::move(result_name);
    return addNode(std::move(join_node));
}
/// Adds a FUNCTION node, resolving the concrete function via `function->build` on the children's columns.
/// The result type is the one reported by the built function.
const ActionsDAG::Node & ActionsDAG::addFunction(
    const FunctionOverloadResolverPtr & function,
    NodeRawConstPtrs children,
    std::string result_name)
{
    auto prepared = getFunctionArguments(children);
    auto & arguments = prepared.first;
    const bool all_const = prepared.second;

    auto built_function = function->build(arguments);
    auto result_type = built_function->getResultType();

    return addFunctionImpl(
        built_function,
        std::move(children),
        std::move(arguments),
        std::move(result_name),
        std::move(result_type),
        all_const);
}
/// Adds a FUNCTION node from an already-resolved FunctionNode.
/// Both the function and the result type are taken from `function`.
const ActionsDAG::Node & ActionsDAG::addFunction(
    const FunctionNode & function,
    NodeRawConstPtrs children,
    std::string result_name)
{
    auto prepared = getFunctionArguments(children);
    return addFunctionImpl(
        function.getFunction(),
        std::move(children),
        std::move(prepared.first),
        std::move(result_name),
        function.getResultType(),
        prepared.second);
}
2022-11-28 15:02:59 +00:00
/// Adds a FUNCTION node for an already-built function.
/// The result type comes from `function_base->getResultType()`.
const ActionsDAG::Node & ActionsDAG::addFunction(
    const FunctionBasePtr & function_base,
    NodeRawConstPtrs children,
    std::string result_name)
{
    auto prepared = getFunctionArguments(children);
    return addFunctionImpl(
        function_base,
        std::move(children),
        std::move(prepared.first),
        std::move(result_name),
        function_base->getResultType(),
        prepared.second);
}
/// Adds a (non-accurate) CAST of `node_to_cast` to `cast_type`.
///
/// The target type is passed as a second argument: a constant String column containing
/// `cast_type->getName()`. It is added to the DAG as a COLUMN node named via
/// calculateConstantActionNodeName.
/// An empty `result_name` lets addFunction derive the default "<function>(<args>)" name.
const ActionsDAG::Node & ActionsDAG::addCast(const Node & node_to_cast, const DataTypePtr & cast_type, std::string result_name)
{
    Field cast_type_constant_value(cast_type->getName());

    ColumnWithTypeAndName column;
    column.name = calculateConstantActionNodeName(cast_type_constant_value);
    column.column = DataTypeString().createColumnConst(0, cast_type_constant_value);
    column.type = std::make_shared<DataTypeString>();

    const auto * cast_type_constant_node = &addColumn(std::move(column));
    ActionsDAG::NodeRawConstPtrs children = {&node_to_cast, cast_type_constant_node};
    FunctionOverloadResolverPtr func_builder_cast = CastInternalOverloadResolver<CastType::nonAccurate>::createImpl();

    /// result_name is a by-value sink parameter and is not used afterwards:
    /// move it instead of making another copy.
    return addFunction(func_builder_cast, std::move(children), std::move(result_name));
}
2022-12-29 15:49:51 +00:00
/// Shared tail of all addFunction overloads: creates a FUNCTION node over `children`.
///
/// `arguments` are the children's columns and `all_const` says whether all of them are constant.
/// The function is prepared on `arguments`.
/// If the function is suitable for constant folding, a constant result column is computed:
///   - by executing the function when all arguments are constant;
///   - otherwise via getConstantResultForNonConstArguments.
/// A result that is not a constant column is ignored.
/// An empty `result_name` defaults to "<function name>(<child names separated by ", ">)".
const ActionsDAG::Node & ActionsDAG::addFunctionImpl(
    const FunctionBasePtr & function_base,
    NodeRawConstPtrs children,
    ColumnsWithTypeAndName arguments,
    std::string result_name,
    DataTypePtr result_type,
    bool all_const)
{
    Node node;
    node.type = ActionType::FUNCTION;
    node.children = std::move(children);
    node.function_base = function_base;
    node.result_type = result_type;
    node.function = function_base->prepare(arguments);
    node.is_deterministic = function_base->isDeterministic();

    if (function_base->isSuitableForConstantFolding())
    {
        ColumnPtr folded;
        if (!all_const)
        {
            folded = function_base->getConstantResultForNonConstArguments(arguments, node.result_type);
        }
        else
        {
            /// all_const guarantees that every argument has a (constant) column.
            const size_t rows = arguments.empty() ? 0 : arguments.front().column->size();
            folded = node.function->execute(arguments, node.result_type, rows, true);
        }

        /// Keep the folded value only if it really is a constant column.
        if (folded && isColumnConst(*folded))
        {
            /// Constant (literal) columns in a block are added with size 1, but a function
            /// evaluated with no columns yields size 0 here; normalize it to 1.
            if (folded->empty())
                folded = folded->cloneResized(1);
            node.column = std::move(folded);
        }
    }

    if (result_name.empty())
    {
        result_name = function_base->getName();
        result_name += '(';
        std::string_view separator;
        for (const auto * child : node.children)
        {
            result_name += separator;
            result_name += child->result_name;
            separator = ", ";
        }
        result_name += ')';
    }

    node.result_name = std::move(result_name);
    return addNode(std::move(node));
}
2022-08-08 15:54:51 +00:00
/// Returns the first output node named `name`.
/// Throws UNKNOWN_IDENTIFIER when no output has that name.
const ActionsDAG::Node & ActionsDAG::findInOutputs(const std::string & name) const
{
    const Node * found = tryFindInOutputs(name);
    if (!found)
        throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown identifier: '{}'", name);
    return *found;
}
2022-08-08 15:54:51 +00:00
/// Returns the first output node named `name`, or nullptr if there is none.
const ActionsDAG::Node * ActionsDAG::tryFindInOutputs(const std::string & name) const
{
    for (const auto * output_node : outputs)
    {
        if (output_node->result_name != name)
            continue;
        return output_node;
    }
    return nullptr;
}
2022-08-08 15:54:51 +00:00
void ActionsDAG::addOrReplaceInOutputs(const Node & node)
2021-03-11 17:03:39 +00:00
{
2022-08-08 15:54:51 +00:00
for (auto & output_node : outputs)
2021-03-11 17:03:39 +00:00
{
2022-08-08 15:54:51 +00:00
if (output_node->result_name == node.result_name)
2021-03-11 17:03:39 +00:00
{
2022-08-08 15:54:51 +00:00
output_node = &node;
2021-03-11 17:03:39 +00:00
return;
}
}
2022-08-08 15:54:51 +00:00
outputs.push_back(&node);
2021-03-11 17:03:39 +00:00
}
2020-11-17 07:03:11 +00:00
2020-11-16 14:57:56 +00:00
/// Returns name and type of every INPUT node, in `inputs` order.
NamesAndTypesList ActionsDAG::getRequiredColumns() const
{
    NamesAndTypesList required;
    for (const auto * input_node : inputs)
        required.emplace_back(input_node->result_name, input_node->result_type);
    return required;
}
/// Returns the name of every INPUT node, in `inputs` order.
Names ActionsDAG::getRequiredColumnsNames() const
{
    Names required_names;
    required_names.reserve(inputs.size());
    for (const auto * input_node : inputs)
        required_names.emplace_back(input_node->result_name);
    return required_names;
}
2020-11-16 14:57:56 +00:00
/// Returns column (possibly null), type and name of every output, in output order.
ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
{
    ColumnsWithTypeAndName columns;
    columns.reserve(outputs.size());
    for (const auto * output_node : outputs)
        columns.emplace_back(output_node->column, output_node->result_type, output_node->result_name);
    return columns;
}
/// Returns name and type of every output, in output order.
NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
{
    NamesAndTypesList names_and_types;
    for (const auto * output_node : outputs)
        names_and_types.emplace_back(output_node->result_name, output_node->result_type);
    return names_and_types;
}
/// Returns the name of every output, in output order.
Names ActionsDAG::getNames() const
{
    Names output_names;
    output_names.reserve(outputs.size());
    for (const auto * output_node : outputs)
        output_names.emplace_back(output_node->result_name);
    return output_names;
}
std::string ActionsDAG::dumpNames() const
{
WriteBufferFromOwnString out;
for (auto it = nodes.begin(); it != nodes.end(); ++it)
{
if (it != nodes.begin())
out << ", ";
out << it->result_name;
}
return out.str();
}
void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_remove_inputs, bool allow_constant_folding)
2021-03-09 16:54:06 +00:00
{
NodeRawConstPtrs required_nodes;
required_nodes.reserve(required_names.size());
NameSet added;
2022-08-08 15:54:51 +00:00
for (const auto & node : outputs)
2021-03-09 16:54:06 +00:00
{
if (required_names.contains(node->result_name) && !added.contains(node->result_name))
2021-03-09 16:54:06 +00:00
{
required_nodes.push_back(node);
added.insert(node->result_name);
}
}
if (added.size() < required_names.size())
{
for (const auto & name : required_names)
if (!added.contains(name))
2021-03-09 16:54:06 +00:00
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", name, dumpNames());
}
2022-08-08 15:54:51 +00:00
outputs.swap(required_nodes);
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
2021-03-09 16:54:06 +00:00
}
void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_remove_inputs, bool allow_constant_folding)
2020-11-16 14:57:56 +00:00
{
2021-03-02 17:08:59 +00:00
NodeRawConstPtrs required_nodes;
2020-11-16 14:57:56 +00:00
required_nodes.reserve(required_names.size());
2021-03-10 18:50:34 +00:00
std::unordered_map<std::string_view, const Node *> names_map;
2022-08-08 15:54:51 +00:00
for (const auto * node : outputs)
2021-03-10 18:50:34 +00:00
names_map[node->result_name] = node;
2021-03-02 17:08:59 +00:00
2020-11-16 14:57:56 +00:00
for (const auto & name : required_names)
{
2021-03-10 18:50:34 +00:00
auto it = names_map.find(name);
if (it == names_map.end())
2020-11-16 14:57:56 +00:00
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
2021-03-10 18:50:34 +00:00
"Unknown column: {}, there are only columns {}", name, dumpDAG());
2020-11-16 14:57:56 +00:00
2021-03-10 18:50:34 +00:00
required_nodes.push_back(it->second);
2020-11-16 14:57:56 +00:00
}
2022-08-08 15:54:51 +00:00
outputs.swap(required_nodes);
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
2020-11-16 14:57:56 +00:00
}
/// Removes every node that is not needed by the current outputs.
///
/// Nodes kept:
///   - every node reachable from `outputs`;
///   - every ARRAY_JOIN node together with everything it depends on;
///   - every INPUT node, when `allow_remove_inputs` is false.
/// When `allow_constant_folding` is true, a reachable node that has children and already holds
/// a constant column is turned into a COLUMN node. Its children are dropped, so its former
/// subtree survives only if something else still reaches it.
/// Unkept nodes are erased from both `nodes` and `inputs`.
void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_constant_folding)
2020-11-16 14:57:56 +00:00
{
/// `visited_nodes` = nodes that must survive; `stack` = kept nodes whose children are not processed yet.
std::unordered_set<const Node *> visited_nodes;
std::stack<Node *> stack;
2022-08-08 15:54:51 +00:00
/// Seed the traversal with all outputs.
for (const auto * node : outputs)
2020-11-16 14:57:56 +00:00
{
visited_nodes.insert(node);
2021-03-02 17:08:59 +00:00
/// const_cast: constant folding below mutates reachable nodes in place.
stack.push(const_cast<Node *>(node));
2020-11-16 14:57:56 +00:00
}
2020-11-27 09:11:34 +00:00
for (auto & node : nodes)
{
2021-02-03 10:29:44 +00:00
/// We cannot remove arrayJoin because it changes the number of rows.
bool is_array_join = node.type == ActionType::ARRAY_JOIN;
if (is_array_join && !visited_nodes.contains(&node))
2020-11-27 09:11:34 +00:00
{
visited_nodes.insert(&node);
stack.push(&node);
}
2021-02-10 16:26:49 +00:00
/// Inputs are only marked as kept here; they are not pushed, so nothing is traversed from them.
if (node.type == ActionType::INPUT && !allow_remove_inputs)
visited_nodes.insert(&node);
2020-11-27 09:11:34 +00:00
}
2020-11-16 14:57:56 +00:00
while (!stack.empty())
{
auto * node = stack.top();
stack.pop();
/// Constant folding.
/// The node keeps its already-computed constant column and becomes a COLUMN node.
/// It becomes non-deterministic if any of its children is non-deterministic.
if (allow_constant_folding && !node->children.empty() && node->column && isColumnConst(*node->column))
2020-11-16 14:57:56 +00:00
{
node->type = ActionsDAG::ActionType::COLUMN;
2021-07-12 06:36:46 +00:00
for (const auto & child : node->children)
{
if (!child->is_deterministic)
{
node->is_deterministic = false;
break;
}
}
2020-11-16 14:57:56 +00:00
/// Children are cleared before the loop below, so they are not reached through this node.
node->children.clear();
}
2021-03-02 17:51:54 +00:00
for (const auto * child : node->children)
2020-11-16 14:57:56 +00:00
{
if (!visited_nodes.contains(child))
2020-11-16 14:57:56 +00:00
{
2021-03-02 17:51:54 +00:00
stack.push(const_cast<Node *>(child));
2020-11-16 14:57:56 +00:00
visited_nodes.insert(child);
}
}
}
/// Erase everything not marked as kept.
/// `inputs` holds pointers that are only compared, never dereferenced, after their nodes are erased.
nodes.remove_if([&](const Node & node) { return !visited_nodes.contains(&node); });
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
2020-11-16 14:57:56 +00:00
}
2021-05-13 13:38:18 +00:00
/// Computes the header column (name, type and column) produced by a single `node`.
/// `arguments` are the already-computed header columns of its children.
///   FUNCTION   - executes the prepared function with a row count of 0.
///   ARRAY_JOIN - an empty clone of the array's nested data column.
///                Throws TYPE_MISMATCH if the argument is not something ARRAY JOIN accepts.
///   COLUMN     - the node's column resized to 0 rows.
///   ALIAS      - the child's column as is.
///   INPUT      - the column is left unset (null).
static ColumnWithTypeAndName executeActionForHeader(const ActionsDAG::Node * node, ColumnsWithTypeAndName arguments)
{
    using ActionType = ActionsDAG::ActionType;

    ColumnWithTypeAndName result;
    result.name = node->result_name;
    result.type = node->result_type;

    switch (node->type)
    {
        case ActionType::FUNCTION:
            result.column = node->function->execute(arguments, result.type, 0, true);
            break;

        case ActionType::ARRAY_JOIN:
        {
            /// Keep the full column alive in a local: `array_column` below is a raw pointer into it.
            ColumnPtr array_source = arguments.at(0).column->convertToFullColumnIfConst();
            const auto * array_column = getArrayJoinColumnRawPtr(array_source);
            if (!array_column)
                throw Exception(ErrorCodes::TYPE_MISMATCH,
                    "ARRAY JOIN of not array nor map: {}", node->result_name);
            result.column = array_column->getDataPtr()->cloneEmpty();
            break;
        }

        case ActionType::COLUMN:
            result.column = node->column->cloneResized(0);
            break;

        case ActionType::ALIAS:
            result.column = arguments.at(0).column;
            break;

        case ActionType::INPUT:
            break;
    }

    return result;
}
/// Computes the block structure this DAG produces from the input `header`.
///
/// Header columns are matched to INPUT nodes by name. When several inputs share a name,
/// header columns with that name bind to them in order.
/// Every output is then evaluated with executeActionForHeader, bottom-up.
/// The result contains, in order:
///   - the computed output columns (outputs whose computed column is null are skipped);
///   - the header columns not consumed by any input. None are kept if isInputProjected().
/// Throws NOT_FOUND_COLUMN_IN_BLOCK if an output depends on an input absent from `header`.
Block ActionsDAG::updateHeader(Block header) const
{
2021-05-14 09:30:32 +00:00
/// Computed (or header-provided) column for each node.
std::unordered_map<const Node *, ColumnWithTypeAndName> node_to_column;
2021-05-14 10:01:27 +00:00
/// Positions of header columns that were bound to inputs.
std::set<size_t> pos_to_remove;
2021-05-13 13:38:18 +00:00
{
/// For every input name, the positions (in `inputs`) of inputs with that name, in order.
std::unordered_map<std::string_view, std::list<size_t>> input_positions;
for (size_t pos = 0; pos < inputs.size(); ++pos)
input_positions[inputs[pos]->result_name].emplace_back(pos);
for (size_t pos = 0; pos < header.columns(); ++pos)
{
const auto & col = header.getByPosition(pos);
auto it = input_positions.find(col.name);
if (it != input_positions.end() && !it->second.empty())
{
/// Bind this header column to the next unbound input with the same name.
auto & list = it->second;
2021-05-14 10:01:27 +00:00
pos_to_remove.insert(pos);
node_to_column[inputs[list.front()]] = col;
2021-05-13 13:38:18 +00:00
list.pop_front();
}
}
}
ColumnsWithTypeAndName result_columns;
2022-08-08 15:54:51 +00:00
result_columns.reserve(outputs.size());
2021-05-13 13:38:18 +00:00
2021-05-14 09:30:32 +00:00
/// State of the iterative post-order DFS: the node and the index of the next child to inspect.
struct Frame
{
2021-05-23 23:56:03 +00:00
const Node * node = nullptr;
2021-05-14 09:30:32 +00:00
size_t next_child = 0;
};
2021-05-13 13:38:18 +00:00
{
2022-08-08 15:54:51 +00:00
for (const auto * output_node : outputs)
2021-05-13 13:38:18 +00:00
{
2022-08-08 15:54:51 +00:00
if (!node_to_column.contains(output_node))
2021-05-13 13:38:18 +00:00
{
2021-05-14 09:30:32 +00:00
std::stack<Frame> stack;
2022-08-08 15:54:51 +00:00
stack.push({.node = output_node});
2021-05-13 13:38:18 +00:00
while (!stack.empty())
{
2021-05-14 09:30:32 +00:00
/// std::stack is backed by std::deque by default; pushing onto it below
/// does not invalidate this reference.
auto & frame = stack.top();
const auto * node = frame.node;
2021-05-13 13:38:18 +00:00
2021-05-14 09:30:32 +00:00
/// Descend into the first child that has no column yet.
while (frame.next_child < node->children.size())
2021-05-13 13:38:18 +00:00
{
2021-05-14 09:30:32 +00:00
const auto * child = node->children[frame.next_child];
if (!node_to_column.contains(child))
2021-05-13 13:38:18 +00:00
{
2021-05-14 09:30:32 +00:00
stack.push({.node = child});
2021-05-13 13:38:18 +00:00
break;
}
2021-05-14 09:30:32 +00:00
++frame.next_child;
2021-05-13 13:38:18 +00:00
}
2021-05-14 09:30:32 +00:00
if (frame.next_child < node->children.size())
2021-05-13 13:38:18 +00:00
continue;
/// All children are computed: evaluate this node.
stack.pop();
ColumnsWithTypeAndName arguments(node->children.size());
for (size_t i = 0; i < arguments.size(); ++i)
{
2021-05-14 09:30:32 +00:00
arguments[i] = node_to_column[node->children[i]];
2021-05-13 18:07:47 +00:00
if (!arguments[i].column)
2021-05-13 13:38:18 +00:00
throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
"Not found column {} in block {}", node->children[i]->result_name,
header.dumpStructure());
2021-05-13 13:38:18 +00:00
}
2021-05-13 18:07:47 +00:00
/// An INPUT reached here was not bound to any header column.
if (node->type == ActionsDAG::ActionType::INPUT)
throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
"Not found column {} in block {}",
node->result_name, header.dumpStructure());
2021-05-14 09:30:32 +00:00
node_to_column[node] = executeActionForHeader(node, std::move(arguments));
2021-05-13 13:38:18 +00:00
}
}
2022-08-08 15:54:51 +00:00
if (node_to_column[output_node].column)
result_columns.push_back(node_to_column[output_node]);
2021-05-13 13:38:18 +00:00
}
}
/// Pass through the header columns that were not consumed by inputs (none if input is projected).
if (isInputProjected())
header.clear();
else
2021-05-14 10:01:27 +00:00
header.erase(pos_to_remove);
2021-05-13 13:38:18 +00:00
Block res;
for (auto & col : result_columns)
res.insert(std::move(col));
for (auto && item : header)
2021-05-13 13:38:18 +00:00
res.insert(std::move(item));
return res;
}
/// Rewrites this DAG in place so that nodes whose names exist in `projection_block_for_keys`
/// become INPUT nodes (with the block column's name and type) instead of being computed.
///
/// Only outputs named in `required_columns`, or named `predicate_column_name`, are kept.
/// With `add_missing_keys`, each required column absent from the outputs is added as a new
/// input (and output) taken from `projection_block_for_keys`. If it is absent there too, an
/// empty set is returned. Inputs/outputs already added by that loop are not rolled back.
/// Returns the names of the inputs that remain after folding.
NameSet ActionsDAG::foldActionsByProjection(
2021-05-04 12:40:34 +00:00
const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name, bool add_missing_keys)
{
std::unordered_set<const Node *> visited_nodes;
2022-08-08 15:54:51 +00:00
/// NOTE(review): these views point into nodes' result_name strings. The folding below reassigns
/// result_name of matched nodes to the name of a block column found by that same name, which
/// presumably leaves the content unchanged — confirm if Block::findByName semantics change.
std::unordered_set<std::string_view> visited_output_nodes_names;
std::stack<Node *> stack;
2022-08-08 15:54:51 +00:00
/// Record all needed output nodes to start folding.
for (const auto & output_node : outputs)
{
2022-08-08 15:54:51 +00:00
if (required_columns.find(output_node->result_name) != required_columns.end() || output_node->result_name == predicate_column_name)
{
2022-08-08 15:54:51 +00:00
visited_nodes.insert(output_node);
visited_output_nodes_names.insert(output_node->result_name);
stack.push(const_cast<Node *>(output_node));
}
}
2022-08-08 15:54:51 +00:00
/// If some required columns are not in any output node, try searching from all projection key
2022-02-06 08:45:49 +00:00
/// columns. If still missing, return empty set which means current projection fails to match
/// (missing columns).
2021-05-04 12:40:34 +00:00
if (add_missing_keys)
{
2021-05-04 12:40:34 +00:00
for (const auto & column : required_columns)
{
2022-08-08 15:54:51 +00:00
if (visited_output_nodes_names.find(column) == visited_output_nodes_names.end())
{
2021-05-04 12:40:34 +00:00
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column))
{
/// New inputs are marked visited but not pushed: there is nothing below them to traverse.
const auto * node = &addInput(*column_with_type_name);
visited_nodes.insert(node);
2022-08-08 15:54:51 +00:00
outputs.push_back(node);
visited_output_nodes_names.insert(column);
2021-05-04 12:40:34 +00:00
}
else
{
// Missing column
return {};
}
}
}
}
2022-02-06 08:45:49 +00:00
/// Traverse the DAG from root to leaf. Substitute any matched node with columns in projection_block_for_keys.
while (!stack.empty())
{
auto * node = stack.top();
stack.pop();
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(node->result_name))
{
if (node->type != ActionsDAG::ActionType::INPUT)
{
/// Projection folding.
/// Children are cleared before the child loop below, so the subtree is not traversed
/// from here and is removed unless another kept node reaches it.
node->type = ActionsDAG::ActionType::INPUT;
node->result_type = column_with_type_name->type;
node->result_name = column_with_type_name->name;
node->children.clear();
inputs.push_back(node);
}
}
for (const auto * child : node->children)
{
if (!visited_nodes.contains(child))
{
stack.push(const_cast<Node *>(child));
visited_nodes.insert(child);
}
}
}
2022-02-06 08:45:49 +00:00
/// Clean up unused nodes after folding.
/// Outputs are filtered by name; inputs and nodes by reachability.
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
2022-08-08 15:54:51 +00:00
std::erase_if(outputs, [&](const Node * node) { return !visited_output_nodes_names.contains(node->result_name); });
nodes.remove_if([&](const Node & node) { return !visited_nodes.contains(&node); });
2022-02-06 08:45:49 +00:00
/// Calculate the required columns after folding.
NameSet next_required_columns;
2022-08-08 15:54:51 +00:00
for (const auto & input_node : inputs)
next_required_columns.insert(input_node->result_name);
return next_required_columns;
}
/// Builds a new DAG that computes `required_outputs` of this DAG.
/// Every node that is a key of `new_inputs` is replaced by an INPUT of the new DAG instead of being computed.
///
/// For a replaced node N with new_inputs[N] == R:
///   - the new input is named R->result_name and has N's result type;
///   - if the names differ, an alias named N->result_name is added on top of it.
/// Replaced nodes with the same R reuse the node created for the first of them
/// (the input, or the alias above it).
/// All other reachable nodes are copied into the new DAG with their children remapped.
/// Throws LOGICAL_ERROR if a required output depends on an INPUT of this DAG that is not replaced.
/// Outputs of the new DAG keep the original output names, aliased when needed.
ActionsDAGPtr ActionsDAG::foldActionsByProjection(const std::unordered_map<const Node *, const Node *> & new_inputs, const NodeRawConstPtrs & required_outputs)
{
auto dag = std::make_unique<ActionsDAG>();
/// R node -> node of the new DAG that represents it.
std::unordered_map<const Node *, const Node *> inputs_mapping;
/// Node of this DAG -> node of the new DAG; a nullptr value marks a node scheduled but not finished yet.
std::unordered_map<const Node *, const Node *> mapping;
struct Frame
{
const Node * node;
size_t next_child = 0;
};
std::vector<Frame> stack;
2023-02-06 18:42:58 +00:00
for (const auto * output : required_outputs)
{
if (mapping.contains(output))
continue;
stack.push_back({.node = output});
while (!stack.empty())
{
/// NOTE: push_back below may reallocate and invalidate `frame`;
/// every push is immediately followed by `continue`, which re-reads stack.back().
auto & frame = stack.back();
if (frame.next_child == 0)
{
/// Replaced nodes are handled on first visit; their children are never traversed.
auto it = new_inputs.find(frame.node);
if (it != new_inputs.end())
{
const auto & [new_input, rename] = *it;
2023-02-06 18:42:58 +00:00
auto & node = mapping[frame.node];
if (!node)
{
2023-04-03 18:18:39 +00:00
/// It is possible to have a few aliases on the same column.
/// We may want to replace all the aliases,
/// in this case they should have a single input as a child.
auto & mapped_input = inputs_mapping[rename];
if (!mapped_input)
{
bool should_rename = new_input->result_name != rename->result_name;
const auto & input_name = should_rename ? rename->result_name : new_input->result_name;
mapped_input = &dag->addInput(input_name, new_input->result_type);
if (should_rename)
mapped_input = &dag->addAlias(*mapped_input, new_input->result_name);
}
node = mapped_input;
2023-02-06 18:42:58 +00:00
}
stack.pop_back();
continue;
}
}
/// Skip children already scheduled or finished: emplace() returns false for them.
const auto & children = frame.node->children;
while (frame.next_child < children.size() && !mapping.emplace(children[frame.next_child], nullptr).second)
++frame.next_child;
if (frame.next_child < children.size())
{
const auto * child = children[frame.next_child];
++frame.next_child;
stack.push_back({.node = child});
continue;
}
/// All children are mapped. stack.front() is the required output currently being resolved.
if (frame.node->type == ActionType::INPUT)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot fold actions for projection. Node {} requires input {} which does not belong to projection",
stack.front().node->result_name, frame.node->result_name);
/// Copy the node into the new DAG and point its children at their copies.
auto & node = dag->nodes.emplace_back(*frame.node);
for (auto & child : node.children)
child = mapping[child];
2023-02-06 18:42:58 +00:00
mapping[frame.node] = &node;
stack.pop_back();
}
}
2023-02-06 18:42:58 +00:00
for (const auto * output : required_outputs)
{
2023-04-03 18:18:39 +00:00
/// Keep the names for outputs.
/// Add an alias if the mapped node has a different result name.
const auto * mapped_output = mapping[output];
if (output->result_name != mapped_output->result_name)
mapped_output = &dag->addAlias(*mapped_output, output->result_name);
dag->outputs.push_back(mapped_output);
}
return dag;
}
2021-04-29 15:31:08 +00:00
void ActionsDAG::reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map)
{
2022-08-08 15:54:51 +00:00
::sort(outputs.begin(), outputs.end(), [&key_names_pos_map](const Node * lhs, const Node * rhs)
2021-04-29 15:31:08 +00:00
{
return key_names_pos_map.find(lhs->result_name)->second < key_names_pos_map.find(rhs->result_name)->second;
});
}
void ActionsDAG::addAggregatesViaProjection(const Block & aggregates)
{
for (const auto & aggregate : aggregates)
2022-08-08 15:54:51 +00:00
outputs.push_back(&addInput(aggregate));
}
2021-03-11 14:13:36 +00:00
void ActionsDAG::addAliases(const NamesWithAliases & aliases)
2020-11-16 14:57:56 +00:00
{
2021-03-11 14:13:36 +00:00
std::unordered_map<std::string_view, size_t> names_map;
2022-08-08 15:54:51 +00:00
size_t output_nodes_size = outputs.size();
for (size_t i = 0; i < output_nodes_size; ++i)
names_map[outputs[i]->result_name] = i;
size_t aliases_size = aliases.size();
2021-03-02 17:08:59 +00:00
NodeRawConstPtrs required_nodes;
2022-08-08 15:54:51 +00:00
required_nodes.reserve(aliases_size);
2020-11-16 14:57:56 +00:00
for (const auto & item : aliases)
{
2021-03-04 17:38:12 +00:00
auto it = names_map.find(item.first);
if (it == names_map.end())
2021-03-02 17:08:59 +00:00
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
2021-03-02 17:51:54 +00:00
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
2021-03-02 17:08:59 +00:00
2022-08-08 15:54:51 +00:00
required_nodes.push_back(outputs[it->second]);
2020-11-16 14:57:56 +00:00
}
2022-08-08 15:54:51 +00:00
for (size_t i = 0; i < aliases_size; ++i)
2020-11-16 14:57:56 +00:00
{
const auto & item = aliases[i];
2021-03-02 17:08:59 +00:00
const auto * child = required_nodes[i];
2020-11-16 14:57:56 +00:00
if (!item.second.empty() && item.first != item.second)
{
Node node;
node.type = ActionType::ALIAS;
node.result_type = child->result_type;
node.result_name = item.second;
2020-11-16 14:57:56 +00:00
node.column = child->column;
node.children.emplace_back(child);
2021-03-11 14:13:36 +00:00
child = &addNode(std::move(node));
2020-11-16 14:57:56 +00:00
}
2021-03-11 14:13:36 +00:00
auto it = names_map.find(child->result_name);
if (it == names_map.end())
{
2022-08-08 15:54:51 +00:00
names_map[child->result_name] = outputs.size();
outputs.push_back(child);
2021-03-11 14:13:36 +00:00
}
else
2022-08-08 15:54:51 +00:00
outputs[it->second] = child;
2020-11-16 14:57:56 +00:00
}
}
void ActionsDAG::project(const NamesWithAliases & projection)
{
2021-03-11 14:13:36 +00:00
std::unordered_map<std::string_view, const Node *> names_map;
2022-08-08 15:54:51 +00:00
for (const auto * output_node : outputs)
names_map.emplace(output_node->result_name, output_node);
outputs.clear();
2021-03-11 14:13:36 +00:00
2022-08-08 15:54:51 +00:00
size_t projection_size = projection.size();
outputs.reserve(projection_size);
2021-03-11 14:13:36 +00:00
for (const auto & item : projection)
{
auto it = names_map.find(item.first);
if (it == names_map.end())
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", item.first, dumpNames());
2022-08-08 15:54:51 +00:00
outputs.push_back(it->second);
2021-03-11 14:13:36 +00:00
}
2022-08-08 15:54:51 +00:00
for (size_t i = 0; i < projection_size; ++i)
2021-03-11 14:13:36 +00:00
{
const auto & item = projection[i];
2022-08-08 15:54:51 +00:00
auto & child = outputs[i];
2021-03-11 14:13:36 +00:00
if (!item.second.empty() && item.first != item.second)
{
Node node;
node.type = ActionType::ALIAS;
node.result_type = child->result_type;
node.result_name = item.second;
2021-03-11 14:13:36 +00:00
node.column = child->column;
node.children.emplace_back(child);
child = &addNode(std::move(node));
}
}
2021-03-02 17:08:59 +00:00
removeUnusedActions();
2020-11-16 14:57:56 +00:00
projectInput();
2021-03-02 17:08:59 +00:00
projected_output = true;
2020-11-16 14:57:56 +00:00
}
bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
{
2022-08-08 15:54:51 +00:00
for (const auto * output_node : outputs)
if (output_node->result_name == column_name)
2021-03-02 17:08:59 +00:00
return true;
2020-11-16 14:57:56 +00:00
for (auto it = nodes.rbegin(); it != nodes.rend(); ++it)
{
auto & node = *it;
if (node.result_name == column_name)
{
2022-08-08 15:54:51 +00:00
outputs.push_back(&node);
2020-11-16 14:57:56 +00:00
return true;
}
}
return false;
}
bool ActionsDAG::removeUnusedResult(const std::string & column_name)
2021-01-19 10:03:25 +00:00
{
2022-08-08 15:54:51 +00:00
/// Find column in output nodes and remove.
const Node * col;
{
2022-08-08 15:54:51 +00:00
auto it = outputs.begin();
for (; it != outputs.end(); ++it)
if ((*it)->result_name == column_name)
break;
2022-08-08 15:54:51 +00:00
if (it == outputs.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found result {} in ActionsDAG\n{}", column_name, dumpDAG());
col = *it;
2022-08-08 15:54:51 +00:00
outputs.erase(it);
}
/// Check if column is in input.
2021-01-19 10:03:25 +00:00
auto it = inputs.begin();
for (; it != inputs.end(); ++it)
if (*it == col)
2021-01-19 10:03:25 +00:00
break;
if (it == inputs.end())
return false;
2021-01-19 10:03:25 +00:00
/// Check column has no dependent.
2021-01-19 10:03:25 +00:00
for (const auto & node : nodes)
for (const auto * child : node.children)
if (col == child)
return false;
2021-01-19 10:03:25 +00:00
2022-08-08 15:54:51 +00:00
/// Do not remove input if it was mentioned in output nodes several times.
for (const auto * output_node : outputs)
if (col == output_node)
2021-02-12 15:20:54 +00:00
return false;
/// Remove from nodes and inputs.
2021-01-19 11:48:09 +00:00
for (auto jt = nodes.begin(); jt != nodes.end(); ++jt)
{
if (&(*jt) == *it)
2021-01-19 11:48:09 +00:00
{
nodes.erase(jt);
break;
}
}
2021-01-19 10:03:25 +00:00
inputs.erase(it);
return true;
2021-01-19 10:03:25 +00:00
}
2020-11-16 14:57:56 +00:00
/// Deep-copy the DAG: every node is copied and all child/output/input pointers are
/// redirected to the copies. Projection flags are copied as well.
ActionsDAGPtr ActionsDAG::clone() const
{
    auto actions = std::make_shared<ActionsDAG>();
    actions->project_input = project_input;
    actions->projected_output = projected_output;

    /// Original node -> its copy in `actions`.
    std::unordered_map<const Node *, Node *> old_to_new;
    for (const auto & original : nodes)
        old_to_new[&original] = &actions->nodes.emplace_back(original);

    /// Copied nodes still reference children of the source DAG; rewire them.
    for (auto & copied : actions->nodes)
    {
        for (auto & child : copied.children)
            child = old_to_new[child];
    }

    for (const auto * output_node : outputs)
        actions->outputs.push_back(old_to_new[output_node]);

    for (const auto * input_node : inputs)
        actions->inputs.push_back(old_to_new[input_node]);

    return actions;
}
2021-03-05 09:54:17 +00:00
#if USE_EMBEDDED_COMPILER

/// JIT-compile eligible function nodes (see compileFunctions), then drop actions that became
/// unused while keeping all inputs in place.
void ActionsDAG::compileExpressions(size_t min_count_to_compile_expression, const std::unordered_set<const ActionsDAG::Node *> & lazy_executed_nodes)
{
    compileFunctions(min_count_to_compile_expression, lazy_executed_nodes);

    constexpr bool allow_remove_inputs = false;
    removeUnusedActions(allow_remove_inputs);
}

#endif
2020-11-16 14:57:56 +00:00
/// Human-readable dump of the DAG for debugging and error messages.
/// One line per node: `<id> : <TYPE> (<child ids>) <column> <type> <name> [function] [compiled]`,
/// followed by the list of output ids and the projection flags.
std::string ActionsDAG::dumpDAG() const
{
    /// Ids are positions in `nodes`.
    std::unordered_map<const Node *, size_t> ids;
    size_t next_id = 0;
    for (const auto & node : nodes)
        ids[&node] = next_id++;

    WriteBufferFromOwnString out;
    for (const auto & node : nodes)
    {
        out << ids[&node] << " : ";

        switch (node.type)
        {
            case ActionsDAG::ActionType::COLUMN:
                out << "COLUMN ";
                break;
            case ActionsDAG::ActionType::ALIAS:
                out << "ALIAS ";
                break;
            case ActionsDAG::ActionType::FUNCTION:
                out << "FUNCTION ";
                break;
            case ActionsDAG::ActionType::ARRAY_JOIN:
                out << "ARRAY JOIN ";
                break;
            case ActionsDAG::ActionType::INPUT:
                out << "INPUT ";
                break;
        }

        /// Comma-separated child ids.
        out << "(";
        const char * separator = "";
        for (const auto * child : node.children)
        {
            out << separator << ids[child];
            separator = ", ";
        }
        out << ")";

        out << " " << (node.column ? node.column->getName() : "(no column)");
        out << " " << (node.result_type ? node.result_type->getName() : "(no type)");
        out << " " << (!node.result_name.empty() ? node.result_name : "(no name)");

        if (node.function_base)
            out << " [" << node.function_base->getName() << "]";

        if (node.is_function_compiled)
            out << " [compiled]";

        out << "\n";
    }

    out << "Output nodes:";
    for (const auto * output_node : outputs)
        out << ' ' << ids[output_node];
    out << '\n';

    out << "Project input: " << project_input << '\n';
    out << "Projected output: " << projected_output << '\n';

    return out.str();
}
bool ActionsDAG::hasArrayJoin() const
{
for (const auto & node : nodes)
if (node.type == ActionType::ARRAY_JOIN)
return true;
return false;
}
bool ActionsDAG::hasStatefulFunctions() const
{
for (const auto & node : nodes)
if (node.type == ActionType::FUNCTION && node.function_base->isStateful())
return true;
return false;
}
2021-01-28 11:00:24 +00:00
bool ActionsDAG::trivial() const
2020-11-16 14:57:56 +00:00
{
for (const auto & node : nodes)
2021-01-28 11:00:24 +00:00
if (node.type == ActionType::FUNCTION || node.type == ActionType::ARRAY_JOIN)
2020-11-16 14:57:56 +00:00
return false;
return true;
}
void ActionsDAG::assertDeterministic() const
{
2021-07-12 06:36:46 +00:00
for (const auto & node : nodes)
if (!node.is_deterministic)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Expression must be deterministic but it contains non-deterministic part `{}`", node.result_name);
}
bool ActionsDAG::hasNonDeterministic() const
{
for (const auto & node : nodes)
if (!node.is_deterministic)
return true;
return false;
}
void ActionsDAG::addMaterializingOutputActions()
2021-03-09 16:54:06 +00:00
{
2022-08-08 15:54:51 +00:00
for (auto & output_node : outputs)
output_node = &materializeNode(*output_node);
2021-03-09 16:54:06 +00:00
}
/// Add `materialize(node)` and alias it back to `node`'s result name; returns the alias node.
const ActionsDAG::Node & ActionsDAG::materializeNode(const Node & node)
{
    FunctionOverloadResolverPtr materialize_resolver
        = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionMaterialize>());

    const auto & materialized = addFunction(materialize_resolver, {&node}, {});
    return addAlias(materialized, node.result_name);
}
2020-11-17 14:51:05 +00:00
/// Build actions which take columns shaped like `source` and produce columns shaped like `result`.
/// Result columns are matched to source columns by position or by name (`mode`). For each result column
/// the DAG may add a CAST to the result type (when types differ), a `materialize` call (when the value is
/// constant but the result column is not) and an alias to the result name (when names differ).
///
/// @param ignore_constant_values  when a result column is constant, use the result's constant instead of the
///                                source one; in Name mode this also allows the source column to be missing.
/// @param add_casted_columns      Name mode only: instead of aliasing a renamed column, keep the source column
///                                at its position and append the converted column under its own name.
/// @param new_names               optional; with add_casted_columns, receives result name -> converted column name.
/// Throws NUMBER_OF_COLUMNS_DOESNT_MATCH, LOGICAL_ERROR, THERE_IS_NO_COLUMN or ILLEGAL_COLUMN.
ActionsDAGPtr ActionsDAG::makeConvertingActions(
    const ColumnsWithTypeAndName & source,
    const ColumnsWithTypeAndName & result,
    MatchColumnsMode mode,
    bool ignore_constant_values,
    bool add_casted_columns,
    NameToNameMap * new_names)
2020-11-17 14:51:05 +00:00
{
    size_t num_input_columns = source.size();
    size_t num_result_columns = result.size();
    if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
2023-01-17 16:39:07 +00:00
        throw Exception(ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH, "Number of columns doesn't match");
2020-11-17 14:51:05 +00:00
    if (add_casted_columns && mode != MatchColumnsMode::Name)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Converting with add_casted_columns supported only for MatchColumnsMode::Name");
2020-11-17 14:51:05 +00:00
    /// One INPUT node per source column.
    auto actions_dag = std::make_shared<ActionsDAG>(source);
2021-03-02 17:08:59 +00:00
    /// New outputs; may grow past num_result_columns when add_casted_columns appends converted columns.
    NodeRawConstPtrs projection(num_result_columns);
2020-11-17 14:51:05 +00:00
2021-05-15 17:33:15 +00:00
    FunctionOverloadResolverPtr func_builder_materialize = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionMaterialize>());
2020-11-17 19:43:26 +00:00
2020-11-17 14:51:05 +00:00
    /// Name mode: source column name -> positions of inputs with that name. A list is used so that
    /// duplicate names are matched to result columns in order (front is consumed first).
    std::map<std::string_view, std::list<size_t>> inputs;
    if (mode == MatchColumnsMode::Name)
    {
2022-08-08 15:54:51 +00:00
        size_t input_nodes_size = actions_dag->inputs.size();
        for (size_t pos = 0; pos < input_nodes_size; ++pos)
2020-11-17 14:51:05 +00:00
            inputs[actions_dag->inputs[pos]->result_name].push_back(pos);
    }
    for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
    {
        const auto & res_elem = result[result_col_num];
2021-03-02 17:08:59 +00:00
        /// src_node: the matched source column; dst_node: the column after conversions applied below.
        const Node * src_node = nullptr;
        const Node * dst_node = nullptr;
2020-11-17 14:51:05 +00:00
        switch (mode)
        {
            case MatchColumnsMode::Position:
            {
                src_node = dst_node = actions_dag->inputs[result_col_num];
2020-11-17 14:51:05 +00:00
                break;
            }
            case MatchColumnsMode::Name:
            {
                /// NOTE: operator[] inserts an empty list for names absent in `source`, so such names
                /// are also reported by `inputs.contains` further below.
                auto & input = inputs[res_elem.name];
                if (input.empty())
2021-05-18 15:17:19 +00:00
                {
                    /// No (unused) source column with this name: only a constant result can be synthesized.
                    const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get());
                    if (ignore_constant_values && res_const)
                        src_node = dst_node = &actions_dag->addColumn(res_elem);
                    else
2021-08-16 09:08:23 +00:00
                        throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
                                        "Cannot find column `{}` in source stream, there are only columns: [{}]",
                                        res_elem.name, Block(source).dumpNames());
2021-05-18 15:17:19 +00:00
                }
                else
                {
                    src_node = dst_node = actions_dag->inputs[input.front()];
                    input.pop_front();
                }
2020-11-17 14:51:05 +00:00
                break;
            }
        }
        /// Check constants.
        /// A constant result requires a constant source; values must match unless ignore_constant_values,
        /// in which case the result's constant replaces the source one.
        if (const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get()))
        {
            if (const auto * src_const = typeid_cast<const ColumnConst *>(dst_node->column.get()))
2020-11-17 14:51:05 +00:00
            {
                if (ignore_constant_values)
                    dst_node = &actions_dag->addColumn(res_elem);
2020-11-17 14:51:05 +00:00
                else if (res_const->getField() != src_const->getField())
                    throw Exception(
                        ErrorCodes::ILLEGAL_COLUMN,
                        "Cannot convert column `{}` because it is constant but values of constants are different in source and result",
                        res_elem.name);
2020-11-17 14:51:05 +00:00
            }
            else
                throw Exception(
                    ErrorCodes::ILLEGAL_COLUMN,
                    "Cannot convert column `{}` because it is non constant in source stream but must be constant in result",
                    res_elem.name);
2020-11-17 14:51:05 +00:00
        }
2020-11-20 16:52:50 +00:00
        /// Add CAST function to convert into result type if needed.
        /// The target type is passed as a constant String argument holding the type name.
        if (!res_elem.type->equals(*dst_node->result_type))
2020-11-17 14:51:05 +00:00
        {
            ColumnWithTypeAndName column;
            column.name = res_elem.type->getName();
            column.column = DataTypeString().createColumnConst(0, column.name);
            column.type = std::make_shared<DataTypeString>();
2021-03-02 17:08:59 +00:00
            const auto * right_arg = &actions_dag->addColumn(std::move(column));
            const auto * left_arg = dst_node;
2020-11-17 14:51:05 +00:00
2021-08-07 08:11:40 +00:00
            /// Source and result column names, presumably used in CAST error messages.
            FunctionCastBase::Diagnostic diagnostic = {dst_node->result_name, res_elem.name};
2022-05-24 04:09:00 +00:00
            FunctionOverloadResolverPtr func_builder_cast
                = CastInternalOverloadResolver<CastType::nonAccurate>::createImpl(std::move(diagnostic));
2020-11-18 09:35:32 +00:00
2021-03-02 17:08:59 +00:00
            NodeRawConstPtrs children = { left_arg, right_arg };
            dst_node = &actions_dag->addFunction(func_builder_cast, std::move(children), {});
2020-11-17 14:51:05 +00:00
        }
        /// Converted value is constant but the result column is not: materialize it.
        if (dst_node->column && isColumnConst(*dst_node->column) && !(res_elem.column && isColumnConst(*res_elem.column)))
2020-11-17 19:43:26 +00:00
        {
            NodeRawConstPtrs children = {dst_node};
            dst_node = &actions_dag->addFunction(func_builder_materialize, std::move(children), {});
2020-11-17 19:43:26 +00:00
        }
        /// Conversions changed the column name (CAST/materialize produce expression names).
        if (dst_node->result_name != res_elem.name)
        {
            if (add_casted_columns)
            {
                /// The converted column is exposed under its own name, which must not clash with an input name.
                if (inputs.contains(dst_node->result_name))
                    throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot convert column `{}` to `{}` because other column have same name",
                                    res_elem.name, dst_node->result_name);
                if (new_names)
                    new_names->emplace(res_elem.name, dst_node->result_name);
2020-11-17 14:51:05 +00:00
                /// Leave current column on same place, add converted to back
                projection[result_col_num] = src_node;
                projection.push_back(dst_node);
            }
            else
            {
                dst_node = &actions_dag->addAlias(*dst_node, res_elem.name);
                projection[result_col_num] = dst_node;
            }
        }
        else
        {
            projection[result_col_num] = dst_node;
        }
2020-11-17 14:51:05 +00:00
    }
2022-08-08 15:54:51 +00:00
    /// The projection becomes the outputs; everything not needed for it is dropped.
    actions_dag->outputs.swap(projection);
2021-03-02 17:08:59 +00:00
    actions_dag->removeUnusedActions();
2020-11-17 14:51:05 +00:00
    actions_dag->projectInput();
    return actions_dag;
}
/// Build a DAG with a single output: `materialize(<column constant>) AS <column.name>`.
ActionsDAGPtr ActionsDAG::makeAddingColumnActions(ColumnWithTypeAndName column)
{
    auto dag = std::make_shared<ActionsDAG>();
    FunctionOverloadResolverPtr materialize_resolver
        = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionMaterialize>());

    /// Save the name before `column` is moved into the constant node.
    auto alias_name = column.name;

    const auto & constant_node = dag->addColumn(std::move(column));
    const auto & materialized = dag->addFunction(materialize_resolver, NodeRawConstPtrs{&constant_node}, {});
    const auto & renamed = dag->addAlias(materialized, std::move(alias_name));

    dag->outputs.push_back(&renamed);
    return dag;
}
2020-12-01 11:19:03 +00:00
/// Combine two DAGs where `second` consumes the outputs of `first` (see mergeInplace), then prune
/// unused inputs and actions from the combined DAG. Both arguments are consumed.
ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
{
    ActionsDAG & combined = first;
    combined.mergeInplace(std::move(second));
    combined.removeUnusedActions();
    return std::make_shared<ActionsDAG>(std::move(combined));
}
/// Append `second` after this DAG so that `second` consumes this DAG's outputs.
/// Each input of `second` is bound to an output of this DAG with the same name; outputs are consumed
/// in order, each one at most once per matching input. An input with no matching output becomes an
/// input of the merged DAG, or throws UNKNOWN_IDENTIFIER if this DAG projects its input.
/// Resulting outputs: the outputs of `second`, followed (unless `second` projects its input) by the
/// outputs of this DAG that were not consumed by `second`.
/// NOTE(review): input nodes of `second` that got bound are still spliced into `nodes` (but not into
/// `inputs`); merge() calls removeUnusedActions() afterwards — other callers presumably should too.
void ActionsDAG::mergeInplace(ActionsDAG && second)
{
    auto & first = *this;
2020-12-01 11:19:03 +00:00
    /// first: x (1), x (2), y ==> x (2), z, x (3)
    /// second: x (1), x (2), x (3) ==> x (3), x (2), x (1)
2021-01-18 21:54:01 +00:00
    /// merge: x (1), x (2), x (3), y =(first)=> x (2), z, x (4), x (3) =(second)=> x (3), x (4), x (2), z
2020-11-26 16:16:44 +00:00
2020-12-01 11:19:03 +00:00
    /// Will store merged result in `first`.
2020-11-26 16:16:44 +00:00
2022-08-08 15:54:51 +00:00
    /// This map contains nodes which should be removed from `first` outputs, cause they are used as inputs for `second`.
2021-01-12 18:58:05 +00:00
    /// The second element is the number of removes (cause one node may be repeated several times in result).
2021-03-02 17:08:59 +00:00
    std::unordered_map<const Node *, size_t> removed_first_result;
2020-12-01 11:19:03 +00:00
    /// Map inputs of `second` to nodes of `first`.
2021-03-02 17:08:59 +00:00
    std::unordered_map<const Node *, const Node *> inputs_map;
2020-11-26 16:16:44 +00:00
    /// Update inputs list.
    {
2022-08-08 15:54:51 +00:00
        /// Outputs may have multiple columns with same name. They also may be used by `second`. Order is important.
2021-03-02 17:08:59 +00:00
        std::unordered_map<std::string_view, std::list<const Node *>> first_result;
2022-08-08 15:54:51 +00:00
        for (const auto & output_node : first.outputs)
            first_result[output_node->result_name].push_back(output_node);
2020-11-26 16:16:44 +00:00
2022-08-08 15:54:51 +00:00
        for (const auto & input_node : second.inputs)
2020-11-26 16:16:44 +00:00
        {
2022-08-08 15:54:51 +00:00
            auto it = first_result.find(input_node->result_name);
2020-12-01 11:19:03 +00:00
            if (it == first_result.end() || it->second.empty())
2020-11-26 16:16:44 +00:00
            {
2021-03-02 17:51:54 +00:00
                /// With projected input, `first` cannot pass through columns it does not output.
                if (first.project_input)
2022-08-19 08:34:15 +00:00
                    throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
2022-08-08 15:54:51 +00:00
                                    "Cannot find column {} in ActionsDAG result", input_node->result_name);
2020-11-26 16:16:44 +00:00
2022-08-08 15:54:51 +00:00
                /// Unmatched input of `second` becomes an input of the merged DAG.
                first.inputs.push_back(input_node);
2020-11-26 16:16:44 +00:00
            }
            else
            {
2022-08-08 15:54:51 +00:00
                /// Bind to the first not yet consumed output with this name.
                inputs_map[input_node] = it->second.front();
2021-01-12 18:58:05 +00:00
                removed_first_result[it->second.front()] += 1;
2020-11-26 16:16:44 +00:00
                it->second.pop_front();
            }
        }
    }
2020-12-01 11:19:03 +00:00
    /// Replace inputs from `second` to nodes from `first` result.
2021-03-02 17:51:54 +00:00
    for (auto & node : second.nodes)
2020-11-26 19:48:21 +00:00
    {
2021-03-02 17:51:54 +00:00
        for (auto & child : node.children)
2020-11-26 19:48:21 +00:00
        {
            if (child->type == ActionType::INPUT)
            {
                auto it = inputs_map.find(child);
                if (it != inputs_map.end())
                    child = it->second;
            }
        }
    }
2022-08-08 15:54:51 +00:00
    /// Outputs of `second` that are bound inputs are redirected the same way.
    for (auto & output_node : second.outputs)
2020-11-26 19:48:21 +00:00
    {
2022-08-08 15:54:51 +00:00
        if (output_node->type == ActionType::INPUT)
2020-11-26 19:48:21 +00:00
        {
2022-08-08 15:54:51 +00:00
            auto it = inputs_map.find(output_node);
2020-11-26 19:48:21 +00:00
            if (it != inputs_map.end())
2022-08-08 15:54:51 +00:00
                output_node = it->second;
2020-11-26 19:48:21 +00:00
        }
    }
2022-08-08 15:54:51 +00:00
    /// Update output nodes.
2021-03-02 17:08:59 +00:00
    if (second.project_input)
2020-11-26 16:16:44 +00:00
    {
2022-08-08 15:54:51 +00:00
        /// `second` exposes only its own outputs.
        first.outputs.swap(second.outputs);
2021-03-02 17:08:59 +00:00
        first.project_input = true;
2020-11-26 16:16:44 +00:00
    }
    else
    {
2021-03-02 17:08:59 +00:00
        /// Add not removed result from first actions.
        /// Each binding consumes one occurrence of the output; remaining occurrences are kept.
2022-08-08 15:54:51 +00:00
        for (const auto * output_node : first.outputs)
2020-11-26 16:16:44 +00:00
        {
2022-08-08 15:54:51 +00:00
            auto it = removed_first_result.find(output_node);
2021-03-02 17:08:59 +00:00
            if (it != removed_first_result.end() && it->second > 0)
                --it->second;
            else
2022-08-08 15:54:51 +00:00
                second.outputs.push_back(output_node);
2020-11-26 16:16:44 +00:00
        }
2022-08-08 15:54:51 +00:00
        first.outputs.swap(second.outputs);
2020-11-26 16:16:44 +00:00
    }
2020-12-01 11:19:03 +00:00
    /// Take ownership of all nodes of `second`; splicing keeps node addresses, so pointers stay valid.
    first.nodes.splice(first.nodes.end(), std::move(second.nodes));
2020-11-26 16:16:44 +00:00
2021-03-02 17:51:54 +00:00
    first.projected_output = second.projected_output;
2020-11-26 16:16:44 +00:00
}
/// Move into this DAG the nodes of `second` that are reachable from `second`'s outputs and whose
/// `result_name` does not already exist here. Matching is purely by name:
///  - a node of `second` whose name is already known is not moved, and its children are not visited through it;
///  - children of moved nodes are re-pointed to the node with the same name in this DAG.
/// Moved INPUT nodes are appended to `inputs`; `outputs` of this DAG are not changed.
/// NOTE(review): `second.outputs`/`second.inputs` are not updated and may point to moved nodes,
/// so `second` should presumably not be used afterwards.
void ActionsDAG::mergeNodes(ActionsDAG && second)
{
    /// Name -> node; emplace keeps the first node if several share a name.
    std::unordered_map<std::string, const ActionsDAG::Node *> node_name_to_node;
    for (auto & node : nodes)
        node_name_to_node.emplace(node.result_name, &node);
    struct Frame
    {
        ActionsDAG::Node * node = nullptr;
        bool visited_children = false;
    };
    /// Nodes of `second` are owned by `second`; map the const pointers stored in outputs/children
    /// back to mutable nodes so their children can be rewritten.
    std::unordered_map<const ActionsDAG::Node *, ActionsDAG::Node *> const_node_to_node;
    for (auto & node : second.nodes)
        const_node_to_node.emplace(&node, &node);
    /// Iterative post-order DFS starting from the outputs of `second`.
    std::vector<Frame> nodes_to_process;
    nodes_to_process.reserve(second.getOutputs().size());
    for (auto & node : second.getOutputs())
        nodes_to_process.push_back({const_node_to_node.at(node), false /*visited_children*/});
    std::unordered_set<const ActionsDAG::Node *> nodes_to_move_from_second_dag;
    while (!nodes_to_process.empty())
    {
        /// Reference into the vector: must not be used after the push_back below (it may reallocate).
        auto & node_to_process = nodes_to_process.back();
        auto * node = node_to_process.node;
        /// Name already known (present here or moved earlier): nothing to do.
        auto node_it = node_name_to_node.find(node->result_name);
        if (node_it != node_name_to_node.end())
        {
            nodes_to_process.pop_back();
            continue;
        }
        if (!node_to_process.visited_children)
        {
            node_to_process.visited_children = true;
            for (auto & child : node->children)
                nodes_to_process.push_back({const_node_to_node.at(child), false /*visited_children*/});
            /// If node has children process them first
            if (!node->children.empty())
                continue;
        }
        /// Children are processed, so every child name resolves to a node of this DAG now.
        for (auto & child : node->children)
            child = node_name_to_node.at(child->result_name);
        node_name_to_node.emplace(node->result_name, node);
        nodes_to_move_from_second_dag.insert(node);
        nodes_to_process.pop_back();
    }
    if (nodes_to_move_from_second_dag.empty())
        return;
    /// Splice selected nodes one by one to the end of `nodes` (node addresses are preserved).
    auto second_nodes_end = second.nodes.end();
    for (auto second_node_it = second.nodes.begin(); second_node_it != second_nodes_end;)
    {
        if (!nodes_to_move_from_second_dag.contains(&(*second_node_it)))
        {
            ++second_node_it;
            continue;
        }
        /// Advance before splicing: the spliced iterator then belongs to `nodes`.
        auto node_to_move_it = second_node_it;
        ++second_node_it;
        nodes.splice(nodes.end(), second.nodes, node_to_move_it);
        if (node_to_move_it->type == ActionType::INPUT)
            inputs.push_back(&(*node_to_move_it));
    }
}
/// Split the DAG into two DAGs which, executed one after another, compute the same outputs.
/// The first DAG computes `split_nodes` and everything they depend on; the second DAG computes the
/// rest and receives values produced by the first DAG (and original inputs it still needs) as new inputs.
/// The first DAG's outputs correspond one-to-one, in order, to the second DAG's inputs.
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes) const
{
    /// (first_nodes, first_outputs) is a part which computes `split_nodes`.
    /// (second_nodes, second_outputs) is a part which will have same outputs as current actions.
    Nodes first_nodes;
    NodeRawConstPtrs first_outputs;
    Nodes second_nodes;
    NodeRawConstPtrs second_outputs;

    /// Nodes from current actions which are passed from the first part to the second one
    /// (they become inputs of the second part and outputs of the first part).
    NodeRawConstPtrs new_inputs;

    struct Frame
    {
        const Node * node = nullptr;
        size_t next_child_to_visit = 0;
    };

    struct Data
    {
        bool needed_by_split_node = false;
        bool visited = false;
        bool used_in_result = false;

        /// Copies of node in one of the DAGs.
        /// For COLUMN and INPUT both copies may exist.
        Node * to_second = nullptr;
        Node * to_first = nullptr;
    };

    std::stack<Frame> stack;
    std::unordered_map<const Node *, Data> data;

    for (const auto & output_node : outputs)
        data[output_node].used_in_result = true;

    /// DFS. Decide if node is needed by split.
    for (const auto & node : nodes)
    {
        if (!split_nodes.contains(&node))
            continue;

        auto & cur_data = data[&node];
        if (cur_data.needed_by_split_node)
            continue;

        cur_data.needed_by_split_node = true;
        stack.push({.node = &node});

        while (!stack.empty())
        {
            /// Copy the pointer before pop(): a reference into the popped frame would dangle.
            const Node * cur_node = stack.top().node;
            stack.pop();

            for (const auto * child : cur_node->children)
            {
                auto & child_data = data[child];
                if (!child_data.needed_by_split_node)
                {
                    child_data.needed_by_split_node = true;
                    stack.push({.node = child});
                }
            }
        }
    }

    /// DFS. Move nodes to one of the DAGs.
    for (const auto & node : nodes)
    {
        if (!data[&node].visited)
            stack.push({.node = &node});

        while (!stack.empty())
        {
            auto & cur = stack.top();

            /// At first, visit all children.
            while (cur.next_child_to_visit < cur.node->children.size())
            {
                const auto * child = cur.node->children[cur.next_child_to_visit];
                auto & child_data = data[child];

                if (!child_data.visited)
                {
                    stack.push({.node = child});
                    break;
                }

                ++cur.next_child_to_visit;
            }

            if (cur.next_child_to_visit != cur.node->children.size())
                continue;

            /// All children are copied; make a copy of the node itself.
            /// Take the node out of the frame before pop(): `cur` dangles afterwards.
            const Node * cur_node = cur.node;
            auto & cur_data = data[cur_node];
            cur_data.visited = true;
            stack.pop();

            if (!cur_data.needed_by_split_node)
            {
                auto & copy = second_nodes.emplace_back(*cur_node);
                cur_data.to_second = &copy;

                /// Replace children to newly created nodes.
                for (auto & child : copy.children)
                {
                    auto & child_data = data[child];

                    /// If children is not created, it may be from split part.
                    if (!child_data.to_second)
                    {
                        if (child->type == ActionType::COLUMN) /// Just create new node for COLUMN action.
                        {
                            child_data.to_second = &second_nodes.emplace_back(*child);
                        }
                        else
                        {
                            /// Node from first part is added as new input.
                            Node input_node;
                            input_node.type = ActionType::INPUT;
                            input_node.result_type = child->result_type;
                            input_node.result_name = child->result_name;
                            child_data.to_second = &second_nodes.emplace_back(std::move(input_node));

                            new_inputs.push_back(child);
                        }
                    }

                    child = child_data.to_second;
                }

                /// Input from second DAG should also be in the first.
                if (copy.type == ActionType::INPUT)
                {
                    auto & input_copy = first_nodes.emplace_back(*cur_node);
                    assert(cur_data.to_first == nullptr);
                    cur_data.to_first = &input_copy;
                    new_inputs.push_back(cur_node);
                }
            }
            else
            {
                auto & copy = first_nodes.emplace_back(*cur_node);
                cur_data.to_first = &copy;

                /// Replace children to newly created nodes.
                for (auto & child : copy.children)
                {
                    child = data[child].to_first;
                    assert(child != nullptr);
                }

                if (cur_data.used_in_result)
                {
                    /// If this node is needed in result, add it as input.
                    /// Name and type must come from the node being copied (not from the outer DFS root).
                    Node input_node;
                    input_node.type = ActionType::INPUT;
                    input_node.result_type = cur_node->result_type;
                    input_node.result_name = cur_node->result_name;
                    cur_data.to_second = &second_nodes.emplace_back(std::move(input_node));

                    new_inputs.push_back(cur_node);
                }
            }
        }
    }

    for (const auto * output_node : outputs)
        second_outputs.push_back(data[output_node].to_second);

    NodeRawConstPtrs second_inputs;
    NodeRawConstPtrs first_inputs;

    for (const auto * input_node : inputs)
    {
        const auto & cur = data[input_node];
        first_inputs.push_back(cur.to_first);
    }

    for (const auto * input : new_inputs)
    {
        const auto & cur = data[input];
        second_inputs.push_back(cur.to_second);
        first_outputs.push_back(cur.to_first);
    }

    auto first_actions = std::make_shared<ActionsDAG>();
    first_actions->nodes.swap(first_nodes);
    first_actions->outputs.swap(first_outputs);
    first_actions->inputs.swap(first_inputs);

    auto second_actions = std::make_shared<ActionsDAG>();
    second_actions->nodes.swap(second_nodes);
    second_actions->outputs.swap(second_outputs);
    second_actions->inputs.swap(second_inputs);

    return {std::move(first_actions), std::move(second_actions)};
}
/// Split actions into a part computable before ARRAY JOIN and the rest.
/// A node goes to the first part when it is not an INPUT listed in `array_joined_columns`
/// and all its children are in the first part. The second part inherits `project_input`.
ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const
{
    struct Frame
    {
        const Node * node = nullptr;
        size_t next_child_to_visit = 0;
    };

    std::unordered_set<const Node *> split_nodes;
    std::unordered_set<const Node *> visited_nodes;
    std::stack<Frame> stack;

    /// Post-order DFS: a node is classified only after all of its children.
    for (const auto & root : nodes)
    {
        if (!visited_nodes.insert(&root).second)
            continue;

        stack.push({.node = &root});

        while (!stack.empty())
        {
            auto & frame = stack.top();
            const auto & children = frame.node->children;

            bool descended = false;
            for (; frame.next_child_to_visit < children.size(); ++frame.next_child_to_visit)
            {
                const auto * child = children[frame.next_child_to_visit];
                if (visited_nodes.insert(child).second)
                {
                    stack.push({.node = child});
                    descended = true;
                    break;
                }
            }

            if (descended)
                continue;

            bool depends_on_array_join = frame.node->type == ActionType::INPUT
                && array_joined_columns.contains(frame.node->result_name);

            for (const auto * child : children)
            {
                if (!split_nodes.contains(child))
                    depends_on_array_join = true;
            }

            if (!depends_on_array_join)
                split_nodes.insert(frame.node);

            stack.pop();
        }
    }

    auto result = split(split_nodes);
    result.second->project_input = project_input;
    return result;
}
/// Return all nodes having `target` among their children; each parent appears once, in `nodes` order.
ActionsDAG::NodeRawConstPtrs ActionsDAG::getParents(const Node * target) const
{
    NodeRawConstPtrs parents;
    for (const auto & candidate : getNodes())
    {
        for (const auto * child : candidate.children)
        {
            if (child != target)
                continue;

            parents.push_back(&candidate);
            break;
        }
    }
    return parents;
}
2022-03-25 16:20:29 +00:00
/// Split actions so that the first part computes all sorting columns (looked up in outputs).
/// Throws LOGICAL_ERROR if a sorting column is not an output. The second part inherits `project_input`.
ActionsDAG::SplitResult ActionsDAG::splitActionsBySortingDescription(const NameSet & sort_columns) const
{
    std::unordered_set<const Node *> split_nodes;

    for (const auto & sort_column : sort_columns)
    {
        const auto * sort_node = tryFindInOutputs(sort_column);
        if (!sort_node)
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Sorting column {} wasn't found in the ActionsDAG's outputs. DAG:\n{}",
                sort_column,
                dumpDAG());

        split_nodes.insert(sort_node);

        /// Sorting may materialize constant columns. Functions consuming such a constant
        /// may expect it to stay constant, so move its parents to the first part too
        /// to keep the header consistent.
        const bool is_const = sort_node->column && isColumnConst(*sort_node->column);
        if (is_const)
        {
            auto parents = getParents(sort_node);
            split_nodes.insert(parents.begin(), parents.end());
        }
    }

    auto result = split(split_nodes);
    result.second->project_input = project_input;
    return result;
}
/// Split actions so that the first part computes the filter column `column_name` (an output).
/// Throws LOGICAL_ERROR if no output has this name. The second part inherits `project_input`.
ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
{
    const auto * filter_node = tryFindInOutputs(column_name);
    if (filter_node == nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Outputs for ActionsDAG does not contain filter column name {}. DAG:\n{}",
            column_name,
            dumpDAG());

    std::unordered_set<const Node *> split_nodes{filter_node};
    auto result = split(std::move(split_nodes));
    result.second->project_input = project_input;
    return result;
}
namespace
2021-02-10 16:26:49 +00:00
{
struct ConjunctionNodes
{
2021-03-02 17:51:54 +00:00
ActionsDAG::NodeRawConstPtrs allowed;
ActionsDAG::NodeRawConstPtrs rejected;
};
/// Take a node which result is predicate.
/// Assuming predicate is a conjunction (probably, trivial).
/// Find separate conjunctions nodes. Split nodes into allowed and rejected sets.
/// Allowed predicate is a predicate which can be calculated using only nodes from allowed_nodes set.
ConjunctionNodes getConjunctionNodes(ActionsDAG::Node * predicate, std::unordered_set<const ActionsDAG::Node *> allowed_nodes)
{
ConjunctionNodes conjunction;
2021-03-02 17:51:54 +00:00
std::unordered_set<const ActionsDAG::Node *> allowed;
std::unordered_set<const ActionsDAG::Node *> rejected;
/// Parts of predicate in case predicate is conjunction (or just predicate itself).
std::unordered_set<const ActionsDAG::Node *> predicates;
{
std::stack<const ActionsDAG::Node *> stack;
std::unordered_set<const ActionsDAG::Node *> visited_nodes;
stack.push(predicate);
visited_nodes.insert(predicate);
while (!stack.empty())
{
const auto * node = stack.top();
stack.pop();
bool is_conjunction = node->type == ActionsDAG::ActionType::FUNCTION && node->function_base->getName() == "and";
if (is_conjunction)
{
for (const auto & child : node->children)
{
if (!visited_nodes.contains(child))
{
visited_nodes.insert(child);
stack.push(child);
}
}
}
else
predicates.insert(node);
}
}
struct Frame
2021-02-10 16:26:49 +00:00
{
2021-05-07 19:57:35 +00:00
const ActionsDAG::Node * node = nullptr;
size_t next_child_to_visit = 0;
size_t num_allowed_children = 0;
};
std::stack<Frame> stack;
2021-03-02 17:08:59 +00:00
std::unordered_set<const ActionsDAG::Node *> visited_nodes;
2022-05-24 04:09:00 +00:00
stack.push({.node = predicate});
visited_nodes.insert(predicate);
while (!stack.empty())
{
auto & cur = stack.top();
/// At first, visit all children.
while (cur.next_child_to_visit < cur.node->children.size())
{
2021-03-02 17:08:59 +00:00
const auto * child = cur.node->children[cur.next_child_to_visit];
if (!visited_nodes.contains(child))
{
visited_nodes.insert(child);
stack.push({.node = child});
break;
}
if (allowed_nodes.contains(child))
++cur.num_allowed_children;
++cur.next_child_to_visit;
}
if (cur.next_child_to_visit == cur.node->children.size())
{
if (cur.num_allowed_children == cur.node->children.size())
{
if (cur.node->type != ActionsDAG::ActionType::ARRAY_JOIN && cur.node->type != ActionsDAG::ActionType::INPUT)
allowed_nodes.emplace(cur.node);
}
if (predicates.contains(cur.node))
{
if (allowed_nodes.contains(cur.node))
2021-02-20 17:42:06 +00:00
{
if (allowed.insert(cur.node).second)
conjunction.allowed.push_back(cur.node);
2021-02-20 17:42:06 +00:00
}
else
{
if (rejected.insert(cur.node).second)
conjunction.rejected.push_back(cur.node);
}
}
stack.pop();
}
}
2021-02-10 16:26:49 +00:00
// std::cerr << "Allowed " << conjunction.allowed.size() << std::endl;
// for (const auto & node : conjunction.allowed)
// std::cerr << node->result_name << std::endl;
// std::cerr << "Rejected " << conjunction.rejected.size() << std::endl;
// for (const auto & node : conjunction.rejected)
// std::cerr << node->result_name << std::endl;
return conjunction;
}
2021-02-10 16:26:49 +00:00
2021-03-02 17:51:54 +00:00
/// Turn DAG nodes into the argument list used to build a function over them.
/// Each argument carries the node's column, result type and result name, in the order of `nodes`.
ColumnsWithTypeAndName prepareFunctionArguments(const ActionsDAG::NodeRawConstPtrs & nodes)
{
    ColumnsWithTypeAndName result;
    result.reserve(nodes.size());

    for (const auto * node : nodes)
        result.emplace_back(node->column, node->result_type, node->result_name);

    return result;
}
}
/// Create actions which calculate conjunction of selected nodes.
/// Assume conjunction nodes are predicates (and may be used as arguments of function AND).
///
2022-08-08 15:54:51 +00:00
/// Result actions add single column with conjunction result (it is always first in outputs).
/// No other columns are added or removed.
2021-03-11 10:34:15 +00:00
ActionsDAGPtr ActionsDAG::cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs)
{
if (conjunction.empty())
return nullptr;
2021-03-02 17:08:59 +00:00
auto actions = std::make_shared<ActionsDAG>();
2021-05-15 17:33:15 +00:00
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
2021-03-02 17:08:59 +00:00
std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> nodes_mapping;
std::unordered_map<std::string, std::list<const Node *>> required_inputs;
struct Frame
2021-02-10 16:26:49 +00:00
{
2021-05-07 19:57:35 +00:00
const ActionsDAG::Node * node = nullptr;
size_t next_child_to_visit = 0;
};
2021-02-10 16:26:49 +00:00
std::stack<Frame> stack;
2021-02-10 16:26:49 +00:00
/// DFS. Clone actions.
for (const auto * predicate : conjunction)
{
if (nodes_mapping.contains(predicate))
continue;
stack.push({.node = predicate});
2021-02-10 16:26:49 +00:00
while (!stack.empty())
{
auto & cur = stack.top();
/// At first, visit all children.
while (cur.next_child_to_visit < cur.node->children.size())
{
2021-03-02 17:08:59 +00:00
const auto * child = cur.node->children[cur.next_child_to_visit];
2021-02-10 16:26:49 +00:00
if (!nodes_mapping.contains(child))
2021-02-10 16:26:49 +00:00
{
stack.push({.node = child});
2021-02-10 16:26:49 +00:00
break;
}
++cur.next_child_to_visit;
}
if (cur.next_child_to_visit == cur.node->children.size())
{
auto & node = actions->nodes.emplace_back(*cur.node);
nodes_mapping[cur.node] = &node;
for (auto & child : node.children)
child = nodes_mapping[child];
if (node.type == ActionType::INPUT)
2021-03-17 16:08:46 +00:00
required_inputs[node.result_name].push_back(&node);
2021-02-10 16:26:49 +00:00
stack.pop();
}
}
}
const Node * result_predicate = nodes_mapping[*conjunction.begin()];
if (conjunction.size() > 1)
{
NodeRawConstPtrs args;
args.reserve(conjunction.size());
for (const auto * predicate : conjunction)
args.emplace_back(nodes_mapping[predicate]);
result_predicate = &actions->addFunction(func_builder_and, std::move(args), {});
}
2022-08-08 15:54:51 +00:00
actions->outputs.push_back(result_predicate);
2021-03-11 10:34:15 +00:00
for (const auto & col : all_inputs)
{
const Node * input;
2021-03-17 16:08:46 +00:00
auto & list = required_inputs[col.name];
2021-03-11 10:34:15 +00:00
if (list.empty())
input = &actions->addInput(col);
else
{
input = list.front();
list.pop_front();
actions->inputs.push_back(input);
}
2022-08-08 15:54:51 +00:00
/// We should not add result_predicate into the outputs for the second time.
if (input->result_name != result_predicate->result_name)
2022-08-08 15:54:51 +00:00
actions->outputs.push_back(input);
2021-03-11 10:34:15 +00:00
}
return actions;
}
2021-02-17 10:27:47 +00:00
2021-03-17 16:08:46 +00:00
/// Try to split the filter `filter_name` of this DAG so that a part of it can be evaluated earlier
/// (pushed down), before this DAG.
///
/// The filter predicate is split into AND-ed conjuncts by getConjunctionNodes(). Conjuncts that can
/// be computed from `available_inputs` alone are "allowed"; the others are "rejected".
/// The allowed conjuncts are cloned by cloneActionsForConjunction() into a new DAG over `all_inputs`,
/// which is returned; its first output is the pushed-down condition.
///
/// This DAG is then updated so that it computes only what remains of the filter:
///  - nothing rejected and `can_remove_filter`: the filter column is removed from the outputs;
///  - nothing rejected and the filter column must be kept: it is replaced by a constant 1;
///  - otherwise the filter is rebuilt from the rejected conjuncts only.
///
/// Returns nullptr, leaving this DAG untouched, when nothing can be pushed down.
/// Throws LOGICAL_ERROR if `filter_name` is not an output of this DAG.
ActionsDAGPtr ActionsDAG::cloneActionsForFilterPushDown(
    const std::string & filter_name,
    bool can_remove_filter,
    const Names & available_inputs,
    const ColumnsWithTypeAndName & all_inputs)
{
    Node * predicate = const_cast<Node *>(tryFindInOutputs(filter_name));
    if (!predicate)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Output nodes for ActionsDAG do not contain filter column name {}. DAG:\n{}",
            filter_name,
            dumpDAG());

    /// If condition is constant let's do nothing.
    /// It means there is nothing to push down or optimization was already applied.
    if (predicate->type == ActionType::COLUMN)
        return nullptr;

    std::unordered_set<const Node *> allowed_nodes;

    /// Get input nodes from available_inputs names.
    /// Inputs may share a name, so each available name consumes one input with that name.
    {
        std::unordered_map<std::string_view, std::list<const Node *>> inputs_map;
        for (const auto & input_node : inputs)
            inputs_map[input_node->result_name].emplace_back(input_node);

        for (const auto & name : available_inputs)
        {
            auto & inputs_list = inputs_map[name];
            if (inputs_list.empty())
                continue;

            allowed_nodes.emplace(inputs_list.front());
            inputs_list.pop_front();
        }
    }

    auto conjunction = getConjunctionNodes(predicate, allowed_nodes);

    /// Nothing can be pushed down. This must be checked before conjunction.allowed.front() is used:
    /// calling front() on an empty vector is undefined behaviour.
    if (conjunction.allowed.empty())
        return nullptr;

    if (conjunction.rejected.size() == 1 && !conjunction.rejected.front()->result_type->equals(*predicate->result_type)
        && conjunction.allowed.front()->type == ActionType::COLUMN)
    {
        /// No further optimization can be done
        return nullptr;
    }

    auto actions = cloneActionsForConjunction(conjunction.allowed, all_inputs);
    if (!actions)
        return nullptr;

    /// Now, when actions are created, update current DAG.

    if (conjunction.rejected.empty())
    {
        /// The whole predicate was split.
        if (can_remove_filter)
        {
            /// If filter column is not needed, remove it from output nodes.
            std::erase_if(outputs, [&](const Node * node) { return node == predicate; });

            /// At the very end of this method we'll call removeUnusedActions() with allow_remove_inputs=false,
            /// so we need to manually remove predicate if it is an input node.
            if (predicate->type == ActionType::INPUT)
            {
                std::erase_if(inputs, [&](const Node * node) { return node == predicate; });
                nodes.remove_if([&](const Node & node) { return &node == predicate; });
            }
        }
        else
        {
            /// Replace predicate result to constant 1.
            /// Name and type are copied, not moved: if the predicate is an INPUT node it stays
            /// in `inputs` and must keep its name and type.
            Node node;
            node.type = ActionType::COLUMN;
            node.result_name = predicate->result_name;
            node.result_type = predicate->result_type;
            node.column = node.result_type->createColumnConst(0, 1);

            if (predicate->type != ActionType::INPUT)
                *predicate = std::move(node);
            else
            {
                /// Special case. We cannot replace input to constant inplace.
                /// Because we cannot affect inputs list for actions.
                /// So we just add a new constant and update outputs.
                const auto * new_predicate = &addNode(std::move(node));
                for (auto & output_node : outputs)
                    if (output_node == predicate)
                        output_node = new_predicate;
            }
        }
    }
    else
    {
        /// Predicate is a conjunction where both allowed and rejected sets are not empty.
        NodeRawConstPtrs new_children = std::move(conjunction.rejected);

        if (new_children.size() == 1 && new_children.front()->result_type->equals(*predicate->result_type))
        {
            /// Rejected set has only one predicate. And the type is the same as the result_type.
            /// Just add alias.
            Node node;
            node.type = ActionType::ALIAS;
            node.result_name = predicate->result_name;
            node.result_type = predicate->result_type;
            node.children.swap(new_children);
            *predicate = std::move(node);
        }
        else
        {
            /// Predicate is function AND, which still has more than one argument,
            /// or it has one argument of a different type.
            /// Just update children and rebuild it.
            if (new_children.size() == 1)
            {
                /// Pad a single argument with a constant 1 (neutral for AND);
                /// presumably because `and` needs at least two arguments.
                Node node;
                node.type = ActionType::COLUMN;
                node.result_name = "1";
                node.column = DataTypeUInt8().createColumnConst(0, 1u);
                node.result_type = std::make_shared<DataTypeUInt8>();
                const auto * const_col = &nodes.emplace_back(std::move(node));
                new_children.emplace_back(const_col);
            }
            predicate->children.swap(new_children);
            auto arguments = prepareFunctionArguments(predicate->children);

            FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
            predicate->function_base = func_builder_and->build(arguments);
            predicate->function = predicate->function_base->prepare(arguments);
        }
    }

    removeUnusedActions(false);

    return actions;
}
2022-08-16 14:59:53 +00:00
/// Check whether `start_node` is derived from the INPUT column `sorted_column` only through functions
/// that are always monotonic.
///
/// Only function nodes can have several children, but monotonicity is checked only for functions of
/// one argument, so only the first child of every node is followed. This covers the majority of cases.
/// TODO: if one parameter is variable and the others are constant, monotonicity could be checked as well.
///
/// Returns false if the first-child chain does not end at the INPUT node named `sorted_column`, or if
/// any function on that chain lacks monotonicity information or is not always monotonic.
static bool isColumnSortingPreserved(const ActionsDAG::Node * start_node, const String & sorted_column)
{
    auto next_on_chain = [](const ActionsDAG::Node * current) -> const ActionsDAG::Node *
    {
        return current->children.empty() ? nullptr : current->children.front();
    };

    /// Step 1: the sorted input column must be reachable along the first-child chain.
    bool column_found = false;
    for (const auto * current = start_node; current; current = next_on_chain(current))
    {
        if (current->type == ActionsDAG::ActionType::INPUT && current->result_name == sorted_column)
        {
            column_found = true;
            break;
        }
    }

    if (!column_found)
        return false;

    /// Step 2: every function on the chain must be monotonic.
    /// Default-constructed (Null) fields are passed as both bounds, presumably meaning an unbounded range.
    const Field unbounded{};
    for (const auto * current = start_node; current; current = next_on_chain(current))
    {
        if (current->type != ActionsDAG::ActionType::FUNCTION || !current->function_base)
            continue;

        const auto & function = current->function_base;
        if (!function->hasInformationAboutMonotonicity())
            return false;

        const auto & argument_types = function->getArgumentTypes();
        if (argument_types.empty())
            return false;

        if (!function->getMonotonicityForRange(*argument_types.front(), unbounded, unbounded).is_always_monotonic)
            return false;
    }

    return true;
}
bool ActionsDAG::isSortingPreserved(
const Block & input_header, const SortDescription & sort_description, const String & ignore_output_column) const
{
if (sort_description.empty())
return true;
if (hasArrayJoin())
return false;
const Block & output_header = updateHeader(input_header);
for (const auto & desc : sort_description)
{
/// header contains column with the same name
if (output_header.findByName(desc.column_name))
{
/// find the corresponding node in output
const auto * output_node = tryFindInOutputs(desc.column_name);
if (!output_node)
{
/// sorted column name in header but NOT in expression output -> no expression is applied to it -> sorting preserved
continue;
}
}
/// check if any output node is related to the sorted column and sorting order is preserved
bool preserved = false;
for (const auto * output_node : outputs)
{
if (output_node->result_name == ignore_output_column)
continue;
2022-08-16 14:59:53 +00:00
if (isColumnSortingPreserved(output_node, desc.column_name))
{
2022-08-16 14:59:53 +00:00
preserved = true;
break;
}
}
if (!preserved)
return false;
}
return true;
}
2022-11-17 18:44:26 +00:00
/// Build a new DAG which computes `filter_nodes`. The nodes may belong to other DAGs.
///
/// The filter nodes and their transitive children are copied into the new DAG. A node whose result
/// name is a key of `node_name_to_input_node_column` is not copied. It becomes an INPUT of the new DAG
/// with the mapped column instead, and its children are not visited. Only one INPUT is created per
/// name, even if several distinct source nodes map to the same name.
/// `indexHint` functions are rebuilt with their inner DAG converted by this function as well.
///
/// Outputs of the result are the copies of `filter_nodes`, in the same order. If there are several
/// and `single_output_condition_node` is true, they are replaced by a single `and` over all of them.
/// Returns nullptr if `filter_nodes` is empty.
ActionsDAGPtr ActionsDAG::buildFilterActionsDAG(
    const NodeRawConstPtrs & filter_nodes,
    const std::unordered_map<std::string, ColumnWithTypeAndName> & node_name_to_input_node_column,
    const ContextPtr & context,
    bool single_output_condition_node)
{
    if (filter_nodes.empty())
        return nullptr;

    struct Frame
    {
        const ActionsDAG::Node * node = nullptr;
        bool visited_children = false;
    };

    auto result_dag = std::make_shared<ActionsDAG>();
    /// Name -> INPUT node of result_dag. Different source nodes (possibly from different DAGs) may
    /// refer to the same column. They must share one input; otherwise the result DAG would expect
    /// several columns with the same name in its header.
    std::unordered_map<std::string, const ActionsDAG::Node *> result_inputs;
    std::unordered_map<const ActionsDAG::Node *, const ActionsDAG::Node *> node_to_result_node;
    size_t filter_nodes_size = filter_nodes.size();

    std::vector<Frame> nodes_to_process;
    nodes_to_process.reserve(filter_nodes_size);

    for (const auto & node : filter_nodes)
        nodes_to_process.push_back({node, false /*visited_children*/});

    while (!nodes_to_process.empty())
    {
        auto & node_to_process = nodes_to_process.back();
        const auto * node = node_to_process.node;

        /// Already visited node
        if (node_to_result_node.contains(node))
        {
            nodes_to_process.pop_back();
            continue;
        }

        const ActionsDAG::Node * result_node = nullptr;

        /// Node is replaced by an input column: do not copy it and do not visit its children.
        auto input_node_it = node_name_to_input_node_column.find(node->result_name);
        if (input_node_it != node_name_to_input_node_column.end())
        {
            auto & result_input = result_inputs[input_node_it->second.name];
            if (!result_input)
                result_input = &result_dag->addInput(input_node_it->second);

            node_to_result_node.emplace(node, result_input);
            nodes_to_process.pop_back();
            continue;
        }

        if (!node_to_process.visited_children)
        {
            node_to_process.visited_children = true;

            /// push_back may reallocate and invalidate node_to_process; it is not used after this loop
            /// when children were pushed (we `continue` below).
            for (const auto & child : node->children)
                nodes_to_process.push_back({child, false /*visited_children*/});

            /// If node has children process them first
            if (!node->children.empty())
                continue;
        }

        auto node_type = node->type;

        switch (node_type)
        {
            case ActionsDAG::ActionType::INPUT:
            {
                auto & result_input = result_inputs[node->result_name];
                if (!result_input)
                    result_input = &result_dag->addInput({node->column, node->result_type, node->result_name});
                result_node = result_input;
                break;
            }
            case ActionsDAG::ActionType::COLUMN:
            {
                result_node = &result_dag->addColumn({node->column, node->result_type, node->result_name});
                break;
            }
            case ActionsDAG::ActionType::ALIAS:
            {
                const auto * child = node->children.front();
                result_node = &result_dag->addAlias(*(node_to_result_node.find(child)->second), node->result_name);
                break;
            }
            case ActionsDAG::ActionType::ARRAY_JOIN:
            {
                const auto * child = node->children.front();
                result_node = &result_dag->addArrayJoin(*(node_to_result_node.find(child)->second), {});
                break;
            }
            case ActionsDAG::ActionType::FUNCTION:
            {
                NodeRawConstPtrs function_children;
                function_children.reserve(node->children.size());

                /// indexHint carries its own DAG of actions: rebuild it with the same input mapping
                /// and build a fresh indexHint function over the converted DAG.
                FunctionOverloadResolverPtr function_overload_resolver;

                if (node->function_base->getName() == "indexHint")
                {
                    if (const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(node->function_base.get()))
                    {
                        if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get()))
                        {
                            auto index_hint_filter_dag = buildFilterActionsDAG(index_hint->getActions()->getOutputs(),
                                node_name_to_input_node_column,
                                context,
                                false /*single_output_condition_node*/);

                            auto index_hint_function_clone = std::make_shared<FunctionIndexHint>();
                            index_hint_function_clone->setActions(std::move(index_hint_filter_dag));
                            function_overload_resolver = std::make_shared<FunctionToOverloadResolverAdaptor>(std::move(index_hint_function_clone));
                        }
                    }
                }

                for (const auto & child : node->children)
                    function_children.push_back(node_to_result_node.find(child)->second);

                auto [arguments, all_const] = getFunctionArguments(function_children);
                auto function_base = function_overload_resolver ? function_overload_resolver->build(arguments) : node->function_base;

                result_node = &result_dag->addFunctionImpl(
                    function_base,
                    std::move(function_children),
                    std::move(arguments),
                    {},
                    node->result_type,
                    all_const);
                break;
            }
        }

        node_to_result_node.emplace(node, result_node);
        nodes_to_process.pop_back();
    }

    auto & result_dag_outputs = result_dag->getOutputs();
    result_dag_outputs.reserve(filter_nodes_size);

    for (const auto & node : filter_nodes)
        result_dag_outputs.push_back(node_to_result_node.find(node)->second);

    if (result_dag_outputs.size() > 1 && single_output_condition_node)
    {
        auto function_builder = FunctionFactory::instance().get("and", context);
        result_dag_outputs = { &result_dag->addFunction(function_builder, result_dag_outputs, {}) };
    }

    return result_dag;
}
/// Index outputs of `actions_` by their names, mapping each to the INPUT node it refers to.
/// Only ALIAS nodes may sit between the output and the input; other outputs are not indexed.
/// If several outputs share a name, the first one wins.
FindOriginalNodeForOutputName::FindOriginalNodeForOutputName(const ActionsDAGPtr & actions_)
    : actions(actions_)
{
    for (const auto * output_node : actions->getOutputs())
    {
        /// Follow the chain of aliases down to the node they refer to.
        const auto * original = output_node;
        for (; original && original->type == ActionsDAG::ActionType::ALIAS; original = original->children.front())
        {
            /// alias has only one child
            chassert(original->children.size() == 1);
        }

        if (original && original->type == ActionsDAG::ActionType::INPUT)
            index.emplace(output_node->result_name, original);
    }
}
/// Return the INPUT node that the output `output_name` refers to, or nullptr if it was not indexed.
const ActionsDAG::Node * FindOriginalNodeForOutputName::find(const String & output_name)
{
    if (auto it = index.find(output_name); it != index.end())
        return it->second;

    return nullptr;
}
/// Index, by input name, the output of `actions_` that refers to that INPUT node, either directly or
/// through a chain of ALIAS nodes. An input may have several aliases; only the first one is kept.
FindAliasForInputName::FindAliasForInputName(const ActionsDAGPtr & actions_)
    : actions(actions_)
{
    for (const auto * output_node : actions->getOutputs())
    {
        /// Follow the chain of aliases down to the node they refer to.
        const auto * source = output_node;
        for (; source && source->type == ActionsDAG::ActionType::ALIAS; source = source->children.front())
        {
            /// alias has only one child
            chassert(source->children.size() == 1);
        }

        if (source && source->type == ActionsDAG::ActionType::INPUT)
            index.emplace(source->result_name, output_node);
    }
}
/// Return the output node aliasing the input `name`, or nullptr if there is none.
const ActionsDAG::Node * FindAliasForInputName::find(const String & name)
{
    if (auto it = index.find(name); it != index.end())
        return it->second;

    return nullptr;
}
2020-11-16 14:57:56 +00:00
}