diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index 9d6d5f783ff..be040ff2c34 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -900,10 +900,11 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su * in the subquery_for_set object, this subquery is set as source and the temporary table _data1 as the table. * - this function shows the expression IN_data1. */ - if (subquery_for_set.source.empty() && data.no_storage_or_local) + if (!subquery_for_set.source && data.no_storage_or_local) { auto interpreter = interpretSubquery(right_in_operand, data.context, data.subquery_depth, {}); - subquery_for_set.source = QueryPipeline::getPipe(interpreter->execute().pipeline); + subquery_for_set.source = std::make_unique(); + interpreter->buildQueryPlan(*subquery_for_set.source); } subquery_for_set.set = set; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index d9fc44d9b8f..14a50c2cfc6 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -582,7 +582,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQuer ExpressionActionsPtr joined_block_actions = createJoinedBlockActions(context, analyzedJoin()); Names original_right_columns; - if (subquery_for_join.source.empty()) + if (!subquery_for_join.source) { NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns( joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns()); diff --git a/src/Interpreters/GlobalSubqueriesVisitor.h b/src/Interpreters/GlobalSubqueriesVisitor.h index e155a132241..719794f0607 100644 --- a/src/Interpreters/GlobalSubqueriesVisitor.h +++ b/src/Interpreters/GlobalSubqueriesVisitor.h @@ -135,7 +135,8 @@ public: ast = database_and_table_name; external_tables[external_table_name] = external_storage_holder; - subqueries_for_sets[external_table_name].source = QueryPipeline::getPipe(interpreter->execute().pipeline); + subqueries_for_sets[external_table_name].source = std::make_unique(); + interpreter->buildQueryPlan(*subqueries_for_sets[external_table_name].source); subqueries_for_sets[external_table_name].table = external_storage; /** NOTE If it was written IN tmp_table - the existing temporary (but not external) table, diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 079fc792447..6f1be43b3a1 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1862,14 +1862,38 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_p const Settings & settings = context->getSettingsRef(); - auto creating_sets = std::make_unique( - query_plan.getCurrentDataStream(), - std::move(subqueries_for_sets), - SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode), - *context); + if (subqueries_for_sets.empty()) + return; - creating_sets->setStepDescription("Create sets for subqueries and joins"); - query_plan.addStep(std::move(creating_sets)); + SizeLimits limits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode); + + std::vector plans; + DataStreams input_streams; + input_streams.emplace_back(query_plan.getCurrentDataStream()); + + for (auto & [description, set] : subqueries_for_sets) + { + auto plan = std::move(set.source); + std::string type = (set.join != nullptr) ? "JOIN" + : "subquery"; + + auto creating_set = std::make_unique( + plan->getCurrentDataStream(), + query_plan.getCurrentDataStream().header, + std::move(description), + std::move(set), + limits, + *context); + creating_set->setStepDescription("Create set for " + type); + plan->addStep(std::move(creating_set)); + + input_streams.emplace_back(plan->getCurrentDataStream()); + plans.emplace_back(std::move(*plan)); + } + + auto creating_sets = std::make_unique(std::move(input_streams)); + creating_sets->setStepDescription("Create sets before main query execution"); + query_plan.unitePlans(std::move(creating_sets), std::move(plans)); } diff --git a/src/Interpreters/SubqueryForSet.cpp b/src/Interpreters/SubqueryForSet.cpp index 038ecbbb0b6..e944b76e71c 100644 --- a/src/Interpreters/SubqueryForSet.cpp +++ b/src/Interpreters/SubqueryForSet.cpp @@ -12,9 +12,10 @@ void SubqueryForSet::makeSource(std::shared_ptr NamesWithAliases && joined_block_aliases_) { joined_block_aliases = std::move(joined_block_aliases_); - source = QueryPipeline::getPipe(interpreter->execute().pipeline); + source = std::make_unique(); + interpreter->buildQueryPlan(*source); - sample_block = source.getHeader(); + sample_block = interpreter->getSampleBlock(); renameColumns(sample_block); } diff --git a/src/Interpreters/SubqueryForSet.h b/src/Interpreters/SubqueryForSet.h index d268758c3e8..b44f0f6cf8b 100644 --- a/src/Interpreters/SubqueryForSet.h +++ b/src/Interpreters/SubqueryForSet.h @@ -5,7 +5,6 @@ #include #include #include -#include namespace DB @@ -14,12 +13,13 @@ namespace DB class InterpreterSelectWithUnionQuery; class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; +class QueryPlan; /// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section. struct SubqueryForSet { /// The source is obtained using the InterpreterSelectQuery subquery. - Pipe source; + std::unique_ptr source; /// If set, build it from result. SetPtr set; diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index 0b654d0f325..c7ffe8b0c9b 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -204,7 +204,7 @@ void QueryPipeline::addCreatingSetsTransform(SubqueriesForSets subqueries_for_se for (auto & subquery : subqueries_for_sets) { - if (!subquery.second.source.empty()) + if (subquery.second.source) { auto & source = sources.emplace_back(std::move(subquery.second.source)); if (source.numOutputPorts() > 1) @@ -315,6 +315,20 @@ QueryPipeline QueryPipeline::unitePipelines( return pipeline; } +void QueryPipeline::addDelayedPipeline(QueryPipeline pipeline) +{ + pipeline.resize(1); + + auto * collected_processors = pipe.collected_processors; + + Pipes pipes; + pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline))); + pipes.emplace_back(std::move(pipe)); + pipe = Pipe::unitePipes(std::move(pipes), collected_processors); + + pipe.addTransform(std::make_shared(getHeader(), 2)); +} + void QueryPipeline::setProgressCallback(const ProgressCallback & callback) { for (auto & processor : pipe.processors) diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 45b410ab323..06c67c897be 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -87,6 +87,8 @@ public: size_t max_threads_limit = 0, Processors * collected_processors = nullptr); + void addDelayedPipeline(QueryPipeline); + PipelineExecutorPtr execute(); size_t getNumStreams() const { return pipe.numOutputPorts(); } diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp index 7e840e1531b..9a4c11f9222 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.cpp +++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp @@ -22,37 +22,91 @@ static ITransformingStep::Traits getTraits() }; } -CreatingSetsStep::CreatingSetsStep( +CreatingSetStep::CreatingSetStep( const DataStream & input_stream_, - SubqueriesForSets subqueries_for_sets_, + Block header, + String description_, + SubqueryForSet subquery_for_set_, SizeLimits network_transfer_limits_, const Context & context_) - : ITransformingStep(input_stream_, input_stream_.header, getTraits()) - , subqueries_for_sets(std::move(subqueries_for_sets_)) + : ITransformingStep(input_stream_, header, getTraits()) + , description(std::move(description_)) + , subquery_for_set(std::move(subquery_for_set_)) , network_transfer_limits(std::move(network_transfer_limits_)) , context(context_) { } -void CreatingSetsStep::transformPipeline(QueryPipeline & pipeline) +void CreatingSetStep::transformPipeline(QueryPipeline & pipeline) { - pipeline.addCreatingSetsTransform(std::move(subqueries_for_sets), network_transfer_limits, context); + pipeline.resize(1); + pipeline.addTransform( + std::make_shared( + pipeline.getHeader(), + getOutputStream().header, + std::move(subquery_for_set), + network_transfer_limits, + context)); } -void CreatingSetsStep::describeActions(FormatSettings & settings) const +void CreatingSetStep::describeActions(FormatSettings & settings) const { String prefix(settings.offset, ' '); - for (const auto & set : subqueries_for_sets) + settings.out << prefix; + if (subquery_for_set.set) + settings.out << "Set: "; + else if (subquery_for_set.join) + settings.out << "Join: "; + + settings.out << description << '\n'; +} + +CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_) +{ + if (input_streams_.empty()) + throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR); + + input_streams = std::move(input_streams_); + output_stream = input_streams.front(); + + for (size_t i = 1; i < input_streams.size(); ++i) + assertBlocksHaveEqualStructure(output_stream->header, input_streams[i].header, "CreatingSets"); +} + +QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines) +{ + if (pipelines.empty()) + throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR); + + auto main_pipeline = std::move(pipelines.front()); + if (pipelines.size() == 1) + return main_pipeline; + + std::swap(pipelines.front(), pipelines.back()); + pipelines.pop_back(); + + QueryPipeline delayed_pipeline; + if (pipelines.size() > 1) { - settings.out << prefix; - if (set.second.set) - settings.out << "Set: "; - else if (set.second.join) - settings.out << "Join: "; - - settings.out << set.first << '\n'; + QueryPipelineProcessorsCollector collector(delayed_pipeline, this); + delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header); + processors = collector.detachProcessors(); } + else + delayed_pipeline = std::move(*pipelines.front()); + + QueryPipelineProcessorsCollector collector(*main_pipeline, this); + main_pipeline->addDelayedPipeline(std::move(delayed_pipeline)); + auto added_processors = collector.detachProcessors(); + processors.insert(processors.end(), added_processors.begin(), added_processors.end()); + + return main_pipeline; +} + +void CreatingSetsStep::describePipeline(FormatSettings & settings) const +{ + IQueryPlanStep::describePipeline(processors, settings); } } diff --git a/src/Processors/QueryPlan/CreatingSetsStep.h b/src/Processors/QueryPlan/CreatingSetsStep.h index 4ba4863c043..f2351c91518 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.h +++ b/src/Processors/QueryPlan/CreatingSetsStep.h @@ -7,12 +7,14 @@ namespace DB { /// Creates sets for subqueries and JOIN. See CreatingSetsTransform. -class CreatingSetsStep : public ITransformingStep +class CreatingSetStep : public ITransformingStep { public: - CreatingSetsStep( + CreatingSetStep( const DataStream & input_stream_, - SubqueriesForSets subqueries_for_sets_, + Block header, + String description_, + SubqueryForSet subquery_for_set_, SizeLimits network_transfer_limits_, const Context & context_); @@ -23,9 +25,25 @@ public: void describeActions(FormatSettings & settings) const override; private: - SubqueriesForSets subqueries_for_sets; + String description; + SubqueryForSet subquery_for_set; SizeLimits network_transfer_limits; const Context & context; }; +class CreatingSetsStep : public IQueryPlanStep +{ +public: + CreatingSetsStep(DataStreams input_streams_); + + String getName() const override { return "CreatingSets"; } + + QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override; + + void describePipeline(FormatSettings & settings) const override; + +private: + Processors processors; +}; + }