Merge pull request #24108 from ClickHouse/actions-dag-calc-header

Calculate header from ActionsDAG
This commit is contained in:
Nikolai Kochetov 2021-05-14 13:55:00 +03:00 committed by GitHub
commit bb11cde871
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 194 additions and 25 deletions

View File

@ -26,6 +26,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int THERE_IS_NO_COLUMN;
extern const int ILLEGAL_COLUMN;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
}
const char * ActionsDAG::typeToString(ActionsDAG::ActionType type)
@ -439,6 +440,164 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs)
inputs.erase(it, inputs.end());
}
static ColumnWithTypeAndName executeActionForHeader(const ActionsDAG::Node * node, ColumnsWithTypeAndName arguments)
{
ColumnWithTypeAndName res_column;
res_column.type = node->result_type;
res_column.name = node->result_name;
switch (node->type)
{
case ActionsDAG::ActionType::FUNCTION:
{
// bool all_args_are_const = true;
// for (const auto & argument : arguments)
// if (typeid_cast<const ColumnConst *>(argument.column.get()) == nullptr)
// all_args_are_const = false;
res_column.column = node->function->execute(arguments, res_column.type, 0, true);
// if (!all_args_are_const)
// res_column.column = res_column.column->convertToFullColumnIfConst();
break;
}
case ActionsDAG::ActionType::ARRAY_JOIN:
{
auto key = arguments.at(0);
key.column = key.column->convertToFullColumnIfConst();
const ColumnArray * array = typeid_cast<const ColumnArray *>(key.column.get());
if (!array)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"ARRAY JOIN of not array: {}", node->result_name);
res_column.column = array->getDataPtr()->cloneEmpty();
break;
}
case ActionsDAG::ActionType::COLUMN:
{
res_column.column = node->column->cloneResized(0);
break;
}
case ActionsDAG::ActionType::ALIAS:
{
res_column.column = arguments.at(0).column;
break;
}
case ActionsDAG::ActionType::INPUT:
{
break;
}
}
return res_column;
}
Block ActionsDAG::updateHeader(Block header) const
{
std::unordered_map<const Node *, ColumnWithTypeAndName> node_to_column;
std::set<size_t> pos_to_remove;
{
std::unordered_map<std::string_view, std::list<size_t>> input_positions;
for (size_t pos = 0; pos < inputs.size(); ++pos)
input_positions[inputs[pos]->result_name].emplace_back(pos);
for (size_t pos = 0; pos < header.columns(); ++pos)
{
const auto & col = header.getByPosition(pos);
auto it = input_positions.find(col.name);
if (it != input_positions.end() && !it->second.empty())
{
auto & list = it->second;
pos_to_remove.insert(pos);
node_to_column[inputs[list.front()]] = std::move(col);
list.pop_front();
}
}
}
ColumnsWithTypeAndName result_columns;
result_columns.reserve(index.size());
struct Frame
{
const Node * node;
size_t next_child = 0;
};
{
for (const auto * output : index)
{
if (node_to_column.count(output) == 0)
{
std::stack<Frame> stack;
stack.push({.node = output});
while (!stack.empty())
{
auto & frame = stack.top();
const auto * node = frame.node;
while (frame.next_child < node->children.size())
{
const auto * child = node->children[frame.next_child];
if (node_to_column.count(child) == 0)
{
stack.push({.node = child});
break;
}
++frame.next_child;
}
if (frame.next_child < node->children.size())
continue;
stack.pop();
ColumnsWithTypeAndName arguments(node->children.size());
for (size_t i = 0; i < arguments.size(); ++i)
{
arguments[i] = node_to_column[node->children[i]];
if (!arguments[i].column)
throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
"Not found column {} in block", node->children[i]->result_name);
}
node_to_column[node] = executeActionForHeader(node, std::move(arguments));
}
}
auto & column = node_to_column[output];
if (column.column)
result_columns.push_back(node_to_column[output]);
}
}
if (isInputProjected())
header.clear();
else
header.erase(pos_to_remove);
Block res;
for (auto & col : result_columns)
res.insert(std::move(col));
for (const auto & item : header)
res.insert(std::move(item));
return res;
}
NameSet ActionsDAG::foldActionsByProjection(
const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name, bool add_missing_keys)
{

View File

@ -186,6 +186,14 @@ public:
ActionsDAGPtr clone() const;
/// Execute actions for header. Input block must have empty columns.
/// Result should be equal to the execution of ExpressionActions build form this DAG.
/// Actions are not changed, no expressions are compiled.
///
/// In addition, check that result constants are constants according to DAG.
/// In case if function return constant, but arguments are not constant, materialize it.
Block updateHeader(Block header) const;
/// For apply materialize() function for every output.
/// Also add aliases so the result names remain unchanged.
void addMaterializingOutputActions();

View File

@ -31,7 +31,7 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions)
ExpressionStep::ExpressionStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_)
: ITransformingStep(
input_stream_,
Transform::transformHeader(input_stream_.header, std::make_shared<ExpressionActions>(actions_dag_, ExpressionActionsSettings{})),
ExpressionTransform::transformHeader(input_stream_.header, *actions_dag_),
getTraits(actions_dag_))
, actions_dag(std::move(actions_dag_))
{
@ -42,8 +42,7 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ActionsDAGPtr a
void ExpressionStep::updateInputStream(DataStream input_stream, bool keep_header)
{
Block out_header = keep_header ? std::move(output_stream->header)
: Transform::transformHeader(input_stream.header,
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings{}));
: ExpressionTransform::transformHeader(input_stream.header, *actions_dag);
output_stream = createOutputStream(
input_stream,
std::move(out_header),
@ -58,7 +57,7 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline, const BuildQuer
auto expression = std::make_shared<ExpressionActions>(actions_dag, settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<Transform>(header, expression);
return std::make_shared<ExpressionTransform>(header, expression);
});
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))

View File

@ -14,7 +14,6 @@ class JoiningTransform;
class ExpressionStep : public ITransformingStep
{
public:
using Transform = ExpressionTransform;
explicit ExpressionStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_);
String getName() const override { return "Expression"; }

View File

@ -34,7 +34,7 @@ FilterStep::FilterStep(
input_stream_,
FilterTransform::transformHeader(
input_stream_.header,
std::make_shared<ExpressionActions>(actions_dag_, ExpressionActionsSettings{}),
*actions_dag_,
filter_column_name_,
remove_filter_column_),
getTraits(actions_dag_))
@ -52,7 +52,7 @@ void FilterStep::updateInputStream(DataStream input_stream, bool keep_header)
if (keep_header)
out_header = FilterTransform::transformHeader(
input_stream.header,
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings{}),
*actions_dag,
filter_column_name,
remove_filter_column);

View File

@ -37,7 +37,7 @@ TotalsHavingStep::TotalsHavingStep(
input_stream_,
TotalsHavingTransform::transformHeader(
input_stream_.header,
(actions_dag_ ? std::make_shared<ExpressionActions>(actions_dag_, ExpressionActionsSettings{}) : nullptr),
actions_dag_.get(),
final_),
getTraits(!filter_column_.empty()))
, overflow_row(overflow_row_)

View File

@ -3,16 +3,14 @@
namespace DB
{
Block ExpressionTransform::transformHeader(Block header, const ExpressionActionsPtr & expression)
Block ExpressionTransform::transformHeader(Block header, const ActionsDAG & expression)
{
size_t num_rows = header.rows();
expression->execute(header, num_rows, true);
return header;
return expression.updateHeader(std::move(header));
}
ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_)
: ISimpleTransform(header_, transformHeader(header_, expression_), false)
: ISimpleTransform(header_, transformHeader(header_, expression_->getActionsDAG()), false)
, expression(std::move(expression_))
{
}

View File

@ -7,6 +7,8 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ActionsDAG;
/** Executes a certain expression over the block.
* The expression consists of column identifiers from the block, constants, common functions.
* For example: hits * 2 + 3, url LIKE '%yandex%'
@ -21,7 +23,7 @@ public:
String getName() const override { return "ExpressionTransform"; }
static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
static Block transformHeader(Block header, const ActionsDAG & expression);
protected:
void transform(Chunk & chunk) override;

View File

@ -29,12 +29,11 @@ static void replaceFilterToConstant(Block & block, const String & filter_column_
Block FilterTransform::transformHeader(
Block header,
const ExpressionActionsPtr & expression,
const ActionsDAG & expression,
const String & filter_column_name,
bool remove_filter_column)
{
size_t num_rows = header.rows();
expression->execute(header, num_rows);
header = expression.updateHeader(std::move(header));
if (remove_filter_column)
header.erase(filter_column_name);
@ -50,7 +49,10 @@ FilterTransform::FilterTransform(
String filter_column_name_,
bool remove_filter_column_,
bool on_totals_)
: ISimpleTransform(header_, transformHeader(header_, expression_, filter_column_name_, remove_filter_column_), true)
: ISimpleTransform(
header_,
transformHeader(header_, expression_->getActionsDAG(), filter_column_name_, remove_filter_column_),
true)
, expression(std::move(expression_))
, filter_column_name(std::move(filter_column_name_))
, remove_filter_column(remove_filter_column_)

View File

@ -8,6 +8,8 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ActionsDAG;
/** Implements WHERE, HAVING operations.
* Takes an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions.
* The expression is evaluated and result chunks contain only the filtered rows.
@ -22,7 +24,7 @@ public:
static Block transformHeader(
Block header,
const ExpressionActionsPtr & expression,
const ActionsDAG & expression,
const String & filter_column_name,
bool remove_filter_column);

View File

@ -28,15 +28,13 @@ void finalizeChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows);
}
Block TotalsHavingTransform::transformHeader(Block block, const ExpressionActionsPtr & expression, bool final)
Block TotalsHavingTransform::transformHeader(Block block, const ActionsDAG * expression, bool final)
{
if (final)
finalizeBlock(block);
size_t num_rows = block.rows();
if (expression)
expression->execute(block, num_rows);
block = expression->updateHeader(std::move(block));
return block;
}
@ -49,7 +47,7 @@ TotalsHavingTransform::TotalsHavingTransform(
TotalsMode totals_mode_,
double auto_include_threshold_,
bool final_)
: ISimpleTransform(header, transformHeader(header, expression_, final_), true)
: ISimpleTransform(header, transformHeader(header, expression_ ? &expression_->getActionsDAG() : nullptr, final_), true)
, overflow_row(overflow_row_)
, expression(expression_)
, filter_column_name(filter_column_)

View File

@ -12,6 +12,8 @@ using ArenaPtr = std::shared_ptr<Arena>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ActionsDAG;
enum class TotalsMode;
/** Takes blocks after grouping, with non-finalized aggregate functions.
@ -37,7 +39,7 @@ public:
Status prepare() override;
void work() override;
static Block transformHeader(Block block, const ExpressionActionsPtr & expression, bool final);
static Block transformHeader(Block block, const ActionsDAG * expression, bool final);
protected:
void transform(Chunk & chunk) override;