ClickHouse/src/Interpreters/ActionsDAG.cpp

1438 lines
45 KiB
C++
Raw Normal View History

2020-11-16 14:57:56 +00:00
#include <Interpreters/ActionsDAG.h>
#include <DataTypes/DataTypeArray.h>
2020-11-17 14:51:05 +00:00
#include <DataTypes/DataTypeString.h>
2020-11-16 14:57:56 +00:00
#include <Functions/IFunction.h>
2020-11-17 14:51:05 +00:00
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsConversion.h>
2020-11-17 19:43:26 +00:00
#include <Functions/materialize.h>
2021-02-10 16:26:49 +00:00
#include <Functions/FunctionsLogical.h>
2020-11-16 14:57:56 +00:00
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionJIT.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <stack>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DUPLICATE_COLUMN;
extern const int UNKNOWN_IDENTIFIER;
extern const int TYPE_MISMATCH;
2020-11-17 14:51:05 +00:00
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int THERE_IS_NO_COLUMN;
2020-11-17 16:24:25 +00:00
extern const int ILLEGAL_COLUMN;
2020-11-16 14:57:56 +00:00
}
2020-11-17 12:39:41 +00:00
ActionsDAG::ActionsDAG(const NamesAndTypesList & inputs_)
2020-11-16 14:57:56 +00:00
{
2020-11-17 12:39:41 +00:00
for (const auto & input : inputs_)
2020-11-17 12:34:31 +00:00
addInput(input.name, input.type, true);
2020-11-16 14:57:56 +00:00
}
2020-11-17 12:39:41 +00:00
ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
2020-11-16 14:57:56 +00:00
{
2020-11-17 12:39:41 +00:00
for (const auto & input : inputs_)
2020-11-16 14:57:56 +00:00
{
if (input.column && isColumnConst(*input.column))
2020-11-17 12:34:31 +00:00
addInput(input, true);
2020-11-16 14:57:56 +00:00
else
2020-11-17 12:34:31 +00:00
addInput(input.name, input.type, true);
2020-11-16 14:57:56 +00:00
}
}
ActionsDAG::Node & ActionsDAG::addNode(Node node, bool can_replace)
{
auto it = index.find(node.result_name);
if (it != index.end() && !can_replace)
throw Exception("Column '" + node.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
auto & res = nodes.emplace_back(std::move(node));
2020-11-17 12:34:31 +00:00
if (res.type == ActionType::INPUT)
inputs.emplace_back(&res);
2020-11-16 14:57:56 +00:00
index.replace(&res);
return res;
}
ActionsDAG::Node & ActionsDAG::getNode(const std::string & name)
{
auto it = index.find(name);
if (it == index.end())
throw Exception("Unknown identifier: '" + name + "'", ErrorCodes::UNKNOWN_IDENTIFIER);
return **it;
}
2020-11-17 12:34:31 +00:00
const ActionsDAG::Node & ActionsDAG::addInput(std::string name, DataTypePtr type, bool can_replace)
2020-11-16 14:57:56 +00:00
{
Node node;
node.type = ActionType::INPUT;
node.result_type = std::move(type);
node.result_name = std::move(name);
2020-11-17 12:34:31 +00:00
return addNode(std::move(node), can_replace);
2020-11-16 14:57:56 +00:00
}
2020-11-17 12:34:31 +00:00
const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column, bool can_replace)
2020-11-16 14:57:56 +00:00
{
Node node;
node.type = ActionType::INPUT;
node.result_type = std::move(column.type);
node.result_name = std::move(column.name);
node.column = std::move(column.column);
2020-11-17 12:34:31 +00:00
return addNode(std::move(node), can_replace);
2020-11-16 14:57:56 +00:00
}
2020-11-17 14:51:05 +00:00
const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, bool can_replace)
2020-11-16 14:57:56 +00:00
{
if (!column.column)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add column {} because it is nullptr", column.name);
Node node;
node.type = ActionType::COLUMN;
node.result_type = std::move(column.type);
node.result_name = std::move(column.name);
node.column = std::move(column.column);
2020-11-17 14:51:05 +00:00
return addNode(std::move(node), can_replace);
2020-11-16 14:57:56 +00:00
}
const ActionsDAG::Node & ActionsDAG::addAlias(const std::string & name, std::string alias, bool can_replace)
{
2020-11-18 09:08:51 +00:00
return addAlias(getNode(name), alias, can_replace);
}
2020-11-16 14:57:56 +00:00
2020-11-18 09:08:51 +00:00
ActionsDAG::Node & ActionsDAG::addAlias(Node & child, std::string alias, bool can_replace)
{
2020-11-16 14:57:56 +00:00
Node node;
node.type = ActionType::ALIAS;
node.result_type = child.result_type;
node.result_name = std::move(alias);
node.column = child.column;
node.allow_constant_folding = child.allow_constant_folding;
node.children.emplace_back(&child);
return addNode(std::move(node), can_replace);
}
const ActionsDAG::Node & ActionsDAG::addArrayJoin(const std::string & source_name, std::string result_name)
{
auto & child = getNode(source_name);
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(child.result_type.get());
if (!array_type)
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
Node node;
node.type = ActionType::ARRAY_JOIN;
node.result_type = array_type->getNestedType();
node.result_name = std::move(result_name);
node.children.emplace_back(&child);
return addNode(std::move(node));
}
const ActionsDAG::Node & ActionsDAG::addFunction(
const FunctionOverloadResolverPtr & function,
const Names & argument_names,
std::string result_name,
const Context & context [[maybe_unused]])
{
const auto & all_settings = context.getSettingsRef();
settings.max_temporary_columns = all_settings.max_temporary_columns;
settings.max_temporary_non_const_columns = all_settings.max_temporary_non_const_columns;
#if USE_EMBEDDED_COMPILER
settings.compile_expressions = all_settings.compile_expressions;
settings.min_count_to_compile_expression = all_settings.min_count_to_compile_expression;
if (!compilation_cache)
compilation_cache = context.getCompiledExpressionCache();
#endif
2020-11-17 14:51:05 +00:00
Inputs children;
children.reserve(argument_names.size());
for (const auto & name : argument_names)
children.push_back(&getNode(name));
2020-11-17 14:54:37 +00:00
return addFunction(function, children, std::move(result_name), false);
2020-11-17 12:34:31 +00:00
}
2020-11-17 14:51:05 +00:00
ActionsDAG::Node & ActionsDAG::addFunction(
2020-11-17 12:34:31 +00:00
const FunctionOverloadResolverPtr & function,
2020-11-17 14:51:05 +00:00
Inputs children,
std::string result_name,
bool can_replace)
2020-11-17 12:34:31 +00:00
{
2020-11-17 14:51:05 +00:00
size_t num_arguments = children.size();
2020-11-16 14:57:56 +00:00
Node node;
node.type = ActionType::FUNCTION;
node.function_builder = function;
2020-11-17 14:51:05 +00:00
node.children = std::move(children);
2020-11-16 14:57:56 +00:00
bool all_const = true;
ColumnsWithTypeAndName arguments(num_arguments);
for (size_t i = 0; i < num_arguments; ++i)
{
2020-11-17 14:51:05 +00:00
auto & child = *node.children[i];
2020-11-16 14:57:56 +00:00
node.allow_constant_folding = node.allow_constant_folding && child.allow_constant_folding;
ColumnWithTypeAndName argument;
argument.column = child.column;
argument.type = child.result_type;
argument.name = child.result_name;
if (!argument.column || !isColumnConst(*argument.column))
all_const = false;
arguments[i] = std::move(argument);
}
node.function_base = function->build(arguments);
node.result_type = node.function_base->getResultType();
node.function = node.function_base->prepare(arguments);
/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
/// But if we compile expressions compiled version of this function maybe placed in cache,
/// so we don't want to unfold non deterministic functions
if (all_const && node.function_base->isSuitableForConstantFolding()
&& (!settings.compile_expressions || node.function_base->isDeterministic()))
{
size_t num_rows = arguments.empty() ? 0 : arguments.front().column->size();
auto col = node.function->execute(arguments, node.result_type, num_rows, true);
/// If the result is not a constant, just in case, we will consider the result as unknown.
if (isColumnConst(*col))
{
/// All constant (literal) columns in block are added with size 1.
/// But if there was no columns in block before executing a function, the result has size 0.
/// Change the size to 1.
if (col->empty())
col = col->cloneResized(1);
node.column = std::move(col);
}
}
/// Some functions like ignore() or getTypeName() always return constant result even if arguments are not constant.
/// We can't do constant folding, but can specify in sample block that function result is constant to avoid
/// unnecessary materialization.
if (!node.column && node.function_base->isSuitableForConstantFolding())
{
if (auto col = node.function_base->getResultIfAlwaysReturnsConstantAndHasArguments(arguments))
{
node.column = std::move(col);
node.allow_constant_folding = false;
}
}
if (result_name.empty())
{
result_name = function->getName() + "(";
2020-11-17 14:51:05 +00:00
for (size_t i = 0; i < num_arguments; ++i)
2020-11-16 14:57:56 +00:00
{
if (i)
result_name += ", ";
2020-11-17 14:51:05 +00:00
result_name += node.children[i]->result_name;
2020-11-16 14:57:56 +00:00
}
result_name += ")";
}
node.result_name = std::move(result_name);
2020-11-17 14:51:05 +00:00
return addNode(std::move(node), can_replace);
2020-11-16 14:57:56 +00:00
}
2020-11-17 07:03:11 +00:00
2020-11-16 14:57:56 +00:00
NamesAndTypesList ActionsDAG::getRequiredColumns() const
{
NamesAndTypesList result;
2020-11-17 12:34:31 +00:00
for (const auto & input : inputs)
result.emplace_back(input->result_name, input->result_type);
2020-11-16 14:57:56 +00:00
return result;
}
ColumnsWithTypeAndName ActionsDAG::getResultColumns() const
{
ColumnsWithTypeAndName result;
result.reserve(index.size());
for (const auto & node : index)
result.emplace_back(node->column, node->result_type, node->result_name);
return result;
}
NamesAndTypesList ActionsDAG::getNamesAndTypesList() const
{
NamesAndTypesList result;
for (const auto & node : index)
result.emplace_back(node->result_name, node->result_type);
return result;
}
Names ActionsDAG::getNames() const
{
Names names;
names.reserve(index.size());
for (const auto & node : index)
names.emplace_back(node->result_name);
return names;
}
std::string ActionsDAG::dumpNames() const
{
WriteBufferFromOwnString out;
for (auto it = nodes.begin(); it != nodes.end(); ++it)
{
if (it != nodes.begin())
out << ", ";
out << it->result_name;
}
return out.str();
}
void ActionsDAG::removeUnusedActions(const Names & required_names)
{
std::unordered_set<Node *> nodes_set;
std::vector<Node *> required_nodes;
required_nodes.reserve(required_names.size());
for (const auto & name : required_names)
{
auto it = index.find(name);
if (it == index.end())
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown column: {}, there are only columns {}", name, dumpNames());
if (nodes_set.insert(*it).second)
required_nodes.push_back(*it);
}
removeUnusedActions(required_nodes);
}
void ActionsDAG::removeUnusedActions(const std::vector<Node *> & required_nodes)
{
{
Index new_index;
for (auto * node : required_nodes)
new_index.insert(node);
index.swap(new_index);
}
removeUnusedActions();
}
2021-02-10 16:26:49 +00:00
void ActionsDAG::removeUnusedActions(bool allow_remove_inputs)
2020-11-16 14:57:56 +00:00
{
std::unordered_set<const Node *> visited_nodes;
std::stack<Node *> stack;
for (auto * node : index)
{
visited_nodes.insert(node);
stack.push(node);
}
2020-11-27 09:11:34 +00:00
/// We cannot remove arrayJoin because it changes the number of rows.
for (auto & node : nodes)
{
if (node.type == ActionType::ARRAY_JOIN && visited_nodes.count(&node) == 0)
{
visited_nodes.insert(&node);
stack.push(&node);
}
2021-02-10 16:26:49 +00:00
if (node.type == ActionType::INPUT && !allow_remove_inputs)
visited_nodes.insert(&node);
2020-11-27 09:11:34 +00:00
}
2020-11-16 14:57:56 +00:00
while (!stack.empty())
{
auto * node = stack.top();
stack.pop();
if (!node->children.empty() && node->column && isColumnConst(*node->column) && node->allow_constant_folding)
{
/// Constant folding.
node->type = ActionsDAG::ActionType::COLUMN;
node->children.clear();
}
for (auto * child : node->children)
{
if (visited_nodes.count(child) == 0)
{
stack.push(child);
visited_nodes.insert(child);
}
}
}
nodes.remove_if([&](const Node & node) { return visited_nodes.count(&node) == 0; });
2020-11-17 12:34:31 +00:00
auto it = std::remove_if(inputs.begin(), inputs.end(), [&](const Node * node) { return visited_nodes.count(node) == 0; });
inputs.erase(it, inputs.end());
2020-11-16 14:57:56 +00:00
}
void ActionsDAG::addAliases(const NamesWithAliases & aliases, std::vector<Node *> & result_nodes)
{
std::vector<Node *> required_nodes;
for (const auto & item : aliases)
{
auto & child = getNode(item.first);
required_nodes.push_back(&child);
}
result_nodes.reserve(aliases.size());
for (size_t i = 0; i < aliases.size(); ++i)
{
const auto & item = aliases[i];
auto * child = required_nodes[i];
if (!item.second.empty() && item.first != item.second)
{
Node node;
node.type = ActionType::ALIAS;
node.result_type = child->result_type;
node.result_name = std::move(item.second);
node.column = child->column;
node.allow_constant_folding = child->allow_constant_folding;
node.children.emplace_back(child);
auto & alias = addNode(std::move(node), true);
result_nodes.push_back(&alias);
}
else
result_nodes.push_back(child);
}
}
void ActionsDAG::addAliases(const NamesWithAliases & aliases)
{
std::vector<Node *> result_nodes;
addAliases(aliases, result_nodes);
}
void ActionsDAG::project(const NamesWithAliases & projection)
{
std::vector<Node *> result_nodes;
addAliases(projection, result_nodes);
removeUnusedActions(result_nodes);
projectInput();
settings.projected_output = true;
}
bool ActionsDAG::tryRestoreColumn(const std::string & column_name)
{
if (index.contains(column_name))
return true;
for (auto it = nodes.rbegin(); it != nodes.rend(); ++it)
{
auto & node = *it;
if (node.result_name == column_name)
{
index.replace(&node);
return true;
}
}
return false;
}
bool ActionsDAG::removeUnusedResult(const std::string & column_name)
2021-01-19 10:03:25 +00:00
{
/// Find column in index and remove.
const Node * col;
{
auto it = index.begin();
for (; it != index.end(); ++it)
if ((*it)->result_name == column_name)
break;
if (it == index.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found result {} in ActionsDAG\n{}", column_name, dumpDAG());
col = *it;
index.remove(it);
}
/// Check if column is in input.
2021-01-19 10:03:25 +00:00
auto it = inputs.begin();
for (; it != inputs.end(); ++it)
if (*it == col)
2021-01-19 10:03:25 +00:00
break;
if (it == inputs.end())
return false;
2021-01-19 10:03:25 +00:00
/// Check column has no dependent.
2021-01-19 10:03:25 +00:00
for (const auto & node : nodes)
for (const auto * child : node.children)
if (col == child)
return false;
2021-01-19 10:03:25 +00:00
/// Remove from nodes and inputs.
2021-01-19 11:48:09 +00:00
for (auto jt = nodes.begin(); jt != nodes.end(); ++jt)
{
if (&(*jt) == *it)
2021-01-19 11:48:09 +00:00
{
nodes.erase(jt);
break;
}
}
2021-01-19 10:03:25 +00:00
inputs.erase(it);
return true;
2021-01-19 10:03:25 +00:00
}
2020-11-16 14:57:56 +00:00
ActionsDAGPtr ActionsDAG::clone() const
{
auto actions = cloneEmpty();
std::unordered_map<const Node *, Node *> copy_map;
for (const auto & node : nodes)
{
auto & copy_node = actions->nodes.emplace_back(node);
copy_map[&node] = &copy_node;
}
for (auto & node : actions->nodes)
for (auto & child : node.children)
child = copy_map[child];
for (const auto & node : index)
actions->index.insert(copy_map[node]);
2020-11-17 12:34:31 +00:00
for (const auto & node : inputs)
actions->inputs.push_back(copy_map[node]);
2020-11-16 14:57:56 +00:00
return actions;
}
void ActionsDAG::compileExpressions()
{
#if USE_EMBEDDED_COMPILER
if (settings.compile_expressions)
{
compileFunctions();
removeUnusedActions();
}
#endif
}
std::string ActionsDAG::dumpDAG() const
{
std::unordered_map<const Node *, size_t> map;
for (const auto & node : nodes)
{
size_t idx = map.size();
map[&node] = idx;
}
WriteBufferFromOwnString out;
for (const auto & node : nodes)
{
out << map[&node] << " : ";
switch (node.type)
{
case ActionsDAG::ActionType::COLUMN:
out << "COLUMN ";
break;
case ActionsDAG::ActionType::ALIAS:
out << "ALIAS ";
break;
case ActionsDAG::ActionType::FUNCTION:
out << "FUNCTION ";
break;
case ActionsDAG::ActionType::ARRAY_JOIN:
out << "ARRAY JOIN ";
break;
case ActionsDAG::ActionType::INPUT:
out << "INPUT ";
break;
}
out << "(";
for (size_t i = 0; i < node.children.size(); ++i)
{
if (i)
out << ", ";
out << map[node.children[i]];
}
out << ")";
out << " " << (node.column ? node.column->getName() : "(no column)");
out << " " << (node.result_type ? node.result_type->getName() : "(no type)");
out << " " << (!node.result_name.empty() ? node.result_name : "(no name)");
if (node.function_base)
out << " [" << node.function_base->getName() << "]";
out << "\n";
}
2021-01-12 18:47:54 +00:00
out << "Index:";
for (const auto * node : index)
out << ' ' << map[node];
out << '\n';
2020-11-16 14:57:56 +00:00
return out.str();
}
bool ActionsDAG::hasArrayJoin() const
{
for (const auto & node : nodes)
if (node.type == ActionType::ARRAY_JOIN)
return true;
return false;
}
bool ActionsDAG::hasStatefulFunctions() const
{
for (const auto & node : nodes)
if (node.type == ActionType::FUNCTION && node.function_base->isStateful())
return true;
return false;
}
2021-01-28 11:00:24 +00:00
bool ActionsDAG::trivial() const
2020-11-16 14:57:56 +00:00
{
for (const auto & node : nodes)
2021-01-28 11:00:24 +00:00
if (node.type == ActionType::FUNCTION || node.type == ActionType::ARRAY_JOIN)
2020-11-16 14:57:56 +00:00
return false;
return true;
}
2020-11-17 14:51:05 +00:00
ActionsDAGPtr ActionsDAG::makeConvertingActions(
const ColumnsWithTypeAndName & source,
const ColumnsWithTypeAndName & result,
MatchColumnsMode mode,
bool ignore_constant_values)
{
size_t num_input_columns = source.size();
size_t num_result_columns = result.size();
if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
auto actions_dag = std::make_shared<ActionsDAG>(source);
std::vector<Node *> projection(num_result_columns);
2020-11-17 19:43:26 +00:00
FunctionOverloadResolverPtr func_builder_materialize =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionMaterialize>()));
2020-11-17 14:51:05 +00:00
std::map<std::string_view, std::list<size_t>> inputs;
if (mode == MatchColumnsMode::Name)
{
for (size_t pos = 0; pos < actions_dag->inputs.size(); ++pos)
inputs[actions_dag->inputs[pos]->result_name].push_back(pos);
}
for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
{
const auto & res_elem = result[result_col_num];
Node * src_node = nullptr;
switch (mode)
{
case MatchColumnsMode::Position:
{
src_node = actions_dag->inputs[result_col_num];
break;
}
case MatchColumnsMode::Name:
{
auto & input = inputs[res_elem.name];
if (input.empty())
2020-12-22 17:46:31 +00:00
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
2020-11-17 14:51:05 +00:00
ErrorCodes::THERE_IS_NO_COLUMN);
src_node = actions_dag->inputs[input.front()];
input.pop_front();
break;
}
}
/// Check constants.
if (const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get()))
{
if (const auto * src_const = typeid_cast<const ColumnConst *>(src_node->column.get()))
{
if (ignore_constant_values)
src_node = const_cast<Node *>(&actions_dag->addColumn(res_elem, true));
else if (res_const->getField() != src_const->getField())
2020-12-15 17:39:58 +00:00
throw Exception("Cannot convert column " + backQuote(res_elem.name) + " because "
2020-11-17 14:51:05 +00:00
"it is constant but values of constants are different in source and result",
ErrorCodes::ILLEGAL_COLUMN);
}
else
2020-12-15 17:39:58 +00:00
throw Exception("Cannot convert column " + backQuote(res_elem.name) + " because "
2020-11-17 14:51:05 +00:00
"it is non constant in source stream but must be constant in result",
ErrorCodes::ILLEGAL_COLUMN);
}
2020-11-20 16:52:50 +00:00
/// Add CAST function to convert into result type if needed.
2020-11-17 18:36:13 +00:00
if (!res_elem.type->equals(*src_node->result_type))
2020-11-17 14:51:05 +00:00
{
ColumnWithTypeAndName column;
column.name = res_elem.type->getName();
column.column = DataTypeString().createColumnConst(0, column.name);
column.type = std::make_shared<DataTypeString>();
auto * right_arg = const_cast<Node *>(&actions_dag->addColumn(std::move(column), true));
auto * left_arg = src_node;
FunctionCast::Diagnostic diagnostic = {src_node->result_name, res_elem.name};
2020-11-18 09:35:32 +00:00
FunctionOverloadResolverPtr func_builder_cast =
std::make_shared<FunctionOverloadResolverAdaptor>(
CastOverloadResolver<CastType::nonAccurate>::createImpl(false, std::move(diagnostic)));
2020-11-18 09:35:32 +00:00
2020-11-17 14:51:05 +00:00
Inputs children = { left_arg, right_arg };
2020-11-17 18:36:13 +00:00
src_node = &actions_dag->addFunction(func_builder_cast, std::move(children), {}, true);
2020-11-17 14:51:05 +00:00
}
2020-11-17 21:31:30 +00:00
if (src_node->column && isColumnConst(*src_node->column) && !(res_elem.column && isColumnConst(*res_elem.column)))
2020-11-17 19:43:26 +00:00
{
Inputs children = {src_node};
src_node = &actions_dag->addFunction(func_builder_materialize, std::move(children), {}, true);
}
2020-11-17 14:51:05 +00:00
if (src_node->result_name != res_elem.name)
2020-11-18 09:08:51 +00:00
src_node = &actions_dag->addAlias(*src_node, res_elem.name, true);
2020-11-17 14:51:05 +00:00
projection[result_col_num] = src_node;
}
actions_dag->removeUnusedActions(projection);
actions_dag->projectInput();
return actions_dag;
}
ActionsDAGPtr ActionsDAG::makeAddingColumnActions(ColumnWithTypeAndName column)
{
auto adding_column_action = std::make_shared<ActionsDAG>();
FunctionOverloadResolverPtr func_builder_materialize =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionMaterialize>()));
auto column_name = column.name;
const auto & column_node = adding_column_action->addColumn(std::move(column));
Inputs inputs = {const_cast<Node *>(&column_node)};
auto & function_node = adding_column_action->addFunction(func_builder_materialize, std::move(inputs), {}, true);
adding_column_action->addAlias(function_node, std::move(column_name), true);
return adding_column_action;
}
2020-12-01 11:19:03 +00:00
ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
2020-11-26 16:16:44 +00:00
{
2020-12-01 11:19:03 +00:00
/// first: x (1), x (2), y ==> x (2), z, x (3)
/// second: x (1), x (2), x (3) ==> x (3), x (2), x (1)
2021-01-18 21:54:01 +00:00
/// merge: x (1), x (2), x (3), y =(first)=> x (2), z, x (4), x (3) =(second)=> x (3), x (4), x (2), z
2020-11-26 16:16:44 +00:00
2020-12-01 11:19:03 +00:00
/// Will store merged result in `first`.
2020-11-26 16:16:44 +00:00
2020-12-01 11:19:03 +00:00
/// This map contains nodes which should be removed from `first` index, cause they are used as inputs for `second`.
2021-01-12 18:58:05 +00:00
/// The second element is the number of removes (cause one node may be repeated several times in result).
std::unordered_map<Node *, size_t> removed_first_result;
2020-12-01 11:19:03 +00:00
/// Map inputs of `second` to nodes of `first`.
2020-11-26 16:16:44 +00:00
std::unordered_map<Node *, Node *> inputs_map;
/// Update inputs list.
{
2020-12-01 11:19:03 +00:00
/// Index may have multiple columns with same name. They also may be used by `second`. Order is important.
std::unordered_map<std::string_view, std::list<Node *>> first_result;
for (auto & node : first.index)
first_result[node->result_name].push_back(node);
2020-11-26 16:16:44 +00:00
2020-12-01 11:19:03 +00:00
for (auto & node : second.inputs)
2020-11-26 16:16:44 +00:00
{
2020-12-01 11:19:03 +00:00
auto it = first_result.find(node->result_name);
if (it == first_result.end() || it->second.empty())
2020-11-26 16:16:44 +00:00
{
2020-12-01 11:19:03 +00:00
if (first.settings.project_input)
2020-11-26 16:16:44 +00:00
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find column {} in ActionsDAG result", node->result_name);
2020-12-01 11:19:03 +00:00
first.inputs.push_back(node);
2020-11-26 16:16:44 +00:00
}
else
{
inputs_map[node] = it->second.front();
2021-01-12 18:58:05 +00:00
removed_first_result[it->second.front()] += 1;
2020-11-26 16:16:44 +00:00
it->second.pop_front();
}
}
}
2020-12-01 11:19:03 +00:00
/// Replace inputs from `second` to nodes from `first` result.
for (auto & node : second.nodes)
2020-11-26 19:48:21 +00:00
{
for (auto & child : node.children)
{
if (child->type == ActionType::INPUT)
{
auto it = inputs_map.find(child);
if (it != inputs_map.end())
child = it->second;
}
}
}
2020-12-01 11:19:03 +00:00
for (auto & node : second.index)
2020-11-26 19:48:21 +00:00
{
if (node->type == ActionType::INPUT)
{
auto it = inputs_map.find(node);
if (it != inputs_map.end())
node = it->second;
}
}
2020-11-26 16:16:44 +00:00
/// Update index.
2020-12-01 11:19:03 +00:00
if (second.settings.project_input)
2020-11-26 16:16:44 +00:00
{
2020-12-01 11:19:03 +00:00
first.index.swap(second.index);
first.settings.project_input = true;
2020-11-26 16:16:44 +00:00
}
else
{
2020-12-01 11:19:03 +00:00
/// Remove `second` inputs from index.
for (auto it = first.index.begin(); it != first.index.end();)
2020-11-26 16:16:44 +00:00
{
auto cur = it;
++it;
2021-01-12 18:58:05 +00:00
auto jt = removed_first_result.find(*cur);
if (jt != removed_first_result.end() && jt->second > 0)
{
2020-12-01 11:19:03 +00:00
first.index.remove(cur);
2021-01-12 18:58:05 +00:00
--jt->second;
}
2020-11-26 16:16:44 +00:00
}
2021-01-18 21:54:01 +00:00
for (auto it = second.index.rbegin(); it != second.index.rend(); ++it)
first.index.prepend(*it);
2020-11-26 16:16:44 +00:00
}
2020-12-01 11:19:03 +00:00
first.nodes.splice(first.nodes.end(), std::move(second.nodes));
2020-11-26 16:16:44 +00:00
2021-01-27 11:10:09 +00:00
/// Here we rebuild index because some string_view from the first map now may point to string from second.
ActionsDAG::Index first_index;
for (auto * node : first.index)
first_index.insert(node);
first.index.swap(first_index);
2020-11-26 16:16:44 +00:00
#if USE_EMBEDDED_COMPILER
2020-12-01 11:19:03 +00:00
if (first.compilation_cache == nullptr)
first.compilation_cache = second.compilation_cache;
2020-11-26 16:16:44 +00:00
#endif
2020-12-01 11:19:03 +00:00
first.settings.max_temporary_columns = std::max(first.settings.max_temporary_columns, second.settings.max_temporary_columns);
first.settings.max_temporary_non_const_columns = std::max(first.settings.max_temporary_non_const_columns, second.settings.max_temporary_non_const_columns);
first.settings.min_count_to_compile_expression = std::max(first.settings.min_count_to_compile_expression, second.settings.min_count_to_compile_expression);
first.settings.projected_output = second.settings.projected_output;
2020-11-26 16:16:44 +00:00
/// Drop unused inputs and, probably, some actions.
2020-12-01 11:19:03 +00:00
first.removeUnusedActions();
2020-11-26 16:16:44 +00:00
2020-12-01 11:19:03 +00:00
return std::make_shared<ActionsDAG>(std::move(first));
2020-11-26 16:16:44 +00:00
}
ActionsDAG::SplitResult ActionsDAG::split(std::unordered_set<const Node *> split_nodes) const
2021-01-18 14:59:59 +00:00
{
/// Split DAG into two parts.
/// (first_nodes, first_index) is a part which will have split_list in result.
/// (second_nodes, second_index) is a part which will have same index as current actions.
std::list<Node> second_nodes;
std::list<Node> first_nodes;
Index second_index;
Index first_index;
/// List of nodes from current actions which are not inputs, but will be in second part.
std::vector<const Node *> new_inputs;
struct Frame
{
const Node * node;
size_t next_child_to_visit = 0;
};
struct Data
{
bool needed_by_split_node = false;
bool visited = false;
bool used_in_result = false;
/// Copies of node in one of the DAGs.
/// For COLUMN and INPUT both copies may exist.
Node * to_second = nullptr;
Node * to_first = nullptr;
};
std::stack<Frame> stack;
std::unordered_map<const Node *, Data> data;
for (const auto & node : index)
data[node].used_in_result = true;
/// DFS. Decide if node is needed by split.
for (const auto & node : nodes)
{
if (split_nodes.count(&node) == 0)
continue;
auto & cur_data = data[&node];
if (cur_data.needed_by_split_node)
continue;
cur_data.needed_by_split_node = true;
stack.push({.node = &node});
while (!stack.empty())
{
auto & cur_node = stack.top().node;
stack.pop();
for (const auto * child : cur_node->children)
{
auto & child_data = data[child];
if (!child_data.needed_by_split_node)
{
child_data.needed_by_split_node = true;
stack.push({.node = child});
}
}
}
}
/// DFS. Move nodes to one of the DAGs.
for (const auto & node : nodes)
{
if (!data[&node].visited)
stack.push({.node = &node});
while (!stack.empty())
{
auto & cur = stack.top();
auto & cur_data = data[cur.node];
/// At first, visit all children.
while (cur.next_child_to_visit < cur.node->children.size())
{
auto * child = cur.node->children[cur.next_child_to_visit];
auto & child_data = data[child];
if (!child_data.visited)
{
stack.push({.node = child});
break;
}
++cur.next_child_to_visit;
}
/// Make a copy part.
if (cur.next_child_to_visit == cur.node->children.size())
{
cur_data.visited = true;
stack.pop();
if (!cur_data.needed_by_split_node)
{
auto & copy = second_nodes.emplace_back(*cur.node);
cur_data.to_second = &copy;
/// Replace children to newly created nodes.
for (auto & child : copy.children)
{
auto & child_data = data[child];
2021-01-22 13:46:56 +00:00
/// If children is not created, it may be from split part.
2021-01-18 14:59:59 +00:00
if (!child_data.to_second)
{
if (child->type == ActionType::COLUMN) /// Just create new node for COLUMN action.
{
child_data.to_second = &second_nodes.emplace_back(*child);
}
else
{
/// Node from first part is added as new input.
Node input_node;
input_node.type = ActionType::INPUT;
input_node.result_type = child->result_type;
input_node.result_name = child->result_name;
child_data.to_second = &second_nodes.emplace_back(std::move(input_node));
new_inputs.push_back(child);
}
}
child = child_data.to_second;
}
2021-01-18 20:52:33 +00:00
/// Input from second DAG should also be in the first.
if (copy.type == ActionType::INPUT)
{
auto & input_copy = first_nodes.emplace_back(*cur.node);
assert(cur_data.to_first == nullptr);
cur_data.to_first = &input_copy;
new_inputs.push_back(cur.node);
}
2021-01-18 14:59:59 +00:00
}
else
{
auto & copy = first_nodes.emplace_back(*cur.node);
cur_data.to_first = &copy;
/// Replace children to newly created nodes.
for (auto & child : copy.children)
{
child = data[child].to_first;
assert(child != nullptr);
}
2021-01-18 20:34:46 +00:00
if (cur_data.used_in_result)
2021-01-18 14:59:59 +00:00
{
/// If this node is needed in result, add it as input.
Node input_node;
input_node.type = ActionType::INPUT;
input_node.result_type = node.result_type;
input_node.result_name = node.result_name;
cur_data.to_second = &second_nodes.emplace_back(std::move(input_node));
2021-01-18 20:34:46 +00:00
new_inputs.push_back(cur.node);
2021-01-18 14:59:59 +00:00
}
}
}
}
}
for (auto * node : index)
second_index.insert(data[node].to_second);
Inputs second_inputs;
Inputs first_inputs;
for (auto * input : inputs)
{
const auto & cur = data[input];
first_inputs.push_back(cur.to_first);
}
for (const auto * input : new_inputs)
{
const auto & cur = data[input];
second_inputs.push_back(cur.to_second);
first_index.insert(cur.to_first);
}
auto first_actions = cloneEmpty();
first_actions->nodes.swap(first_nodes);
first_actions->index.swap(first_index);
first_actions->inputs.swap(first_inputs);
auto second_actions = cloneEmpty();
second_actions->nodes.swap(second_nodes);
second_actions->index.swap(second_index);
second_actions->inputs.swap(second_inputs);
return {std::move(first_actions), std::move(second_actions)};
}
ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) const
2020-11-16 14:57:56 +00:00
{
struct Frame
{
2021-01-18 16:12:16 +00:00
const Node * node;
2020-11-16 14:57:56 +00:00
size_t next_child_to_visit = 0;
};
2021-01-18 16:12:16 +00:00
std::unordered_set<const Node *> split_nodes;
std::unordered_set<const Node *> visited_nodes;
2020-11-16 14:57:56 +00:00
std::stack<Frame> stack;
2021-01-18 16:12:16 +00:00
/// DFS. Decide if node depends on ARRAY JOIN.
for (const auto & node : nodes)
2020-11-16 14:57:56 +00:00
{
2021-01-18 16:12:16 +00:00
if (visited_nodes.count(&node))
continue;
visited_nodes.insert(&node);
stack.push({.node = &node});
2020-11-16 14:57:56 +00:00
while (!stack.empty())
{
auto & cur = stack.top();
/// At first, visit all children. We depend on ARRAY JOIN if any child does.
while (cur.next_child_to_visit < cur.node->children.size())
{
auto * child = cur.node->children[cur.next_child_to_visit];
2021-01-18 16:12:16 +00:00
if (visited_nodes.count(child) == 0)
2020-11-16 14:57:56 +00:00
{
2021-01-18 16:12:16 +00:00
visited_nodes.insert(child);
2020-11-16 14:57:56 +00:00
stack.push({.node = child});
break;
}
++cur.next_child_to_visit;
}
if (cur.next_child_to_visit == cur.node->children.size())
{
2021-01-18 16:12:16 +00:00
bool depend_on_array_join = false;
2020-11-16 14:57:56 +00:00
if (cur.node->type == ActionType::INPUT && array_joined_columns.count(cur.node->result_name))
2021-01-18 16:12:16 +00:00
depend_on_array_join = true;
2020-11-16 14:57:56 +00:00
2021-01-18 16:12:16 +00:00
for (const auto * child : cur.node->children)
2020-11-16 14:57:56 +00:00
{
2021-01-18 16:12:16 +00:00
if (split_nodes.count(child) == 0)
depend_on_array_join = true;
2020-11-16 14:57:56 +00:00
}
2021-01-18 16:12:16 +00:00
if (!depend_on_array_join)
split_nodes.insert(cur.node);
2020-11-16 14:57:56 +00:00
2021-01-18 16:12:16 +00:00
stack.pop();
2020-11-16 14:57:56 +00:00
}
}
}
2021-01-18 19:56:34 +00:00
auto res = split(split_nodes);
/// Do not remove array joined columns if they are not used.
res.first->settings.project_input = false;
return res;
2020-11-16 14:57:56 +00:00
}
ActionsDAG::SplitResult ActionsDAG::splitActionsForFilter(const std::string & column_name) const
2021-01-19 10:03:25 +00:00
{
2021-01-19 14:53:51 +00:00
auto it = index.begin();
for (; it != index.end(); ++it)
if ((*it)->result_name == column_name)
break;
2021-01-19 10:03:25 +00:00
if (it == index.end())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
column_name, dumpDAG());
std::unordered_set<const Node *> split_nodes = {*it};
return split(split_nodes);
}
2021-02-10 17:47:48 +00:00
ActionsDAGPtr ActionsDAG::splitActionsForFilter(const std::string & filter_name, bool can_remove_filter, const Names & available_inputs)
2021-02-10 16:26:49 +00:00
{
std::unordered_map<std::string_view, std::list<const Node *>> inputs_map;
for (const auto & input : inputs)
inputs_map[input->result_name].emplace_back(input);
std::unordered_set<const Node *> allowed_nodes;
for (const auto & name : available_inputs)
{
auto & inputs_list = inputs_map[name];
if (inputs_list.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find input {} in ActionsDAG. DAG:\n{}", name, dumpDAG());
allowed_nodes.emplace(inputs_list.front());
inputs_list.pop_front();
}
auto it = index.begin();
for (; it != index.end(); ++it)
if ((*it)->result_name == filter_name)
break;
if (it == index.end())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Index for ActionsDAG does not contain filter column name {}. DAG:\n{}",
filter_name, dumpDAG());
std::unordered_set<Node *> selected_predicates;
2021-02-10 17:47:48 +00:00
std::unordered_set<Node *> other_predicates;
2021-02-10 16:26:49 +00:00
{
struct Frame
{
const Node * node;
bool is_predicate = false;
size_t next_child_to_visit = 0;
size_t num_allowed_children = 0;
};
std::stack<Frame> stack;
std::unordered_set<const Node *> visited_nodes;
stack.push(Frame{.node = *it, .is_predicate = true});
visited_nodes.insert(*it);
while (!stack.empty())
{
auto & cur = stack.top();
bool is_conjunction = cur.is_predicate
&& cur.node->type == ActionType::FUNCTION
&& cur.node->function_base->getName() == "and";
/// At first, visit all children.
while (cur.next_child_to_visit < cur.node->children.size())
{
auto * child = cur.node->children[cur.next_child_to_visit];
if (visited_nodes.count(child) == 0)
{
visited_nodes.insert(child);
stack.push({.node = child, .is_predicate = is_conjunction});
break;
}
if (allowed_nodes.contains(child))
++cur.num_allowed_children;
++cur.next_child_to_visit;
}
if (cur.next_child_to_visit == cur.node->children.size())
{
if (cur.num_allowed_children == cur.node->children.size())
{
if (cur.node->type != ActionType::ARRAY_JOIN && cur.node->type != ActionType::INPUT)
allowed_nodes.emplace(cur.node);
}
else if (is_conjunction)
{
for (auto * child : cur.node->children)
2021-02-10 17:47:48 +00:00
{
2021-02-10 16:26:49 +00:00
if (allowed_nodes.count(child))
selected_predicates.insert(child);
2021-02-10 17:47:48 +00:00
else
other_predicates.insert(child);
}
2021-02-10 16:26:49 +00:00
}
stack.pop();
}
}
}
if (selected_predicates.empty())
{
if (allowed_nodes.count(*it))
selected_predicates.insert(*it);
else
return nullptr;
}
auto actions = cloneEmpty();
actions->settings.project_input = false;
2021-02-10 17:47:48 +00:00
FunctionOverloadResolverPtr func_builder_and =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionAnd>()));
2021-02-10 16:26:49 +00:00
std::unordered_map<const Node *, Node *> nodes_mapping;
{
struct Frame
{
const Node * node;
size_t next_child_to_visit = 0;
};
std::stack<Frame> stack;
for (const auto * predicate : selected_predicates)
{
if (nodes_mapping.count(predicate))
continue;
stack.push({.node = predicate});
while (!stack.empty())
{
auto & cur = stack.top();
/// At first, visit all children.
while (cur.next_child_to_visit < cur.node->children.size())
{
auto * child = cur.node->children[cur.next_child_to_visit];
if (nodes_mapping.count(child) == 0)
{
stack.push({.node = child});
break;
}
++cur.next_child_to_visit;
}
if (cur.next_child_to_visit == cur.node->children.size())
{
auto & node = actions->nodes.emplace_back(*cur.node);
nodes_mapping[cur.node] = &node;
for (auto & child : node.children)
child = nodes_mapping[child];
if (node.type == ActionType::INPUT)
{
actions->inputs.emplace_back(&node);
actions->index.insert(&node);
}
2021-02-11 12:06:28 +00:00
stack.pop();
2021-02-10 16:26:49 +00:00
}
}
}
Node * result_predicate = nodes_mapping[*selected_predicates.begin()];
if (selected_predicates.size() > 1)
{
std::vector<Node *> args;
args.reserve(selected_predicates.size());
for (const auto * predicate : selected_predicates)
args.emplace_back(nodes_mapping[predicate]);
result_predicate = &actions->addFunction(func_builder_and, args, {}, true);
}
actions->index.insert(result_predicate);
}
2021-02-10 17:47:48 +00:00
if (selected_predicates.count(*it))
2021-02-10 16:26:49 +00:00
{
2021-02-10 17:47:48 +00:00
/// The whole predicate was split.
if (can_remove_filter)
{
for (auto i = index.begin(); i != index.end(); ++i)
{
if (*i == *it)
{
index.remove(i);
break;
}
}
}
else
{
Node node;
node.type = ActionType::COLUMN;
node.result_name = std::move((*it)->result_name);
node.result_type = std::move((*it)->result_type);
node.column = node.result_type->createColumnConst(0, 1);
*(*it) = std::move(node);
}
removeUnusedActions(false);
2021-02-10 16:26:49 +00:00
}
2021-02-10 17:47:48 +00:00
else if ((*it)->type == ActionType::FUNCTION && (*it)->function_base->getName() == "and")
{
std::vector<Node *> new_children(other_predicates.begin(), other_predicates.end());
2021-02-10 16:26:49 +00:00
2021-02-10 17:47:48 +00:00
if (new_children.size() == 1)
{
if (new_children.front()->result_type->equals(*((*it)->result_type)))
{
Node node;
node.type = ActionType::ALIAS;
node.result_name = (*it)->result_name;
node.result_type = (*it)->result_type;
node.children.swap(new_children);
*(*it) = std::move(node);
}
else
{
2021-02-11 15:44:10 +00:00
Node node;
node.type = ActionType::COLUMN;
node.result_name = (*it)->result_type->getName();
node.column = DataTypeString().createColumnConst(0, node.result_name);
node.result_type = std::make_shared<DataTypeString>();
auto * right_arg = &nodes.emplace_back(std::move(node));
auto * left_arg = new_children.front();
(*it)->children = {left_arg, right_arg};
2021-02-10 17:47:48 +00:00
ColumnsWithTypeAndName arguments;
arguments.reserve((*it)->children.size());
for (const auto * child : (*it)->children)
{
ColumnWithTypeAndName argument;
argument.column = child->column;
argument.type = child->result_type;
argument.name = child->result_name;
arguments.emplace_back(std::move(argument));
}
FunctionOverloadResolverPtr func_builder_cast =
std::make_shared<FunctionOverloadResolverAdaptor>(
CastOverloadResolver<CastType::nonAccurate>::createImpl(false));
(*it)->function_builder = func_builder_cast;
(*it)->function_base = (*it)->function_builder->build(arguments);
(*it)->function = (*it)->function_base->prepare(arguments);
}
}
else
{
(*it)->children.swap(new_children);
ColumnsWithTypeAndName arguments;
arguments.reserve((*it)->children.size());
for (const auto * child : (*it)->children)
{
ColumnWithTypeAndName argument;
argument.column = child->column;
argument.type = child->result_type;
argument.name = child->result_name;
arguments.emplace_back(std::move(argument));
}
(*it)->function_base = (*it)->function_builder->build(arguments);
(*it)->function = (*it)->function_base->prepare(arguments);
}
removeUnusedActions(false);
}
2021-02-10 16:26:49 +00:00
return actions;
}
2020-11-16 14:57:56 +00:00
}