From 0e2f52518fb54798486dd8fe05feae1c3b8d9b62 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 25 Mar 2021 12:57:14 +0300 Subject: [PATCH] Extract converting from UnionStep. --- .../ClusterProxy/executeQuery.cpp | 3 +- src/Interpreters/InterpreterInsertQuery.cpp | 2 +- .../InterpreterSelectWithUnionQuery.cpp | 15 ++++++++- src/Processors/QueryPipeline.cpp | 33 +++++++++++-------- src/Processors/QueryPipeline.h | 2 -- src/Processors/QueryPlan/CreatingSetsStep.cpp | 4 +-- src/Processors/QueryPlan/CreatingSetsStep.h | 2 +- src/Processors/QueryPlan/UnionStep.cpp | 20 ++++++++--- src/Processors/QueryPlan/UnionStep.h | 4 +-- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 3 +- src/Storages/StorageBuffer.cpp | 2 +- 11 files changed, 58 insertions(+), 32 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 59cbae67770..d319f7f71bb 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -156,8 +156,7 @@ void executeQuery( for (auto & plan : plans) input_streams.emplace_back(plan->getCurrentDataStream()); - auto header = input_streams.front().header; - auto union_step = std::make_unique(std::move(input_streams), header); + auto union_step = std::make_unique(std::move(input_streams)); query_plan.unitePlans(std::move(union_step), std::move(plans)); } diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index b4ed7a2fd6e..ac6b63be480 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -250,7 +250,7 @@ BlockIO InterpreterInsertQuery::execute() } } - res.pipeline = QueryPipeline::unitePipelines(std::move(pipelines), {}, ExpressionActionsSettings::fromContext(context)); + res.pipeline = QueryPipeline::unitePipelines(std::move(pipelines)); } } diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp index 21737bbb57e..48610731ef8 100644 --- a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp +++ b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -251,11 +252,23 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan) { plans[i] = std::make_unique(); nested_interpreters[i]->buildQueryPlan(*plans[i]); + + if (!blocksHaveEqualStructure(plans[i]->getCurrentDataStream().header, result_header)) + { + auto actions_dag = ActionsDAG::makeConvertingActions( + plans[i]->getCurrentDataStream().header.getColumnsWithTypeAndName(), + result_header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Position); + auto converting_step = std::make_unique(plans[i]->getCurrentDataStream(), std::move(actions_dag)); + converting_step->setStepDescription("Conversion before UNION"); + plans[i]->addStep(std::move(converting_step)); + } + data_streams[i] = plans[i]->getCurrentDataStream(); } auto max_threads = context->getSettingsRef().max_threads; - auto union_step = std::make_unique(std::move(data_streams), result_header, max_threads); + auto union_step = std::make_unique(std::move(data_streams), max_threads); query_plan.unitePlans(std::move(union_step), std::move(plans)); diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index cb897575744..5201950ef47 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -211,11 +211,14 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output) QueryPipeline QueryPipeline::unitePipelines( std::vector> pipelines, - const Block & common_header, - const ExpressionActionsSettings & settings, size_t max_threads_limit, Processors * collected_processors) { + if (pipelines.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of pipelines"); + + Block common_header = pipelines.front()->getHeader(); + /// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0. /// If true, result max_threads will be sum(max_threads). /// Note: it may be > than settings.max_threads, so we should apply this limit again. @@ -229,19 +232,21 @@ QueryPipeline QueryPipeline::unitePipelines( pipeline.checkInitialized(); pipeline.pipe.collected_processors = collected_processors; - if (!pipeline.isCompleted()) - { - auto actions_dag = ActionsDAG::makeConvertingActions( - pipeline.getHeader().getColumnsWithTypeAndName(), - common_header.getColumnsWithTypeAndName(), - ActionsDAG::MatchColumnsMode::Position); - auto actions = std::make_shared(actions_dag, settings); + assertBlocksHaveEqualStructure(pipeline.getHeader(), common_header, "QueryPipeline::unitePipelines"); - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, actions); - }); - } + // if (!pipeline.isCompleted()) + // { + // auto actions_dag = ActionsDAG::makeConvertingActions( + // pipeline.getHeader().getColumnsWithTypeAndName(), + // common_header.getColumnsWithTypeAndName(), + // ActionsDAG::MatchColumnsMode::Position); + // auto actions = std::make_shared(actions_dag, settings); + + // pipeline.addSimpleTransform([&](const Block & header) + // { + // return std::make_shared(header, actions); + // }); + // } pipes.emplace_back(std::move(pipeline.pipe)); diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index a26aaea88cc..1c1243e288f 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -90,8 +90,6 @@ public: /// If collector is used, it will collect only newly-added processors, but not processors from pipelines. static QueryPipeline unitePipelines( std::vector> pipelines, - const Block & common_header, - const ExpressionActionsSettings & settings, size_t max_threads_limit = 0, Processors * collected_processors = nullptr); diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp index f3cc7c6b1a8..ec710d493ed 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.cpp +++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp @@ -74,7 +74,7 @@ CreatingSetsStep::CreatingSetsStep(DataStreams input_streams_) input_streams[i].header.dumpStructure()); } -QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) +QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) { if (pipelines.empty()) throw Exception("CreatingSetsStep cannot be created with no inputs", ErrorCodes::LOGICAL_ERROR); @@ -89,7 +89,7 @@ QueryPipelinePtr CreatingSetsStep::updatePipeline(QueryPipelines pipelines, cons if (pipelines.size() > 1) { QueryPipelineProcessorsCollector collector(delayed_pipeline, this); - delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines), Block(), settings.getActionsSettings()); + delayed_pipeline = QueryPipeline::unitePipelines(std::move(pipelines)); processors = collector.detachProcessors(); } else diff --git a/src/Processors/QueryPlan/CreatingSetsStep.h b/src/Processors/QueryPlan/CreatingSetsStep.h index 9c6e53e0195..14006a1a384 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.h +++ b/src/Processors/QueryPlan/CreatingSetsStep.h @@ -37,7 +37,7 @@ public: String getName() const override { return "CreatingSets"; } - QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override; + QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override; void describePipeline(FormatSettings & settings) const override; diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp index 66fb3ba8593..a697517b5e8 100644 --- a/src/Processors/QueryPlan/UnionStep.cpp +++ b/src/Processors/QueryPlan/UnionStep.cpp @@ -6,8 +6,20 @@ namespace DB { -UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_) - : header(std::move(result_header)) +static Block checkHeaders(const DataStreams & input_streams) +{ + if (input_streams.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of query plan steps"); + + Block res = input_streams.front().header; + for (const auto & stream : input_streams) + assertBlocksHaveEqualStructure(stream.header, res, "UnionStep"); + + return res; +} + +UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_) + : header(checkHeaders(input_streams_)) , max_threads(max_threads_) { input_streams = std::move(input_streams_); @@ -18,7 +30,7 @@ UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max output_stream = DataStream{.header = header}; } -QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) +QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) { auto pipeline = std::make_unique(); QueryPipelineProcessorsCollector collector(*pipeline, this); @@ -30,7 +42,7 @@ QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines, const Build return pipeline; } - *pipeline = QueryPipeline::unitePipelines(std::move(pipelines), output_stream->header, settings.getActionsSettings(), max_threads); + *pipeline = QueryPipeline::unitePipelines(std::move(pipelines), max_threads); processors = collector.detachProcessors(); return pipeline; diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h index 2d997e0a36d..738ada4a565 100644 --- a/src/Processors/QueryPlan/UnionStep.h +++ b/src/Processors/QueryPlan/UnionStep.h @@ -9,11 +9,11 @@ class UnionStep : public IQueryPlanStep { public: /// max_threads is used to limit the number of threads for result pipeline. - UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_ = 0); + explicit UnionStep(DataStreams input_streams_, size_t max_threads_ = 0); String getName() const override { return "Union"; } - QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings & settings) override; + QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override; void describePipeline(FormatSettings & settings) const override; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 747819c77eb..b9b6d574d97 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1355,8 +1355,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( for (const auto & plan : plans) input_streams.emplace_back(plan->getCurrentDataStream()); - const auto & common_header = plans.front()->getCurrentDataStream().header; - auto union_step = std::make_unique(std::move(input_streams), common_header); + auto union_step = std::make_unique(std::move(input_streams)); auto plan = std::make_unique(); plan->unitePlans(std::move(union_step), std::move(plans)); diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 6dc32f4c880..46641300de4 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -391,7 +391,7 @@ void StorageBuffer::read( plans.emplace_back(std::make_unique(std::move(buffers_plan))); query_plan = QueryPlan(); - auto union_step = std::make_unique(std::move(input_streams), result_header); + auto union_step = std::make_unique(std::move(input_streams)); union_step->setStepDescription("Unite sources from Buffer table"); query_plan.unitePlans(std::move(union_step), std::move(plans)); }