Refactor ExpressionActions.

This commit is contained in:
Nikolai Kochetov 2020-10-13 11:16:47 +03:00
parent fa8ed65c4d
commit 07c2730169
3 changed files with 105 additions and 222 deletions

View File

@ -156,7 +156,7 @@ private:
/// This is needed to allow function execution over data. /// This is needed to allow function execution over data.
/// It is safe because functions does not change column names, so index is unaffected. /// It is safe because functions does not change column names, so index is unaffected.
/// It is temporary. /// It is temporary.
friend struct ExpressionAction; friend class ExpressionActions;
friend class ActionsDAG; friend class ActionsDAG;
}; };

View File

@ -523,7 +523,7 @@ ExpressionActions::ExpressionActions(const ColumnsWithTypeAndName & input_column
ExpressionActions::~ExpressionActions() = default; ExpressionActions::~ExpressionActions() = default;
void ExpressionActions::checkLimits(Block & block) const void ExpressionActions::checkLimits(ExecutionContext & execution_context) const
{ {
if (settings.max_temporary_columns && block.columns() > settings.max_temporary_columns) if (settings.max_temporary_columns && block.columns() > settings.max_temporary_columns)
throw Exception("Too many temporary columns: " + block.dumpNames() throw Exception("Too many temporary columns: " + block.dumpNames()
@ -551,62 +551,6 @@ void ExpressionActions::checkLimits(Block & block) const
} }
} }
void ExpressionActions::addInput(const ColumnWithTypeAndName & column)
{
input_columns.emplace_back(column.name, column.type);
sample_block.insert(column);
}
void ExpressionActions::addInput(const NameAndTypePair & column)
{
addInput(ColumnWithTypeAndName(nullptr, column.type, column.name));
}
void ExpressionActions::add(const ExpressionAction & action, Names & out_new_columns)
{
addImpl(action, out_new_columns);
}
void ExpressionActions::add(const ExpressionAction & action)
{
Names new_names;
addImpl(action, new_names);
}
void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
{
if (!action.result_name.empty())
new_names.push_back(action.result_name);
/// Compiled functions are custom functions and they don't need building
if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled)
{
if (sample_block.has(action.result_name))
throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
ColumnsWithTypeAndName arguments(action.argument_names.size());
for (size_t i = 0; i < action.argument_names.size(); ++i)
{
if (!sample_block.has(action.argument_names[i]))
throw Exception("Unknown identifier: '" + action.argument_names[i] + "'", ErrorCodes::UNKNOWN_IDENTIFIER);
arguments[i] = sample_block.getByName(action.argument_names[i]);
}
if (!action.function_base)
{
action.function_base = action.function_builder->build(arguments);
action.result_type = action.function_base->getReturnType();
}
}
if (action.type == ExpressionAction::ADD_ALIASES)
for (const auto & name_with_alias : action.projection)
new_names.emplace_back(name_with_alias.second);
action.prepare(sample_block, settings, names_not_for_constant_folding);
actions.push_back(action);
}
void ExpressionActions::prependProjectInput() void ExpressionActions::prependProjectInput()
{ {
actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns())); actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns()));
@ -614,12 +558,33 @@ void ExpressionActions::prependProjectInput()
void ExpressionActions::execute(Block & block, bool dry_run) const void ExpressionActions::execute(Block & block, bool dry_run) const
{ {
ExecutionContext execution_context
{
.input_columns = block.data,
.num_rows = block.rows(),
};
execution_context.columns.reserve(num_columns);
ColumnNumbers inputs_to_remove;
inputs_to_remove.reserve(required_columns.size());
for (const auto & column : required_columns)
{
size_t pos = block.getPositionByName(column.name);
execution_context.columns.emplace_back(std::move(block.getByPosition(pos)));
if (!sample_block.has(column.name))
inputs_to_remove.emplace_back(pos);
}
execution_context.columns.resize(num_columns);
for (const auto & action : actions) for (const auto & action : actions)
{ {
try try
{ {
action.execute(block, dry_run); executeAction(action, execution_context, dry_run);
checkLimits(block); checkLimits(execution_context);
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -627,113 +592,112 @@ void ExpressionActions::execute(Block & block, bool dry_run) const
throw; throw;
} }
} }
std::sort(inputs_to_remove.rbegin(), inputs_to_remove.rend());
for (auto input : inputs_to_remove)
block.erase(input);
for (const auto & action : actions)
{
if (!action.is_used_in_result)
continue;
auto & column = execution_context.columns[action.result_position];
column.name = action.node->result_name;
if (block.has(action.node->result_name))
block.getByName(action.node->result_name) = std::move(column);
else
block.insert(std::move(column));
}
} }
void ExpressionActions::executeAction(const Action & action, ExecutionContext & execution_context, bool dry_run) void ExpressionActions::executeAction(const Action & action, ExecutionContext & execution_context, bool dry_run)
{ {
auto & columns = execution_context.columns;
auto & num_rows = execution_context.num_rows;
switch (action.node->type) switch (action.node->type)
{ {
case ActionsDAG::Type::FUNCTION: case ActionsDAG::Type::FUNCTION:
{ {
ColumnNumbers arguments(argument_names.size()); auto & res_column = columns[action.result_position];
for (size_t i = 0; i < argument_names.size(); ++i) if (res_column.type || res_column.column)
arguments[i] = block.getPositionByName(argument_names[i]); throw Exception("Result column is not empty", ErrorCodes::LOGICAL_ERROR);
size_t num_columns_without_result = block.columns(); res_column.type = action.node->result_type;
block.insert({ nullptr, result_type, result_name}); /// Columns names are not used, avoid extra copy.
/// res_column.name = action.node->result_name;
ProfileEvents::increment(ProfileEvents::FunctionExecute); ProfileEvents::increment(ProfileEvents::FunctionExecute);
if (is_function_compiled) if (action.node->is_function_compiled)
ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute); ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute);
function->execute(block, arguments, num_columns_without_result, input_rows_count, dry_run); action.node->function->execute(columns, action.arguments, action.result_position, num_rows, dry_run);
break; break;
} }
case ARRAY_JOIN: case ActionsDAG::Type::ARRAY_JOIN:
{ {
auto source = block.getByName(source_name); size_t array_join_key_pos = action.arguments.front();
block.erase(source_name); auto array_join_key = columns[array_join_key_pos];
source.column = source.column->convertToFullColumnIfConst();
const ColumnArray * array = typeid_cast<const ColumnArray *>(source.column.get()); /// Remove array join argument in advance if it is not needed.
if (!action.to_remove.empty())
columns[array_join_key_pos] = {};
array_join_key.column = array_join_key.column->convertToFullColumnIfConst();
const ColumnArray * array = typeid_cast<const ColumnArray *>(array_join_key.column.get());
if (!array) if (!array)
throw Exception("ARRAY JOIN of not array: " + source_name, ErrorCodes::TYPE_MISMATCH); throw Exception("ARRAY JOIN of not array: " + action.node->result_name, ErrorCodes::TYPE_MISMATCH);
for (auto & column : block) for (auto & column : columns)
column.column = column.column->replicate(array->getOffsets()); if (column.column)
column.column = column.column->replicate(array->getOffsets());
source.column = array->getDataPtr(); for (auto & column : execution_context.input_columns)
source.type = assert_cast<const DataTypeArray &>(*source.type).getNestedType(); if (column.column)
source.name = result_name; column.column = column.column->replicate(array->getOffsets());
block.insert(std::move(source)); auto & res_column = columns[action.result_position];
res_column.column = array->getDataPtr();
res_column.type = assert_cast<const DataTypeArray &>(*array_join_key.type).getNestedType();
num_rows = res_column.column->size();
break; break;
} }
case PROJECT: case ActionsDAG::Type::COLUMN:
{ {
Block new_block; auto & res_column = columns[action.result_position];
res_column.column = action.node->column->cloneResized(num_rows);
for (const auto & elem : projection) res_column.type = action.node->result_type;
{
const std::string & name = elem.first;
const std::string & alias = elem.second;
ColumnWithTypeAndName column = block.getByName(name);
if (!alias.empty())
column.name = alias;
new_block.insert(std::move(column));
}
block.swap(new_block);
break; break;
} }
case ADD_ALIASES: case ActionsDAG::Type::ALIAS:
{ {
for (const auto & elem : projection) /// Do not care about names, they are empty.
{ columns[action.result_position] = columns[action.arguments.front()];
const std::string & name = elem.first;
const std::string & alias = elem.second;
const ColumnWithTypeAndName & column = block.getByName(name);
if (!alias.empty() && !block.has(alias))
block.insert({column.column, column.type, alias});
}
break; break;
} }
case REMOVE_COLUMN: case ActionsDAG::Type::INPUT:
block.erase(source_name); {
break; throw Exception("Cannot execute INPUT action", ErrorCodes::LOGICAL_ERROR);
}
case ADD_COLUMN:
block.insert({ added_column->cloneResized(input_rows_count), result_type, result_name });
break;
case COPY_COLUMN:
if (can_replace && block.has(result_name))
{
auto & result = block.getByName(result_name);
const auto & source = block.getByName(source_name);
result.type = source.type;
result.column = source.column;
}
else
{
const auto & source_column = block.getByName(source_name);
block.insert({source_column.column, source_column.type, result_name});
}
break;
} }
for (auto to_remove : action.to_remove)
columns[to_remove] = {};
} }
bool ExpressionActions::hasArrayJoin() const bool ExpressionActions::hasArrayJoin() const
{ {
for (const auto & action : actions) for (const auto & action : actions)
if (action.type == ExpressionAction::ARRAY_JOIN) if (action.node->type == ActionsDAG::Type::ARRAY_JOIN)
return true; return true;
return false; return false;
@ -1013,86 +977,6 @@ std::string ExpressionActions::dumpActions() const
return ss.str(); return ss.str();
} }
void ExpressionActions::optimizeArrayJoin()
{
const size_t none = actions.size();
size_t first_array_join = none;
/// Columns that need to be evaluated for arrayJoin.
/// Actions for adding them can not be moved to the left of the arrayJoin.
NameSet array_joined_columns;
/// Columns needed to evaluate arrayJoin or those that depend on it.
/// Actions to delete them can not be moved to the left of the arrayJoin.
NameSet array_join_dependencies;
for (size_t i = 0; i < actions.size(); ++i)
{
/// Do not move the action to the right of the projection (the more that they are not usually there).
if (actions[i].type == ExpressionAction::PROJECT)
break;
bool depends_on_array_join = false;
Names needed;
if (actions[i].type == ExpressionAction::ARRAY_JOIN)
{
depends_on_array_join = true;
needed = actions[i].getNeededColumns();
}
else
{
if (first_array_join == none)
continue;
needed = actions[i].getNeededColumns();
for (const auto & elem : needed)
{
if (array_joined_columns.count(elem))
{
depends_on_array_join = true;
break;
}
}
}
if (depends_on_array_join)
{
if (first_array_join == none)
first_array_join = i;
if (!actions[i].result_name.empty())
array_joined_columns.insert(actions[i].result_name);
array_join_dependencies.insert(needed.begin(), needed.end());
}
else
{
bool can_move = false;
if (actions[i].type == ExpressionAction::REMOVE_COLUMN)
{
/// If you delete a column that is not needed for arrayJoin (and those who depend on it), you can delete it before arrayJoin.
can_move = !array_join_dependencies.count(actions[i].source_name);
}
else
{
/// If the action does not delete the columns and does not depend on the result of arrayJoin, you can make it until arrayJoin.
can_move = true;
}
/// Move the current action to the position just before the first arrayJoin.
if (can_move)
{
/// Move the i-th element to the position `first_array_join`.
std::rotate(actions.begin() + first_array_join, actions.begin() + i, actions.begin() + i + 1);
++first_array_join;
}
}
}
}
ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns) ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns)
{ {
/// Create new actions. /// Create new actions.

View File

@ -175,6 +175,8 @@ public:
FunctionBasePtr function_base; FunctionBasePtr function_base;
/// Prepared function which is used in function execution. /// Prepared function which is used in function execution.
ExecutableFunctionPtr function; ExecutableFunctionPtr function;
/// If function is a compiled statement.
bool is_function_compiled = false;
/// For COLUMN node and propagated constants. /// For COLUMN node and propagated constants.
ColumnPtr column; ColumnPtr column;
@ -231,35 +233,32 @@ private:
using Node = ActionsDAG::Node; using Node = ActionsDAG::Node;
using Index = ActionsDAG::Index; using Index = ActionsDAG::Index;
struct Argument
{
size_t position;
bool can_remove;
};
using Arguments = std::vector<Argument>;
struct Action struct Action
{ {
Node * node; Node * node;
Arguments arguments; ColumnNumbers arguments;
/// Columns which will be removed after actions is executed.
/// It is always a subset of arguments.
ColumnNumbers to_remove;
size_t result_position; size_t result_position;
bool is_used_in_result;
}; };
using Actions = std::vector<Action>; using Actions = std::vector<Action>;
struct ExecutionContext struct ExecutionContext
{ {
ColumnsWithTypeAndName input_columns; ColumnsWithTypeAndName & input_columns;
ColumnsWithTypeAndName columns; ColumnsWithTypeAndName columns;
size_t num_rows; size_t num_rows;
}; };
std::list<Node> nodes; std::list<Node> nodes;
Index index;
Actions actions; Actions actions;
size_t num_columns;
NamesAndTypesList required_columns; NamesAndTypesList required_columns;
Block sample_block;
public: public:
~ExpressionActions(); ~ExpressionActions();
@ -283,7 +282,7 @@ public:
/// Get a list of input columns. /// Get a list of input columns.
Names getRequiredColumns() const; Names getRequiredColumns() const;
const NamesAndTypesList & getRequiredColumnsWithTypes() const; const NamesAndTypesList & getRequiredColumnsWithTypes() const { return required_columns; }
/// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns. /// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns.
void execute(Block & block, bool dry_run = false) const; void execute(Block & block, bool dry_run = false) const;
@ -291,7 +290,7 @@ public:
bool hasArrayJoin() const; bool hasArrayJoin() const;
/// Obtain a sample block that contains the names and types of result columns. /// Obtain a sample block that contains the names and types of result columns.
const Block & getSampleBlock() const; const Block & getSampleBlock() const { return sample_block; }
std::string dumpActions() const; std::string dumpActions() const;
@ -310,9 +309,9 @@ private:
std::shared_ptr<CompiledExpressionCache> compilation_cache; std::shared_ptr<CompiledExpressionCache> compilation_cache;
#endif #endif
void checkLimits(Block & block) const; void checkLimits(ExecutionContext & execution_context) const;
void executeAction(const Action & action, ExecutionContext & execution_context, bool dry_run); static void executeAction(const Action & action, ExecutionContext & execution_context, bool dry_run);
}; };