Refactor ExpressionActions [Part 1].

This commit is contained in:
Nikolai Kochetov 2020-10-07 21:37:27 +03:00
parent 5f7aeddfe5
commit 11e86ed64f
2 changed files with 142 additions and 58 deletions

View File

@ -629,6 +629,107 @@ void ExpressionActions::execute(Block & block, bool dry_run) const
}
}
/// Executes a single action, dispatching on the type of the DAG node attached to it (`action.node->type`).
///
/// NOTE(review): apart from the `switch` condition, the body below does not read `action` or
/// `execution_context`. It operates on `block`, `argument_names`, `result_name`, `result_type`,
/// `source_name`, `projection`, `added_column`, `can_replace`, `function`, `is_function_compiled`
/// and `input_rows_count`, none of which are parameters or locals of this function.
/// Presumably this is transitional code that still has to be ported to the new
/// `Action` / `ExecutionContext` structures - confirm before relying on it.
/// NOTE(review): only the first case label is qualified (`ActionsDAG::Type::FUNCTION`); the remaining
/// labels are unqualified enumerator names - verify that they resolve to the same enumeration.
void ExpressionActions::executeAction(const Action & action, ExecutionContext & execution_context, bool dry_run)
{
switch (action.node->type)
{
case ActionsDAG::Type::FUNCTION:
{
/// Translate argument names into column positions inside the block.
ColumnNumbers arguments(argument_names.size());
for (size_t i = 0; i < argument_names.size(); ++i)
arguments[i] = block.getPositionByName(argument_names[i]);
/// The result column is appended right after the existing columns, so the column count
/// taken before the insertion is the position of the result.
size_t num_columns_without_result = block.columns();
/// Insert a placeholder with a null column; it is passed to the function as the result slot.
block.insert({ nullptr, result_type, result_name});
ProfileEvents::increment(ProfileEvents::FunctionExecute);
if (is_function_compiled)
ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute);
function->execute(block, arguments, num_columns_without_result, input_rows_count, dry_run);
break;
}
case ARRAY_JOIN:
{
/// Take a copy of the source column description and remove the column from the block,
/// so that the replication loop below only touches the other columns.
auto source = block.getByName(source_name);
block.erase(source_name);
/// A constant column is materialized first so that it can be cast to ColumnArray.
source.column = source.column->convertToFullColumnIfConst();
const ColumnArray * array = typeid_cast<const ColumnArray *>(source.column.get());
if (!array)
throw Exception("ARRAY JOIN of not array: " + source_name, ErrorCodes::TYPE_MISMATCH);
/// Replicate every remaining column according to the offsets of the array column.
for (auto & column : block)
column.column = column.column->replicate(array->getOffsets());
/// The result is the nested data of the array, typed with the nested type of the array type
/// and stored under the result name.
source.column = array->getDataPtr();
source.type = assert_cast<const DataTypeArray &>(*source.type).getNestedType();
source.name = result_name;
block.insert(std::move(source));
break;
}
case PROJECT:
{
/// Build a new block that contains only the columns listed in `projection`, in that order.
Block new_block;
for (const auto & elem : projection)
{
const std::string & name = elem.first;
const std::string & alias = elem.second;
ColumnWithTypeAndName column = block.getByName(name);
/// An empty alias keeps the original column name.
if (!alias.empty())
column.name = alias;
new_block.insert(std::move(column));
}
block.swap(new_block);
break;
}
case ADD_ALIASES:
{
/// Unlike PROJECT, the existing columns stay in the block; aliases are added next to them.
for (const auto & elem : projection)
{
const std::string & name = elem.first;
const std::string & alias = elem.second;
const ColumnWithTypeAndName & column = block.getByName(name);
/// Skip empty aliases and aliases whose name is already present in the block.
if (!alias.empty() && !block.has(alias))
block.insert({column.column, column.type, alias});
}
break;
}
case REMOVE_COLUMN:
block.erase(source_name);
break;
case ADD_COLUMN:
/// Insert `added_column` resized to `input_rows_count` under the result name.
block.insert({ added_column->cloneResized(input_rows_count), result_type, result_name });
break;
case COPY_COLUMN:
/// When replacing is allowed and the result column already exists, overwrite its type and data in place.
if (can_replace && block.has(result_name))
{
auto & result = block.getByName(result_name);
const auto & source = block.getByName(source_name);
result.type = source.type;
result.column = source.column;
}
else
{
/// Otherwise insert a new column under the result name that reuses the source column and type.
const auto & source_column = block.getByName(source_name);
block.insert({source_column.column, source_column.type, result_name});
}
break;
}
}
bool ExpressionActions::hasArrayJoin() const
{
for (const auto & action : actions)

View File

@ -193,8 +193,8 @@ public:
/// Compiler-generated default constructor.
ActionsDAG() = default;
/// Copy construction and copy assignment are deleted.
ActionsDAG(const ActionsDAG &) = delete;
ActionsDAG & operator=(const ActionsDAG &) = delete;
/// Constructors taking a list of inputs.
/// NOTE(review): both the implicit and the `explicit` declarations of the same two constructors
/// are present below; presumably these are the before/after sides of a diff whose markers were
/// lost - confirm which pair is meant to stay.
ActionsDAG(const NamesAndTypesList & inputs);
ActionsDAG(const ColumnsWithTypeAndName & inputs);
explicit ActionsDAG(const NamesAndTypesList & inputs);
explicit ActionsDAG(const ColumnsWithTypeAndName & inputs);
/// Read-only access to the `index` member.
const Index & getIndex() const { return index; }
@ -227,29 +227,45 @@ using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
*/
class ExpressionActions
{
private:
using Node = ActionsDAG::Node;
using Index = ActionsDAG::Index;
/// Describes one argument of an action.
/// Both members carry default initializers so that a default-constructed Argument
/// never holds indeterminate values; aggregate initialization keeps working.
struct Argument
{
    /// Position of the argument.
    /// NOTE(review): presumably an index into ExecutionContext::columns - confirm.
    size_t position = 0;
    /// NOTE(review): presumably tells that the column may be dropped once the action has used it - confirm.
    bool can_remove = false;
};
using Arguments = std::vector<Argument>;
/// One step of the expression: a DAG node together with its arguments and the position of its result.
/// Scalar members carry default initializers so that a default-constructed Action
/// never holds an indeterminate pointer or index; aggregate initialization keeps working.
struct Action
{
    /// Node to execute; its `type` selects the kind of action. Null until assigned.
    Node * node = nullptr;
    /// Arguments of the action.
    Arguments arguments;
    /// Position of the result.
    /// NOTE(review): presumably an index into ExecutionContext::columns - confirm.
    size_t result_position = 0;
};
using Actions = std::vector<Action>;
/// Mutable state that is passed to executeAction for every action.
/// `num_rows` carries a default initializer so that a default-constructed context
/// never holds an indeterminate row count; aggregate initialization keeps working.
struct ExecutionContext
{
    /// NOTE(review): presumably the columns of the incoming block - confirm.
    ColumnsWithTypeAndName input_columns;
    /// NOTE(review): presumably the working set of columns addressed by Argument::position
    /// and Action::result_position - confirm.
    ColumnsWithTypeAndName columns;
    /// Number of rows.
    size_t num_rows = 0;
};
std::list<Node> nodes;
Index index;
Actions actions;
NamesAndTypesList required_columns;
public:
using Actions = std::vector<ExpressionAction>;
ExpressionActions(const NamesAndTypesList & input_columns_, const Context & context_);
/// For constant columns the columns themselves can be contained in `input_columns_`.
ExpressionActions(const ColumnsWithTypeAndName & input_columns_, const Context & context_);
~ExpressionActions();
ExpressionActions(const ExpressionActions & other) = default;
/// Add the input column.
/// The name of the column must not match the names of the intermediate columns that occur when evaluating the expression.
/// The expression must not have any PROJECT actions.
void addInput(const ColumnWithTypeAndName & column);
void addInput(const NameAndTypePair & column);
void add(const ExpressionAction & action);
/// Adds new column names to out_new_columns (formed as a result of the added action).
void add(const ExpressionAction & action, Names & out_new_columns);
/// Adds to the beginning the removal of all extra columns.
void prependProjectInput();
@ -263,20 +279,11 @@ public:
/// - Does not reorder the columns.
/// - Does not remove "unexpected" columns (for example, added by functions).
/// - If output_columns is empty, leaves one arbitrary column (so that the number of rows in the block is not lost).
void finalize(const Names & output_columns);
const Actions & getActions() const { return actions; }
// void finalize(const Names & output_columns);
/// Get a list of input columns.
/// Returns the names of all entries of `input_columns`, in the same order.
Names getRequiredColumns() const
{
    Names names;
    /// The final size is known in advance, so allocate once instead of growing on every push_back.
    names.reserve(input_columns.size());
    for (const auto & input : input_columns)
        names.push_back(input.name);
    return names;
}
const NamesAndTypesList & getRequiredColumnsWithTypes() const { return input_columns; }
Names getRequiredColumns() const;
const NamesAndTypesList & getRequiredColumnsWithTypes() const;
/// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns.
void execute(Block & block, bool dry_run = false) const;
@ -284,7 +291,7 @@ public:
bool hasArrayJoin() const;
/// Obtain a sample block that contains the names and types of result columns.
const Block & getSampleBlock() const { return sample_block; }
const Block & getSampleBlock() const;
std::string dumpActions() const;
@ -296,28 +303,7 @@ public:
/// Call it only after subqueries for sets were executed.
bool checkColumnIsAlwaysFalse(const String & column_name) const;
/// Hash functor for a whole list of actions.
struct ActionsHash
{
    /// Feeds the hash of every action, in order, into a single SipHash state
    /// and returns the resulting 128-bit value.
    UInt128 operator()(const ExpressionActions::Actions & elems) const
    {
        SipHash sip;

        for (const ExpressionAction & action : elems)
        {
            sip.update(ExpressionAction::ActionHash{}(action));
        }

        UInt128 digest;
        sip.get128(digest.low, digest.high);
        return digest;
    }
};
private:
/// These columns have to be in input blocks (arguments of execute* methods)
NamesAndTypesList input_columns;
/// These actions will be executed on input blocks
Actions actions;
/// The example of result (output) block.
Block sample_block;
/// Columns which can't be used for constant folding.
NameSet names_not_for_constant_folding;
Settings settings;
#if USE_EMBEDDED_COMPILER
@ -326,10 +312,7 @@ private:
void checkLimits(Block & block) const;
void addImpl(ExpressionAction action, Names & new_names);
/// Move all arrayJoin as close as possible to the end.
void optimizeArrayJoin();
void executeAction(const Action & action, ExecutionContext & execution_context, bool dry_run);
};