ClickHouse/src/Interpreters/ExpressionActions.cpp

774 lines
26 KiB
C++
Raw Normal View History

2019-10-11 17:27:54 +00:00
#include <Interpreters/Set.h>
#include <Common/ProfileEvents.h>
2020-08-13 20:17:18 +00:00
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/Context.h>
2020-08-13 20:17:18 +00:00
#include <Columns/ColumnArray.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunction.h>
2020-11-10 18:22:26 +00:00
#include <IO/WriteBufferFromString.h>
2020-09-10 16:01:41 +00:00
#include <IO/Operators.h>
#include <optional>
2019-10-11 17:27:54 +00:00
#include <Columns/ColumnSet.h>
2020-09-13 13:51:31 +00:00
#include <queue>
2020-11-03 11:28:28 +00:00
#include <stack>
#if defined(MEMORY_SANITIZER)
#include <sanitizer/msan_interface.h>
#endif
#if defined(ADDRESS_SANITIZER)
#include <sanitizer/asan_interface.h>
#endif
/// Profile event counters used in this file (only declared here).
/// FunctionExecute is incremented for every executed FUNCTION action,
/// CompiledFunctionExecute additionally when the function node is marked as compiled.
namespace ProfileEvents
{
extern const Event FunctionExecute;
extern const Event CompiledFunctionExecute;
}
namespace DB
{
namespace ErrorCodes
{
    /// Error codes thrown from this file (only declared here).
    extern const int LOGICAL_ERROR;
    extern const int NOT_FOUND_COLUMN_IN_BLOCK;
    extern const int TOO_MANY_TEMPORARY_COLUMNS;
    extern const int TOO_MANY_TEMPORARY_NON_CONST_COLUMNS;
    extern const int TYPE_MISMATCH;
}
2020-11-10 16:27:55 +00:00
/// The destructor is defaulted here, in the source file.
ExpressionActions::~ExpressionActions() = default;
/// Builds an executable sequence of actions from the given DAG.
/// The DAG is cloned first; the clone is compiled and then linearized.
/// Throws TOO_MANY_TEMPORARY_COLUMNS if the `max_temporary_columns` setting is non-zero
/// and linearization needs more column positions than that.
ExpressionActions::ExpressionActions(ActionsDAGPtr actions_dag_)
{
    actions_dag = actions_dag_->clone();
    actions_dag->compileExpressions();

    linearizeActions();

    const auto & dag_settings = actions_dag->getSettings();
    const auto & columns_limit = dag_settings.max_temporary_columns;

    /// Zero means "no limit".
    if (!columns_limit || num_columns <= columns_limit)
        return;

    throw Exception(ErrorCodes::TOO_MANY_TEMPORARY_COLUMNS,
                    "Too many temporary columns: {}. Maximum: {}",
                    actions_dag->dumpNames(), std::to_string(columns_limit));
}
/// Returns a new shared ExpressionActions copy-constructed from this object.
ExpressionActionsPtr ExpressionActions::clone() const
{
    ExpressionActionsPtr copy = std::make_shared<ExpressionActions>(*this);
    return copy;
}
void ExpressionActions::linearizeActions()
{
2021-01-22 09:13:22 +00:00
/// This function does the topological sort on DAG and fills all the fields of ExpressionActions.
2020-11-11 14:56:56 +00:00
/// Algorithm traverses DAG starting from nodes without children.
/// For every node we support the number of created children, and if all children are created, put node into queue.
2020-11-10 16:27:55 +00:00
struct Data
{
const Node * node = nullptr;
size_t num_created_children = 0;
std::vector<const Node *> parents;
ssize_t position = -1;
size_t num_created_parents = 0;
bool used_in_result = false;
};
const auto & nodes = getNodes();
const auto & index = actions_dag->getIndex();
2020-11-17 12:34:31 +00:00
const auto & inputs = actions_dag->getInputs();
2020-11-10 16:27:55 +00:00
std::vector<Data> data(nodes.size());
std::unordered_map<const Node *, size_t> reverse_index;
for (const auto & node : nodes)
{
size_t id = reverse_index.size();
data[id].node = &node;
reverse_index[&node] = id;
}
2020-11-11 14:56:56 +00:00
/// There are independent queues for arrayJoin and other actions.
/// We delay creation of arrayJoin as long as we can, so that they will be executed closer to end.
2020-11-10 16:27:55 +00:00
std::queue<const Node *> ready_nodes;
std::queue<const Node *> ready_array_joins;
for (const auto * node : index)
data[reverse_index[node]].used_in_result = true;
for (const auto & node : nodes)
{
for (const auto & child : node.children)
data[reverse_index[child]].parents.emplace_back(&node);
}
for (const auto & node : nodes)
{
if (node.children.empty())
ready_nodes.emplace(&node);
}
2020-11-11 14:56:56 +00:00
/// Every argument will have fixed position in columns list.
/// If argument is removed, it's position may be reused by other action.
2020-11-10 16:27:55 +00:00
std::stack<size_t> free_positions;
while (!ready_nodes.empty() || !ready_array_joins.empty())
{
auto & stack = ready_nodes.empty() ? ready_array_joins : ready_nodes;
const Node * node = stack.front();
stack.pop();
auto & cur = data[reverse_index[node]];
2020-11-11 14:56:56 +00:00
/// Select position for action result.
2020-11-10 16:27:55 +00:00
size_t free_position = num_columns;
if (free_positions.empty())
++num_columns;
else
{
free_position = free_positions.top();
free_positions.pop();
}
cur.position = free_position;
ExpressionActions::Arguments arguments;
arguments.reserve(cur.node->children.size());
for (auto * child : cur.node->children)
{
auto & arg = data[reverse_index[child]];
if (arg.position < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Argument was not calculated for {}", child->result_name);
++arg.num_created_parents;
ExpressionActions::Argument argument;
argument.pos = arg.position;
argument.needed_later = arg.used_in_result || arg.num_created_parents != arg.parents.size();
if (!argument.needed_later)
free_positions.push(argument.pos);
arguments.emplace_back(argument);
}
if (node->type == ActionsDAG::ActionType::INPUT)
{
/// Argument for input is special. It contains the position from required columns.
ExpressionActions::Argument argument;
argument.needed_later = !cur.parents.empty();
arguments.emplace_back(argument);
2020-11-17 12:34:31 +00:00
//required_columns.push_back({node->result_name, node->result_type});
2020-11-10 16:27:55 +00:00
}
actions.push_back({node, arguments, free_position});
for (const auto & parent : cur.parents)
{
auto & parent_data = data[reverse_index[parent]];
++parent_data.num_created_children;
if (parent_data.num_created_children == parent->children.size())
{
auto & push_stack = parent->type == ActionsDAG::ActionType::ARRAY_JOIN ? ready_array_joins : ready_nodes;
push_stack.push(parent);
}
}
}
result_positions.reserve(index.size());
for (const auto & node : index)
{
auto pos = data[reverse_index[node]].position;
if (pos < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Action for {} was not calculated", node->result_name);
result_positions.push_back(pos);
ColumnWithTypeAndName col{node->column, node->result_type, node->result_name};
sample_block.insert(std::move(col));
}
2020-11-17 12:34:31 +00:00
for (const auto * input : inputs)
{
const auto & cur = data[reverse_index[input]];
auto pos = required_columns.size();
actions[cur.position].arguments.front().pos = pos;
required_columns.push_back({input->result_name, input->result_type});
input_positions[input->result_name].emplace_back(pos);
}
2020-11-10 16:27:55 +00:00
}
/// Writes an argument as "<prefix><pos>", where the prefix is ": " if the argument
/// is needed later and ":: " otherwise.
static WriteBuffer & operator << (WriteBuffer & out, const ExpressionActions::Argument & argument)
{
    const char * prefix = argument.needed_later ? ": " : ":: ";
    out << prefix << argument.pos;
    return out;
}
2020-11-03 11:28:28 +00:00
std::string ExpressionActions::Action::toString() const
{
WriteBufferFromOwnString out;
2020-11-03 11:28:28 +00:00
switch (node->type)
{
2020-11-10 14:54:59 +00:00
case ActionsDAG::ActionType::COLUMN:
2020-11-03 11:28:28 +00:00
out << "COLUMN "
<< (node->column ? node->column->getName() : "(no column)");
break;
2020-11-10 14:54:59 +00:00
case ActionsDAG::ActionType::ALIAS:
2020-11-03 11:28:28 +00:00
out << "ALIAS " << node->children.front()->result_name << " " << arguments.front();
break;
2020-11-10 14:54:59 +00:00
case ActionsDAG::ActionType::FUNCTION:
2020-11-03 11:28:28 +00:00
out << "FUNCTION " << (node->is_function_compiled ? "[compiled] " : "")
<< (node->function_base ? node->function_base->getName() : "(no function)") << "(";
for (size_t i = 0; i < node->children.size(); ++i)
{
if (i)
2020-11-03 11:28:28 +00:00
out << ", ";
out << node->children[i]->result_name << " " << arguments[i];
}
2020-11-03 11:28:28 +00:00
out << ")";
break;
2020-11-10 14:54:59 +00:00
case ActionsDAG::ActionType::ARRAY_JOIN:
2020-11-03 11:28:28 +00:00
out << "ARRAY JOIN " << node->children.front()->result_name << " " << arguments.front();
break;
2020-11-10 14:54:59 +00:00
case ActionsDAG::ActionType::INPUT:
2020-11-03 11:28:28 +00:00
out << "INPUT " << arguments.front();
break;
}
2020-11-03 11:28:28 +00:00
out << " -> " << node->result_name
2020-11-09 15:01:08 +00:00
<< " " << (node->result_type ? node->result_type->getName() : "(no type)") << " : " << result_position;
2020-11-03 11:28:28 +00:00
return out.str();
}
2020-11-10 20:36:38 +00:00
void ExpressionActions::checkLimits(const ColumnsWithTypeAndName & columns) const
{
2020-11-11 11:15:25 +00:00
auto max_temporary_non_const_columns = actions_dag->getSettings().max_temporary_non_const_columns;
2020-11-03 11:28:28 +00:00
if (max_temporary_non_const_columns)
{
size_t non_const_columns = 0;
2020-11-10 20:36:38 +00:00
for (const auto & column : columns)
2020-11-03 11:28:28 +00:00
if (column.column && !isColumnConst(*column.column))
++non_const_columns;
2020-11-03 11:28:28 +00:00
if (non_const_columns > max_temporary_non_const_columns)
{
WriteBufferFromOwnString list_of_non_const_columns;
2020-11-10 20:36:38 +00:00
for (const auto & column : columns)
2020-11-03 11:28:28 +00:00
if (column.column && !isColumnConst(*column.column))
list_of_non_const_columns << "\n" << column.name;
throw Exception("Too many temporary non-const columns:" + list_of_non_const_columns.str()
2020-11-03 11:28:28 +00:00
+ ". Maximum: " + std::to_string(max_temporary_non_const_columns),
2018-03-09 23:23:15 +00:00
ErrorCodes::TOO_MANY_TEMPORARY_NON_CONST_COLUMNS);
}
}
}
2020-11-10 20:36:38 +00:00
namespace
{
2020-11-11 14:26:18 +00:00
/// This struct stores context needed to execute actions.
///
/// Execution model is following:
/// * execution is performed over list of columns (with fixed size = ExpressionActions::num_columns)
/// * every argument has fixed position in columns list, every action has fixed position for result
/// * if argument is not needed anymore (Argument::needed_later == false), it is removed from list
/// * argument for INPUT is in inputs[inputs_pos[argument.pos]]
///
/// Columns on positions `ExpressionActions::result_positions` are inserted back into block.
2020-11-10 20:36:38 +00:00
struct ExecutionContext
2020-10-13 08:16:47 +00:00
{
2020-11-10 20:36:38 +00:00
ColumnsWithTypeAndName & inputs;
ColumnsWithTypeAndName columns = {};
std::vector<ssize_t> inputs_pos = {};
size_t num_rows;
2020-10-13 08:16:47 +00:00
};
2013-05-28 12:05:47 +00:00
}
2020-11-10 20:36:38 +00:00
/// Executes a single action over the execution context.
/// Reads its arguments from `execution_context.columns` (or from `inputs` for INPUT) and writes the
/// result to `columns[action.result_position]`. ARRAY JOIN also replicates all present columns
/// and updates `num_rows`.
static void executeAction(const ExpressionActions::Action & action, ExecutionContext & execution_context, bool dry_run)
{
    auto & inputs = execution_context.inputs;
    auto & columns = execution_context.columns;
    auto & num_rows = execution_context.num_rows;

    const auto & node = *action.node;

    switch (node.type)
    {
        case ActionsDAG::ActionType::FUNCTION:
        {
            auto & result = columns[action.result_position];
            if (result.type || result.column)
                throw Exception("Result column is not empty", ErrorCodes::LOGICAL_ERROR);

            result.type = node.result_type;
            result.name = node.result_name;

            /// Collect arguments: move those which are not needed later, copy the others.
            ColumnsWithTypeAndName arguments(action.arguments.size());
            for (size_t i = 0; i < arguments.size(); ++i)
            {
                const auto & arg = action.arguments[i];
                auto & source = columns[arg.pos];

                if (arg.needed_later)
                    arguments[i] = source;
                else
                    arguments[i] = std::move(source);
            }

            ProfileEvents::increment(ProfileEvents::FunctionExecute);
            if (node.is_function_compiled)
                ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute);

            result.column = node.function->execute(arguments, result.type, num_rows, dry_run);
            break;
        }

        case ActionsDAG::ActionType::ARRAY_JOIN:
        {
            const auto & key_arg = action.arguments.front();
            const size_t key_pos = key_arg.pos;

            /// Take a copy of the key, so it stays alive even if it is removed from the list below.
            auto key = columns[key_pos];

            /// Remove array join argument in advance if it is not needed.
            if (!key_arg.needed_later)
                columns[key_pos] = {};

            key.column = key.column->convertToFullColumnIfConst();

            const ColumnArray * array = typeid_cast<const ColumnArray *>(key.column.get());
            if (!array)
                throw Exception("ARRAY JOIN of not array: " + node.result_name, ErrorCodes::TYPE_MISMATCH);

            /// Replicate every present column (first the working list, then the inputs) by the array offsets.
            const auto & offsets = array->getOffsets();
            auto replicate_all = [&offsets](ColumnsWithTypeAndName & list)
            {
                for (auto & elem : list)
                    if (elem.column)
                        elem.column = elem.column->replicate(offsets);
            };

            replicate_all(columns);
            replicate_all(inputs);

            auto & result = columns[action.result_position];
            result.column = array->getDataPtr();
            result.type = assert_cast<const DataTypeArray &>(*key.type).getNestedType();
            result.name = node.result_name;

            num_rows = result.column->size();
            break;
        }

        case ActionsDAG::ActionType::COLUMN:
        {
            auto & result = columns[action.result_position];
            result.column = node.column->cloneResized(num_rows);
            result.type = node.result_type;
            result.name = node.result_name;
            break;
        }

        case ActionsDAG::ActionType::ALIAS:
        {
            const auto & arg = action.arguments.front();
            auto & result = columns[action.result_position];

            /// If the alias shares the position with its argument, only the name changes.
            if (action.result_position != arg.pos)
            {
                auto & source = columns[arg.pos];
                result.column = source.column;
                result.type = source.type;

                if (!arg.needed_later)
                    source = {};
            }

            result.name = node.result_name;
            break;
        }

        case ActionsDAG::ActionType::INPUT:
        {
            const auto & input_arg = action.arguments.front();
            const auto pos = execution_context.inputs_pos[input_arg.pos];

            if (pos >= 0)
            {
                columns[action.result_position] = std::move(inputs[pos]);
                break;
            }

            /// Here we allow to skip input if it is not in block (in case it is not needed).
            /// It may be unusual, but some code depends on such behaviour.
            if (input_arg.needed_later)
                throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
                                "Not found column {} in block",
                                node.result_name);
            break;
        }
    }
}
2020-10-13 08:16:47 +00:00
2020-11-10 20:36:38 +00:00
/// Executes all actions over `block` and replaces its contents with the result.
///
/// Steps:
///  * bind every required input to a column of the block with the same name (inputs without a match stay unbound);
///  * run the actions in order, checking the non-const columns limit after each one;
///  * drop the consumed input columns from the block (or all columns, if `project_input` is set);
///  * build the resulting block: first the columns on `result_positions` (positions without a column are skipped),
///    then the columns remaining in the source block.
///
/// `num_rows` is the number of rows on input and is updated to the number of rows after execution.
/// An exception thrown by an action is annotated with the textual form of that action and rethrown.
void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run) const
{
    ExecutionContext execution_context
    {
        .inputs = block.data,
        .num_rows = num_rows,
    };

    /// -1 means "no column for this input in the block".
    execution_context.inputs_pos.assign(required_columns.size(), -1);

    for (size_t pos = 0; pos < block.columns(); ++pos)
    {
        const auto & col = block.getByPosition(pos);
        auto it = input_positions.find(col.name);
        if (it == input_positions.end())
            continue;

        /// Several inputs may share a name: bind this column to the first input that is still unbound.
        for (auto input_pos : it->second)
        {
            if (execution_context.inputs_pos[input_pos] < 0)
            {
                execution_context.inputs_pos[input_pos] = pos;
                break;
            }
        }
    }

    execution_context.columns.resize(num_columns);

    for (const auto & action : actions)
    {
        try
        {
            executeAction(action, execution_context, dry_run);
            checkLimits(execution_context.columns);
        }
        catch (Exception & e)
        {
            e.addMessage(fmt::format("while executing '{}'", action.toString()));
            throw;
        }
    }

    if (actions_dag->getSettings().project_input)
    {
        block.clear();
    }
    else
    {
        /// Erase from the highest position to the lowest, so that earlier erasures do not shift later ones.
        std::sort(execution_context.inputs_pos.rbegin(), execution_context.inputs_pos.rend());
        for (auto input : execution_context.inputs_pos)
            if (input >= 0)
                block.erase(input);
    }

    Block res;

    /// Result columns are copied: they stay in `execution_context.columns` as well.
    for (auto pos : result_positions)
        if (execution_context.columns[pos].column)
            res.insert(execution_context.columns[pos]);

    /// The old contents of `block` are discarded by the swap below, so the remaining columns can be moved out.
    /// (Iterating by non-const reference is required: std::move on a const reference would silently copy.)
    for (auto & item : block)
        res.insert(std::move(item));

    block.swap(res);

    num_rows = execution_context.num_rows;
}
/// Convenience overload: takes the number of rows from the block itself.
/// If the block is empty after execution, a constant UInt8 column named "_dummy" with `num_rows` rows
/// is inserted into it.
void ExpressionActions::execute(Block & block, bool dry_run) const
{
    size_t rows_count = block.rows();
    execute(block, rows_count, dry_run);

    if (!block)
    {
        auto dummy_type = std::make_shared<DataTypeUInt8>();
        block.insert({dummy_type->createColumnConst(rows_count, 0), dummy_type, "_dummy"});
    }
}
2020-11-03 11:28:28 +00:00
/// Returns the names of the required input columns, in the order they are stored in `required_columns`.
Names ExpressionActions::getRequiredColumns() const
{
    Names result;
    for (const auto & required : required_columns)
        result.emplace_back(required.name);
    return result;
}
2020-09-08 10:40:53 +00:00
bool ExpressionActions::hasArrayJoin() const
2020-06-18 13:00:16 +00:00
{
for (const auto & action : actions)
2020-11-10 14:54:59 +00:00
if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
2020-06-18 13:00:16 +00:00
return true;
return false;
}
std::string ExpressionActions::getSmallestColumn(const NamesAndTypesList & columns)
{
std::optional<size_t> min_size;
String res;
for (const auto & column : columns)
{
/// @todo resolve evil constant
size_t size = column.type->haveMaximumSizeOfValue() ? column.type->getMaximumSizeOfValueInMemory() : 100;
if (!min_size || size < *min_size)
{
min_size = size;
res = column.name;
}
}
if (!min_size)
throw Exception("No available columns", ErrorCodes::LOGICAL_ERROR);
return res;
}
std::string ExpressionActions::dumpActions() const
{
2020-11-10 18:22:26 +00:00
WriteBufferFromOwnString ss;
ss << "input:\n";
2020-11-03 11:28:28 +00:00
for (const auto & input_column : required_columns)
2020-03-08 23:48:08 +00:00
ss << input_column.name << " " << input_column.type->getName() << "\n";
ss << "\nactions:\n";
2020-03-08 23:48:08 +00:00
for (const auto & action : actions)
ss << action.toString() << '\n';
ss << "\noutput:\n";
NamesAndTypesList output_columns = sample_block.getNamesAndTypesList();
2020-03-08 23:48:08 +00:00
for (const auto & output_column : output_columns)
ss << output_column.name << " " << output_column.type->getName() << "\n";
2020-11-11 11:15:25 +00:00
ss << "\nproject input: " << actions_dag->getSettings().project_input << "\noutput positions:";
2020-11-03 11:28:28 +00:00
for (auto pos : result_positions)
ss << " " << pos;
ss << "\n";
return ss.str();
}
2019-10-11 17:27:54 +00:00
bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) const
{
/// Check has column in (empty set).
String set_to_check;
2019-11-01 10:58:29 +00:00
for (auto it = actions.rbegin(); it != actions.rend(); ++it)
2019-10-11 17:27:54 +00:00
{
2020-04-22 06:01:33 +00:00
const auto & action = *it;
2020-11-10 14:54:59 +00:00
if (action.node->type == ActionsDAG::ActionType::FUNCTION && action.node->function_base)
2019-10-11 17:27:54 +00:00
{
2020-11-03 11:28:28 +00:00
if (action.node->result_name == column_name && action.node->children.size() > 1)
2019-10-11 17:55:33 +00:00
{
2020-11-03 11:28:28 +00:00
auto name = action.node->function_base->getName();
if ((name == "in" || name == "globalIn"))
{
set_to_check = action.node->children[1]->result_name;
break;
}
2019-10-11 17:55:33 +00:00
}
2019-10-11 17:27:54 +00:00
}
}
if (!set_to_check.empty())
{
2020-04-22 06:01:33 +00:00
for (const auto & action : actions)
2019-10-11 17:27:54 +00:00
{
2020-11-10 14:54:59 +00:00
if (action.node->type == ActionsDAG::ActionType::COLUMN && action.node->result_name == set_to_check)
2019-10-11 17:27:54 +00:00
{
2019-10-27 18:12:40 +00:00
// Constant ColumnSet cannot be empty, so we only need to check non-constant ones.
2020-11-03 11:28:28 +00:00
if (const auto * column_set = checkAndGetColumn<const ColumnSet>(action.node->column.get()))
2019-10-11 17:27:54 +00:00
{
2019-11-01 13:56:33 +00:00
if (column_set->getData()->isCreated() && column_set->getData()->getTotalRowCount() == 0)
2019-10-11 17:27:54 +00:00
return true;
}
}
}
}
return false;
}
2020-11-03 11:28:28 +00:00
/// Appends a new expression step whose DAG is built from the result columns of the last step.
/// Constant columns whose names are listed in `non_constant_inputs` lose their column value
/// (the column pointer is reset) before the DAG is created.
/// Throws LOGICAL_ERROR if the chain has no steps yet.
void ExpressionActionsChain::addStep(NameSet non_constant_inputs)
{
    if (steps.empty())
        throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);

    ColumnsWithTypeAndName columns = steps.back()->getResultColumns();

    for (auto & column : columns)
    {
        if (!column.column)
            continue;
        if (!isColumnConst(*column.column))
            continue;
        if (non_constant_inputs.count(column.name) == 0)
            continue;

        column.column = nullptr;
    }

    auto dag = std::make_shared<ActionsDAG>(columns);
    steps.push_back(std::make_unique<ExpressionActionsStep>(std::move(dag)));
}
void ExpressionActionsChain::finalize()
{
2017-04-02 17:37:49 +00:00
/// Finalize all steps. Right to left to define unnecessary input columns.
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
{
2020-08-19 19:33:49 +00:00
Names required_output = steps[i]->required_output;
2018-04-12 09:45:24 +00:00
std::unordered_map<String, size_t> required_output_indexes;
for (size_t j = 0; j < required_output.size(); ++j)
required_output_indexes[required_output[j]] = j;
2020-08-19 19:33:49 +00:00
auto & can_remove_required_output = steps[i]->can_remove_required_output;
2018-04-12 09:45:24 +00:00
if (i + 1 < static_cast<int>(steps.size()))
{
2020-08-19 19:33:49 +00:00
const NameSet & additional_input = steps[i + 1]->additional_input;
for (const auto & it : steps[i + 1]->getRequiredColumns())
2018-04-12 09:45:24 +00:00
{
if (additional_input.count(it.name) == 0)
{
auto iter = required_output_indexes.find(it.name);
if (iter == required_output_indexes.end())
required_output.push_back(it.name);
else if (!can_remove_required_output.empty())
2018-06-29 11:42:44 +00:00
can_remove_required_output[iter->second] = false;
2018-04-12 09:45:24 +00:00
}
}
}
2020-08-19 19:33:49 +00:00
steps[i]->finalize(required_output);
}
2017-04-02 17:37:49 +00:00
/// Adding the ejection of unnecessary columns to the beginning of each step.
for (size_t i = 1; i < steps.size(); ++i)
{
2020-08-19 19:33:49 +00:00
size_t columns_from_previous = steps[i - 1]->getResultColumns().size();
2017-04-02 17:37:49 +00:00
/// If unnecessary columns are formed at the output of the previous step, we'll add them to the beginning of this step.
/// Except when we drop all the columns and lose the number of rows in the block.
2020-08-19 19:33:49 +00:00
if (!steps[i]->getResultColumns().empty()
&& columns_from_previous > steps[i]->getRequiredColumns().size())
steps[i]->prependProjectInput();
}
}
2020-07-26 14:21:57 +00:00
std::string ExpressionActionsChain::dumpChain() const
{
2020-11-10 18:22:26 +00:00
WriteBufferFromOwnString ss;
for (size_t i = 0; i < steps.size(); ++i)
{
ss << "step " << i << "\n";
ss << "required output:\n";
2020-08-19 19:33:49 +00:00
for (const std::string & name : steps[i]->required_output)
ss << name << "\n";
2020-08-19 19:33:49 +00:00
ss << "\n" << steps[i]->dump() << "\n";
}
return ss.str();
}
2020-09-08 10:40:53 +00:00
/// Creates an ARRAY JOIN step.
/// Every given column becomes a required column of the step. The result columns are the same columns,
/// except that for columns listed in `array_join->columns` the type is replaced with the nested type
/// of the array and the column value is dropped.
/// Throws TYPE_MISMATCH if a column listed in `array_join->columns` is not of an array type.
ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_)
    : Step({})
    , array_join(std::move(array_join_))
    , result_columns(std::move(required_columns_))
{
    for (auto & column : result_columns)
    {
        required_columns.emplace_back(NameAndTypePair(column.name, column.type));

        if (array_join->columns.count(column.name) > 0)
        {
            /// The pointer form of typeid_cast yields nullptr on mismatch: check it instead of dereferencing blindly.
            const auto * array = typeid_cast<const DataTypeArray *>(column.type.get());
            if (!array)
                throw Exception("ARRAY JOIN of not array: " + column.name, ErrorCodes::TYPE_MISMATCH);

            column.type = array->getNestedType();
            /// Arrays are materialized
            column.column = nullptr;
        }
    }
}
2020-08-19 19:33:49 +00:00
void ExpressionActionsChain::ArrayJoinStep::finalize(const Names & required_output_)
2020-08-12 08:55:16 +00:00
{
2020-08-13 20:17:18 +00:00
NamesAndTypesList new_required_columns;
ColumnsWithTypeAndName new_result_columns;
NameSet names(required_output_.begin(), required_output_.end());
for (const auto & column : result_columns)
2020-08-12 08:55:16 +00:00
{
2020-08-13 20:17:18 +00:00
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
new_result_columns.emplace_back(column);
}
for (const auto & column : required_columns)
{
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
new_required_columns.emplace_back(column);
}
2020-08-12 08:55:16 +00:00
2020-08-13 20:17:18 +00:00
std::swap(required_columns, new_required_columns);
std::swap(result_columns, new_result_columns);
}
2020-08-12 08:55:16 +00:00
2020-09-08 10:40:53 +00:00
/// Creates a JOIN step.
/// Every given column becomes a required column of the step; the result columns are the same columns
/// after `addJoinedColumnsAndCorrectTypes` of the analyzed join has been applied to them.
ExpressionActionsChain::JoinStep::JoinStep(
    std::shared_ptr<TableJoin> analyzed_join_,
    JoinPtr join_,
    ColumnsWithTypeAndName required_columns_)
    : Step({})
    , analyzed_join(std::move(analyzed_join_))
    , join(std::move(join_))
    , result_columns(std::move(required_columns_))
{
    /// Required columns are collected before the join modifies the list of result columns.
    for (const auto & source_column : result_columns)
        required_columns.emplace_back(source_column.name, source_column.type);

    analyzed_join->addJoinedColumnsAndCorrectTypes(result_columns);
}
void ExpressionActionsChain::JoinStep::finalize(const Names & required_output_)
{
/// We need to update required and result columns by removing unused ones.
NamesAndTypesList new_required_columns;
ColumnsWithTypeAndName new_result_columns;
/// That's an input columns we need.
NameSet required_names(required_output_.begin(), required_output_.end());
for (const auto & name : analyzed_join->keyNamesLeft())
required_names.emplace(name);
for (const auto & column : required_columns)
{
if (required_names.count(column.name) != 0)
new_required_columns.emplace_back(column);
}
/// Result will also contain joined columns.
for (const auto & column : analyzed_join->columnsAddedByJoin())
required_names.emplace(column.name);
for (const auto & column : result_columns)
{
if (required_names.count(column.name) != 0)
new_result_columns.emplace_back(column);
}
std::swap(required_columns, new_required_columns);
std::swap(result_columns, new_result_columns);
}
2020-09-11 12:24:41 +00:00
/// Returns the actions DAG of this step, which must be an ExpressionActionsStep.
/// The reference form of typeid_cast is used so that a step of another type results in an exception;
/// the pointer form returns nullptr on mismatch, which would then be dereferenced.
ActionsDAGPtr & ExpressionActionsChain::Step::actions()
{
    return typeid_cast<ExpressionActionsStep &>(*this).actions_dag;
}
2020-09-11 12:24:41 +00:00
/// Returns the actions DAG of this step (const overload); the step must be an ExpressionActionsStep.
/// The reference form of typeid_cast is used so that a step of another type results in an exception;
/// the pointer form returns nullptr on mismatch, which would then be dereferenced.
const ActionsDAGPtr & ExpressionActionsChain::Step::actions() const
{
    return typeid_cast<const ExpressionActionsStep &>(*this).actions_dag;
}
}