From 12449caf8917afd074d0f431f72b7cc0bcbf95ed Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Thu, 31 Mar 2022 16:08:57 +0800 Subject: [PATCH] Refactoring QueryPipeline --- src/QueryPipeline/Pipe.cpp | 38 -------- src/QueryPipeline/Pipe.h | 1 - src/QueryPipeline/QueryPipelineBuilder.cpp | 103 --------------------- src/QueryPipeline/QueryPipelineBuilder.h | 9 -- 4 files changed, 151 deletions(-) diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index 6cef7cc28bd..551841524b3 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -759,44 +759,6 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter) header.clear(); } -void Pipe::setOutputFormat(ProcessorPtr output) -{ - if (output_ports.empty()) - throw Exception("Cannot set output format to empty Pipe.", ErrorCodes::LOGICAL_ERROR); - - if (output_ports.size() != 1) - throw Exception("Cannot set output format to Pipe because single output port is expected, " - "but it has " + std::to_string(output_ports.size()) + " ports", ErrorCodes::LOGICAL_ERROR); - - auto * format = dynamic_cast(output.get()); - - if (!format) - throw Exception("IOutputFormat processor expected for QueryPipelineBuilder::setOutputFormat.", - ErrorCodes::LOGICAL_ERROR); - - auto & main = format->getPort(IOutputFormat::PortKind::Main); - auto & totals = format->getPort(IOutputFormat::PortKind::Totals); - auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes); - - if (!totals_port) - addTotalsSource(std::make_shared(totals.getHeader())); - - if (!extremes_port) - addExtremesSource(std::make_shared(extremes.getHeader())); - - if (collected_processors) - collected_processors->emplace_back(output); - - processors.emplace_back(std::move(output)); - - connect(*output_ports.front(), main); - connect(*totals_port, totals); - connect(*extremes_port, extremes); - - output_ports.clear(); - header.clear(); -} - void Pipe::transform(const Transformer & transformer) { if (output_ports.empty()) diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index 613e92a782d..bc19b8389b3 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -141,7 +141,6 @@ private: bool isCompleted() const { return !empty() && output_ports.empty(); } static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header); void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter); - void setOutputFormat(ProcessorPtr output); friend class QueryPipelineBuilder; friend class QueryPipeline; diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index dba7c7cb8f7..fcd3105a422 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -247,21 +246,6 @@ void QueryPipelineBuilder::addExtremesTransform() pipe.addTransform(std::move(transform), nullptr, port); } -void QueryPipelineBuilder::setOutputFormat(ProcessorPtr output) -{ - checkInitializedAndNotCompleted(); - - if (output_format) - throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR); - - resize(1); - - output_format = dynamic_cast(output.get()); - pipe.setOutputFormat(std::move(output)); - - initRowsBeforeLimit(); -} - QueryPipelineBuilder QueryPipelineBuilder::unitePipelines( std::vector> pipelines, size_t max_threads_limit, @@ -461,93 +445,6 @@ void QueryPipelineBuilder::setProcessListElement(QueryStatus * elem) } } -void QueryPipelineBuilder::initRowsBeforeLimit() -{ - RowsBeforeLimitCounterPtr rows_before_limit_at_least; - - /// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor. - std::vector limits; - std::vector remote_sources; - - std::unordered_set visited; - - struct QueuedEntry - { - IProcessor * processor; - bool visited_limit; - }; - - std::queue queue; - - queue.push({ output_format, false }); - visited.emplace(output_format); - - while (!queue.empty()) - { - auto * processor = queue.front().processor; - auto visited_limit = queue.front().visited_limit; - queue.pop(); - - if (!visited_limit) - { - if (auto * limit = typeid_cast(processor)) - { - visited_limit = true; - limits.emplace_back(limit); - } - - if (auto * source = typeid_cast(processor)) - remote_sources.emplace_back(source); - } - else if (auto * sorting = typeid_cast(processor)) - { - if (!rows_before_limit_at_least) - rows_before_limit_at_least = std::make_shared(); - - sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least); - - /// Don't go to children. Take rows_before_limit from last PartialSortingTransform. - continue; - } - - /// Skip totals and extremes port for output format. - if (auto * format = dynamic_cast(processor)) - { - auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor(); - if (visited.emplace(child_processor).second) - queue.push({ child_processor, visited_limit }); - - continue; - } - - for (auto & child_port : processor->getInputs()) - { - auto * child_processor = &child_port.getOutputPort().getProcessor(); - if (visited.emplace(child_processor).second) - queue.push({ child_processor, visited_limit }); - } - } - - if (!rows_before_limit_at_least && (!limits.empty() || !remote_sources.empty())) - { - rows_before_limit_at_least = std::make_shared(); - - for (auto & limit : limits) - limit->setRowsBeforeLimitCounter(rows_before_limit_at_least); - - for (auto & source : remote_sources) - source->setRowsBeforeLimitCounter(rows_before_limit_at_least); - } - - /// If there is a limit, then enable rows_before_limit_at_least - /// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result. - if (!limits.empty()) - rows_before_limit_at_least->add(0); - - if (rows_before_limit_at_least) - output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least); -} - PipelineExecutorPtr QueryPipelineBuilder::execute() { if (!isCompleted()) diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index 9e198f45e98..ac84191cf34 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -10,8 +10,6 @@ namespace DB { -class IOutputFormat; - class QueryPipelineProcessorsCollector; struct AggregatingTransformParams; @@ -71,10 +69,6 @@ public: void addTotalsHavingTransform(ProcessorPtr transform); /// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number. void addExtremesTransform(); - /// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation. - void setOutputFormat(ProcessorPtr output); - /// Get current OutputFormat. - IOutputFormat * getOutputFormat() const { return output_format; } /// Sink is a processor with single input port and no output ports. Creates sink for each output port. /// Pipeline will be completed after this transformation. void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter); @@ -163,7 +157,6 @@ public: private: Pipe pipe; - IOutputFormat * output_format = nullptr; /// Limit on the number of threads. Zero means no limit. /// Sometimes, more streams are created then the number of threads for more optimal execution. @@ -174,8 +167,6 @@ private: void checkInitialized(); void checkInitializedAndNotCompleted(); - void initRowsBeforeLimit(); - void setCollectedProcessors(Processors * processors); friend class QueryPipelineProcessorsCollector;