better ExpressionAction::execute()

This commit is contained in:
Artem Zuikov 2020-06-25 23:28:41 +03:00
parent bfe30a9723
commit 2d7d389b77
7 changed files with 36 additions and 20 deletions

View File

@ -323,8 +323,20 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
}
}
void ExpressionAction::execute(Block & block, ExtraBlockPtr & not_processed) const
{
switch (type)
{
case JOIN:
join->joinBlock(block, not_processed);
break;
void ExpressionAction::execute(Block & block, bool dry_run, ExtraBlockPtr & not_processed) const
default:
throw Exception("Unexpected expression call", ErrorCodes::LOGICAL_ERROR);
}
}
void ExpressionAction::execute(Block & block, bool dry_run) const
{
size_t input_rows_count = block.rows();
@ -362,10 +374,7 @@ void ExpressionAction::execute(Block & block, bool dry_run, ExtraBlockPtr & not_
}
case JOIN:
{
join->joinBlock(block, not_processed);
break;
}
throw Exception("Unexpected JOIN expression call", ErrorCodes::LOGICAL_ERROR);
case PROJECT:
{
@ -681,7 +690,7 @@ void ExpressionActions::execute(Block & block, ExtraBlockPtr & not_processed) co
if (actions.size() != 1)
throw Exception("Continuation over multiple expressions is not supported", ErrorCodes::LOGICAL_ERROR);
actions[0].execute(block, false, not_processed);
actions[0].execute(block, not_processed);
checkLimits(block);
}

View File

@ -139,13 +139,8 @@ private:
void executeOnTotals(Block & block) const;
/// Executes action on block (modify it). Block could be splitted in case of JOIN. Then not_processed block is created.
void execute(Block & block, bool dry_run, ExtraBlockPtr & not_processed) const;
void execute(Block & block, bool dry_run) const
{
ExtraBlockPtr extra;
execute(block, dry_run, extra);
}
void execute(Block & block, ExtraBlockPtr & not_processed) const;
void execute(Block & block, bool dry_run) const;
};

View File

@ -32,6 +32,7 @@
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/QueryPlan/ReadFromStorageStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
@ -873,7 +874,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
Block join_result_sample;
JoinPtr join = expressions.join->getTableJoinAlgo();
join_result_sample = ExpressionTransform::transformHeader(query_plan.getCurrentDataStream().header, expressions.join);
join_result_sample = InflatingExpressionTransform::transformHeader(
query_plan.getCurrentDataStream().header, expressions.join);
QueryPlanStepPtr join_step = std::make_unique<InflatingExpressionStep>(
query_plan.getCurrentDataStream(),

View File

@ -31,7 +31,7 @@ static void filterDistinctColumns(const Block & res_header, NameSet & distinct_c
ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_)
: ITransformingStep(
input_stream_,
ExpressionTransform::transformHeader(input_stream_.header, expression_),
Transform::transformHeader(input_stream_.header, expression_),
getTraits(expression_))
, expression(std::move(expression_))
, default_totals(default_totals_)
@ -55,14 +55,14 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<ExpressionTransform>(header, expression, on_totals, add_default_totals);
return std::make_shared<Transform>(header, expression, on_totals, add_default_totals);
});
}
InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_)
: ITransformingStep(
input_stream_,
ExpressionTransform::transformHeader(input_stream_.header, expression_),
Transform::transformHeader(input_stream_.header, expression_),
getTraits(expression_))
, expression(std::move(expression_))
, default_totals(default_totals_)
@ -84,7 +84,7 @@ void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline)
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<InflatingExpressionTransform>(header, expression, on_totals, add_default_totals);
return std::make_shared<Transform>(header, expression, on_totals, add_default_totals);
});
}

View File

@ -7,9 +7,14 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ExpressionTransform;
class InflatingExpressionTransform;
class ExpressionStep : public ITransformingStep
{
public:
using Transform = ExpressionTransform;
explicit ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false);
String getName() const override { return "Expression"; }
@ -24,6 +29,8 @@ private:
class InflatingExpressionStep : public ITransformingStep
{
public:
using Transform = InflatingExpressionTransform;
explicit InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false);
String getName() const override { return "Expression"; }

View File

@ -5,9 +5,10 @@
namespace DB
{
static Block transformHeader(Block header, const ExpressionActionsPtr & expression)
Block InflatingExpressionTransform::transformHeader(Block header, const ExpressionActionsPtr & expression)
{
expression->execute(header, true);
ExtraBlockPtr tmp;
expression->execute(header, tmp);
return header;
}

View File

@ -16,6 +16,8 @@ public:
String getName() const override { return "InflatingExpressionTransform"; }
static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
protected:
void transform(Chunk & chunk) override;
bool needInputData() const override { return !not_processed; }