diff --git a/src/DataStreams/ExpressionBlockInputStream.cpp b/src/DataStreams/ExpressionBlockInputStream.cpp index cce02af8262..9673395a21a 100644 --- a/src/DataStreams/ExpressionBlockInputStream.cpp +++ b/src/DataStreams/ExpressionBlockInputStream.cpp @@ -44,33 +44,4 @@ Block ExpressionBlockInputStream::readImpl() return res; } -Block InflatingExpressionBlockInputStream::readImpl() -{ - if (!initialized) - { - if (expression->resultIsAlwaysEmpty()) - return {}; - - initialized = true; - } - - Block res; - bool keep_going = not_processed && not_processed->empty(); /// There's data inside expression. - - if (!not_processed || keep_going) - { - not_processed.reset(); - - res = children.back()->read(); - if (res || keep_going) - expression->execute(res, not_processed, action_number); - } - else - { - res = std::move(not_processed->block); - expression->execute(res, not_processed, action_number); - } - return res; -} - } diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index 0a99fcd6f21..32e3000a65d 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -323,8 +323,20 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings, } } +void ExpressionAction::execute(Block & block, ExtraBlockPtr & not_processed) const +{ + switch (type) + { + case JOIN: + join->joinBlock(block, not_processed); + break; -void ExpressionAction::execute(Block & block, bool dry_run, ExtraBlockPtr & not_processed) const + default: + throw Exception("Unexpected expression call", ErrorCodes::LOGICAL_ERROR); + } +} + +void ExpressionAction::execute(Block & block, bool dry_run) const { size_t input_rows_count = block.rows(); @@ -362,10 +374,7 @@ void ExpressionAction::execute(Block & block, bool dry_run, ExtraBlockPtr & not_ } case JOIN: - { - join->joinBlock(block, not_processed); - break; - } + throw Exception("Unexpected JOIN expression call", ErrorCodes::LOGICAL_ERROR); case PROJECT: { 
@@ -676,19 +685,13 @@ void ExpressionActions::execute(Block & block, bool dry_run) const } } -/// @warning It's a tricky method that allows to continue ONLY ONE action in reason of one-to-many ALL JOIN logic. -void ExpressionActions::execute(Block & block, ExtraBlockPtr & not_processed, size_t & start_action) const +void ExpressionActions::execute(Block & block, ExtraBlockPtr & not_processed) const { - size_t i = start_action; - start_action = 0; - for (; i < actions.size(); ++i) - { - actions[i].execute(block, false, not_processed); - checkLimits(block); + if (actions.size() != 1) + throw Exception("Continuation over multiple expressions is not supported", ErrorCodes::LOGICAL_ERROR); - if (not_processed) - start_action = i; - } + actions[0].execute(block, not_processed); + checkLimits(block); } bool ExpressionActions::hasJoinOrArrayJoin() const diff --git a/src/Interpreters/ExpressionActions.h b/src/Interpreters/ExpressionActions.h index 06adef24bc6..1aae3f5e021 100644 --- a/src/Interpreters/ExpressionActions.h +++ b/src/Interpreters/ExpressionActions.h @@ -139,13 +139,8 @@ private: void executeOnTotals(Block & block) const; /// Executes action on block (modify it). Block could be splitted in case of JOIN. Then not_processed block is created. - void execute(Block & block, bool dry_run, ExtraBlockPtr & not_processed) const; - - void execute(Block & block, bool dry_run) const - { - ExtraBlockPtr extra; - execute(block, dry_run, extra); - } + void execute(Block & block, ExtraBlockPtr & not_processed) const; + void execute(Block & block, bool dry_run) const; }; @@ -211,8 +206,8 @@ public: /// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns. void execute(Block & block, bool dry_run = false) const; - /// Execute the expression on the block with continuation. - void execute(Block & block, ExtraBlockPtr & not_processed, size_t & start_action) const; + /// Execute the expression on the block with continuation. 
This method is only supported for a single JOIN. + void execute(Block & block, ExtraBlockPtr & not_processed) const; bool hasJoinOrArrayJoin() const; @@ -325,10 +320,14 @@ struct ExpressionActionsChain steps.clear(); } - ExpressionActionsPtr getLastActions() + ExpressionActionsPtr getLastActions(bool allow_empty = false) { if (steps.empty()) + { + if (allow_empty) + return {}; throw Exception("Empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR); + } return steps.back().actions; } @@ -341,6 +340,13 @@ struct ExpressionActionsChain return steps.back(); } + Step & lastStep(const NamesAndTypesList & columns) + { + if (steps.empty()) + steps.emplace_back(std::make_shared(columns, context)); + return steps.back(); + } + std::string dumpChain(); }; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 26516fed8f3..44aa70b1697 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -453,14 +453,6 @@ const ASTSelectQuery * SelectQueryExpressionAnalyzer::getAggregatingQuery() cons return getSelectQuery(); } -void ExpressionAnalyzer::initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const -{ - if (chain.steps.empty()) - { - chain.steps.emplace_back(std::make_shared(columns, context)); - } -} - /// "Big" ARRAY JOIN. 
void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool array_join_is_left) const { @@ -487,8 +479,7 @@ bool SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & cha if (!array_join_expression_list) return false; - initChain(chain, sourceColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); getRootActions(array_join_expression_list, only_types, step.actions); @@ -502,18 +493,20 @@ void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr j actions->add(ExpressionAction::ordinaryJoin(syntax->analyzed_join, join)); } -bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types) +bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types) { - const ASTTablesInSelectQueryElement * ast_join = getSelectQuery()->join(); - if (!ast_join) - return false; - - JoinPtr table_join = makeTableJoin(*ast_join); - - initChain(chain, sourceColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions); + return true; +} + +bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain) +{ + JoinPtr table_join = makeTableJoin(*syntax->ast_join); + + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); + addJoinAction(step.actions, table_join); return true; } @@ -637,8 +630,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere( if (!select_query->prewhere()) return false; - initChain(chain, sourceColumns()); - auto & step = chain.getLastStep(); + auto & step = chain.lastStep(sourceColumns()); getRootActions(select_query->prewhere(), only_types, step.actions); String prewhere_column_name = select_query->prewhere()->getColumnName(); 
step.required_output.push_back(prewhere_column_name); @@ -705,8 +697,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere( void SelectQueryExpressionAnalyzer::appendPreliminaryFilter(ExpressionActionsChain & chain, ExpressionActionsPtr actions, String column_name) { - initChain(chain, sourceColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); // FIXME: assert(filter_info); step.actions = std::move(actions); @@ -723,8 +714,7 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, if (!select_query->where()) return false; - initChain(chain, sourceColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); step.required_output.push_back(select_query->where()->getColumnName()); step.can_remove_required_output = {true}; @@ -742,8 +732,7 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain if (!select_query->groupBy()) return false; - initChain(chain, sourceColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); ASTs asts = select_query->groupBy()->children; for (const auto & ast : asts) @@ -769,8 +758,7 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression { const auto * select_query = getAggregatingQuery(); - initChain(chain, sourceColumns()); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); for (const auto & desc : aggregate_descriptions) for (const auto & name : desc.argument_names) @@ -801,8 +789,7 @@ bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, if (!select_query->having()) return false; - initChain(chain, aggregated_columns); - ExpressionActionsChain::Step & step = chain.steps.back(); + 
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); step.required_output.push_back(select_query->having()->getColumnName()); getRootActions(select_query->having(), only_types, step.actions); @@ -814,8 +801,7 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain, { const auto * select_query = getSelectQuery(); - initChain(chain, aggregated_columns); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); getRootActions(select_query->select(), only_types, step.actions); @@ -831,8 +817,7 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain if (!select_query->orderBy()) return false; - initChain(chain, aggregated_columns); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); getRootActions(select_query->orderBy(), only_types, step.actions); @@ -864,8 +849,7 @@ bool SelectQueryExpressionAnalyzer::appendLimitBy(ExpressionActionsChain & chain if (!select_query->limitBy()) return false; - initChain(chain, aggregated_columns); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); getRootActions(select_query->limitBy(), only_types, step.actions); @@ -890,8 +874,7 @@ void SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & { const auto * select_query = getSelectQuery(); - initChain(chain, aggregated_columns); - ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns); NamesWithAliases result_columns; @@ -939,8 +922,7 @@ void SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types) { - initChain(chain, sourceColumns()); - 
ExpressionActionsChain::Step & step = chain.steps.back(); + ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns()); getRootActions(expr, only_types, step.actions); step.required_output.push_back(expr->getColumnName()); } @@ -1101,10 +1083,18 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( query_analyzer.appendArrayJoin(chain, only_types || !first_stage); - if (query_analyzer.appendJoin(chain, only_types || !first_stage)) + if (query_analyzer.hasTableJoin()) { - before_join = chain.getLastActions(); - if (!hasJoin()) + query_analyzer.appendJoinLeftKeys(chain, only_types || !first_stage); + + before_join = chain.getLastActions(true); + if (before_join) + chain.addStep(); + + query_analyzer.appendJoin(chain); + + join = chain.getLastActions(); + if (!join) throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR); chain.addStep(); } @@ -1153,11 +1143,11 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( } bool join_allow_read_in_order = true; - if (before_join) + if (hasJoin()) { /// You may find it strange but we support read_in_order for HashJoin and do not support for MergeJoin. - auto join = before_join->getTableJoinAlgo(); - join_allow_read_in_order = typeid_cast(join.get()) && !join->hasStreamWithNonJoinedRows(); + auto join_algo = join->getTableJoinAlgo(); + join_allow_read_in_order = typeid_cast(join_algo.get()) && !join_algo->hasStreamWithNonJoinedRows(); } optimize_read_in_order = diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 1015631d9da..bd099693a91 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -153,9 +153,6 @@ protected: void analyzeAggregation(); bool makeAggregateDescriptions(ExpressionActionsPtr & actions); - /// columns - the columns that are present before the transformations begin. 
- void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const; - const ASTSelectQuery * getSelectQuery() const; bool isRemoteStorage() const; @@ -178,7 +175,8 @@ struct ExpressionAnalysisResult bool optimize_read_in_order = false; bool optimize_aggregation_in_order = false; - ExpressionActionsPtr before_join; /// including JOIN + ExpressionActionsPtr before_join; + ExpressionActionsPtr join; ExpressionActionsPtr before_where; ExpressionActionsPtr before_aggregation; ExpressionActionsPtr before_having; @@ -214,7 +212,7 @@ struct ExpressionAnalysisResult /// Filter for row-level security. bool hasFilter() const { return filter_info.get(); } - bool hasJoin() const { return before_join.get(); } + bool hasJoin() const { return join.get(); } bool hasPrewhere() const { return prewhere_info.get(); } bool hasWhere() const { return before_where.get(); } bool hasHaving() const { return before_having.get(); } @@ -249,6 +247,7 @@ public: /// Does the expression have aggregate functions or a GROUP BY or HAVING section. bool hasAggregation() const { return has_aggregation; } bool hasGlobalSubqueries() { return has_global_subqueries; } + bool hasTableJoin() const { return syntax->ast_join; } const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; } const AggregateDescriptions & aggregates() const { return aggregate_descriptions; } @@ -307,7 +306,8 @@ private: /// Before aggregation: bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types); - bool appendJoin(ExpressionActionsChain & chain, bool only_types); + bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types); + bool appendJoin(ExpressionActionsChain & chain); /// Add preliminary rows filtration. Actions are created in other expression analyzer to prevent any possible alias injection. 
void appendPreliminaryFilter(ExpressionActionsChain & chain, ExpressionActionsPtr actions, String column_name); /// remove_filter is set in ExpressionActionsChain::finalize(); diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 178c9aa7d8c..bf5436e88da 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -858,52 +859,38 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu query_plan.addStep(std::move(row_level_security_step)); } + if (expressions.before_join) + { + QueryPlanStepPtr before_join_step = std::make_unique( + query_plan.getCurrentDataStream(), + expressions.before_join); + before_join_step->setStepDescription("Before JOIN"); + query_plan.addStep(std::move(before_join_step)); + } + if (expressions.hasJoin()) { Block join_result_sample; - JoinPtr join = expressions.before_join->getTableJoinAlgo(); + JoinPtr join = expressions.join->getTableJoinAlgo(); - join_result_sample = ExpressionTransform::transformHeader(query_plan.getCurrentDataStream().header, expressions.before_join); + join_result_sample = InflatingExpressionTransform::transformHeader( + query_plan.getCurrentDataStream().header, expressions.join); - bool inflating_join = false; - if (join) + QueryPlanStepPtr join_step = std::make_unique( + query_plan.getCurrentDataStream(), + expressions.join); + + join_step->setStepDescription("JOIN"); + query_plan.addStep(std::move(join_step)); + + if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size)) { - inflating_join = true; - if (auto * hash_join = typeid_cast(join.get())) - inflating_join = isCross(hash_join->getKind()); - } + auto source = std::make_shared(std::move(stream)); + auto add_non_joined_rows_step = std::make_unique( + query_plan.getCurrentDataStream(), std::move(source)); - 
QueryPlanStepPtr before_join_step; - if (inflating_join) - { - before_join_step = std::make_unique( - query_plan.getCurrentDataStream(), - expressions.before_join, - true); - - } - else - { - before_join_step = std::make_unique( - query_plan.getCurrentDataStream(), - expressions.before_join, - true); - } - - before_join_step->setStepDescription("JOIN"); - query_plan.addStep(std::move(before_join_step)); - - if (join) - { - if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size)) - { - auto source = std::make_shared(std::move(stream)); - auto add_non_joined_rows_step = std::make_unique( - query_plan.getCurrentDataStream(), std::move(source)); - - add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN"); - query_plan.addStep(std::move(add_non_joined_rows_step)); - } + add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN"); + query_plan.addStep(std::move(add_non_joined_rows_step)); } } diff --git a/src/Interpreters/SyntaxAnalyzer.cpp b/src/Interpreters/SyntaxAnalyzer.cpp index 5dec563eebb..9bc7ae055d2 100644 --- a/src/Interpreters/SyntaxAnalyzer.cpp +++ b/src/Interpreters/SyntaxAnalyzer.cpp @@ -1016,6 +1016,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect( result.aggregates = getAggregates(query, *select_query); result.collectUsedColumns(query, true); + result.ast_join = select_query->join(); if (result.optimize_trivial_count) result.optimize_trivial_count = settings.optimize_trivial_count_query && diff --git a/src/Interpreters/SyntaxAnalyzer.h b/src/Interpreters/SyntaxAnalyzer.h index dd3c49a0f1a..2c86ce11c42 100644 --- a/src/Interpreters/SyntaxAnalyzer.h +++ b/src/Interpreters/SyntaxAnalyzer.h @@ -11,6 +11,7 @@ namespace DB { class ASTFunction; +struct ASTTablesInSelectQueryElement; class TableJoin; class Context; struct Settings; @@ -24,6 +25,7 @@ struct SyntaxAnalyzerResult ConstStoragePtr storage; StorageMetadataPtr metadata_snapshot; std::shared_ptr analyzed_join; 
+ const ASTTablesInSelectQueryElement * ast_join = nullptr; NamesAndTypesList source_columns; NameSet source_columns_set; /// Set of names of source_columns. diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 75c07554318..5ea56240496 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -28,13 +28,12 @@ static void filterDistinctColumns(const Block & res_header, NameSet & distinct_c distinct_columns.swap(new_distinct_columns); } -ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_) +ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_) : ITransformingStep( input_stream_, - ExpressionTransform::transformHeader(input_stream_.header, expression_), + Transform::transformHeader(input_stream_.header, expression_), getTraits(expression_)) , expression(std::move(expression_)) - , default_totals(default_totals_) { /// Some columns may be removed by expression. /// TODO: also check aliases, functions and some types of join @@ -44,28 +43,19 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio void ExpressionStep::transformPipeline(QueryPipeline & pipeline) { - /// In case joined subquery has totals, and we don't, add default chunk to totals. 
- bool add_default_totals = false; - if (default_totals && !pipeline.hasTotals()) - { - pipeline.addDefaultTotals(); - add_default_totals = true; - } - pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) { bool on_totals = stream_type == QueryPipeline::StreamType::Totals; - return std::make_shared(header, expression, on_totals, add_default_totals); + return std::make_shared(header, expression, on_totals); }); } -InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_) +InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_) : ITransformingStep( input_stream_, - ExpressionTransform::transformHeader(input_stream_.header, expression_), + Transform::transformHeader(input_stream_.header, expression_), getTraits(expression_)) , expression(std::move(expression_)) - , default_totals(default_totals_) { filterDistinctColumns(output_stream->header, output_stream->distinct_columns); filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns); @@ -75,7 +65,7 @@ void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline) { /// In case joined subquery has totals, and we don't, add default chunk to totals. 
bool add_default_totals = false; - if (default_totals && !pipeline.hasTotals()) + if (!pipeline.hasTotals()) { pipeline.addDefaultTotals(); add_default_totals = true; @@ -84,7 +74,7 @@ void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline) pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) { bool on_totals = stream_type == QueryPipeline::StreamType::Totals; - return std::make_shared(header, expression, on_totals, add_default_totals); + return std::make_shared(header, expression, on_totals, add_default_totals); }); } diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index 4f268944c95..7bb7e4dbd0b 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -7,31 +7,36 @@ namespace DB class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; +class ExpressionTransform; +class InflatingExpressionTransform; + class ExpressionStep : public ITransformingStep { public: - explicit ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false); + using Transform = ExpressionTransform; + + explicit ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_); String getName() const override { return "Expression"; } void transformPipeline(QueryPipeline & pipeline) override; private: ExpressionActionsPtr expression; - bool default_totals; /// See ExpressionTransform }; /// TODO: add separate step for join. 
class InflatingExpressionStep : public ITransformingStep { public: - explicit InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false); + using Transform = InflatingExpressionTransform; + + explicit InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_); String getName() const override { return "Expression"; } void transformPipeline(QueryPipeline & pipeline) override; private: ExpressionActionsPtr expression; - bool default_totals; /// See ExpressionTransform }; } diff --git a/src/Processors/Transforms/ExpressionTransform.cpp b/src/Processors/Transforms/ExpressionTransform.cpp index bf523d6d7a3..9f7970d3272 100644 --- a/src/Processors/Transforms/ExpressionTransform.cpp +++ b/src/Processors/Transforms/ExpressionTransform.cpp @@ -12,11 +12,10 @@ Block ExpressionTransform::transformHeader(Block header, const ExpressionActions } -ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_, bool on_totals_, bool default_totals_) +ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_, bool on_totals_) : ISimpleTransform(header_, transformHeader(header_, expression_), on_totals_) , expression(std::move(expression_)) , on_totals(on_totals_) - , default_totals(default_totals_) { } @@ -37,14 +36,7 @@ void ExpressionTransform::transform(Chunk & chunk) auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); if (on_totals) - { - /// Drop totals if both out stream and joined stream doesn't have ones. 
- /// See comment in ExpressionTransform.h - if (default_totals && !expression->hasTotalsInJoin()) - return; - expression->executeOnTotals(block); - } else expression->execute(block); diff --git a/src/Processors/Transforms/ExpressionTransform.h b/src/Processors/Transforms/ExpressionTransform.h index 60d6dc0f777..c7de2de39ab 100644 --- a/src/Processors/Transforms/ExpressionTransform.h +++ b/src/Processors/Transforms/ExpressionTransform.h @@ -13,8 +13,7 @@ public: ExpressionTransform( const Block & header_, ExpressionActionsPtr expression_, - bool on_totals_ = false, - bool default_totals_ = false); + bool on_totals_ = false); String getName() const override { return "ExpressionTransform"; } @@ -26,10 +25,6 @@ protected: private: ExpressionActionsPtr expression; bool on_totals; - /// This flag means that we have manually added totals to our pipeline. - /// It may happen in case if joined subquery has totals, but out string doesn't. - /// We need to join default values with subquery totals if we have them, or return empty chunk is haven't. 
- bool default_totals; bool initialized = false; }; diff --git a/src/Processors/Transforms/InflatingExpressionTransform.cpp b/src/Processors/Transforms/InflatingExpressionTransform.cpp index de4e93ef8d2..7e7a029eed9 100644 --- a/src/Processors/Transforms/InflatingExpressionTransform.cpp +++ b/src/Processors/Transforms/InflatingExpressionTransform.cpp @@ -5,9 +5,10 @@ namespace DB { -static Block transformHeader(Block header, const ExpressionActionsPtr & expression) +Block InflatingExpressionTransform::transformHeader(Block header, const ExpressionActionsPtr & expression) { - expression->execute(header, true); + ExtraBlockPtr tmp; + expression->execute(header, tmp); return header; } @@ -38,8 +39,12 @@ void InflatingExpressionTransform::transform(Chunk & chunk) { /// We have to make chunk empty before return block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); + + /// Drop totals if neither our stream nor the joined stream has them. + /// See the comment on default_totals in InflatingExpressionTransform.h if (default_totals && !expression->hasTotalsInJoin()) return; + expression->executeOnTotals(block); } else @@ -59,7 +64,7 @@ Block InflatingExpressionTransform::readExecute(Chunk & chunk) res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); if (res) - expression->execute(res, not_processed, action_number); + expression->execute(res, not_processed); } else if (not_processed->empty()) /// There's not processed data inside expression. 
{ @@ -67,12 +72,12 @@ Block InflatingExpressionTransform::readExecute(Chunk & chunk) res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); not_processed.reset(); - expression->execute(res, not_processed, action_number); + expression->execute(res, not_processed); } else { res = std::move(not_processed->block); - expression->execute(res, not_processed, action_number); + expression->execute(res, not_processed); } return res; } diff --git a/src/Processors/Transforms/InflatingExpressionTransform.h b/src/Processors/Transforms/InflatingExpressionTransform.h index b490d0699ad..aa638d27c9f 100644 --- a/src/Processors/Transforms/InflatingExpressionTransform.h +++ b/src/Processors/Transforms/InflatingExpressionTransform.h @@ -16,6 +16,8 @@ public: String getName() const override { return "InflatingExpressionTransform"; } + static Block transformHeader(Block header, const ExpressionActionsPtr & expression); + protected: void transform(Chunk & chunk) override; bool needInputData() const override { return !not_processed; } @@ -23,11 +25,13 @@ protected: private: ExpressionActionsPtr expression; bool on_totals; + /// This flag means that we have manually added totals to our pipeline. + /// It may happen if the joined subquery has totals, but our stream doesn't. + /// We need to join default values with subquery totals if we have them, or return an empty chunk if we don't. bool default_totals; bool initialized = false; ExtraBlockPtr not_processed; - size_t action_number = 0; Block readExecute(Chunk & chunk); };