From a153f05e10fe9ddfb7607672385c8ce6a6a826dc Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 4 Aug 2020 18:51:56 +0300
Subject: [PATCH] Refactor Pipe [part 5].

---
 src/Interpreters/InterpreterAlterQuery.cpp |   4 +-
 src/Processors/Pipe.cpp                    | 119 ++++++++-
 src/Processors/Pipe.h                      |   7 +-
 src/Processors/QueryPipeline.cpp           | 269 ++++-----------------
 src/Processors/QueryPipeline.h             |  13 +-
 5 files changed, 164 insertions(+), 248 deletions(-)

diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp
index 2f9e93d0eee..6f1215d6af0 100644
--- a/src/Interpreters/InterpreterAlterQuery.cpp
+++ b/src/Interpreters/InterpreterAlterQuery.cpp
@@ -87,8 +87,8 @@ BlockIO InterpreterAlterQuery::execute()
     if (!partition_commands.empty())
     {
         table->checkAlterPartitionIsPossible(partition_commands, metadata_snapshot, context.getSettingsRef());
-        auto partition_commands_pipes = table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
-        if (!partition_commands_pipes.empty())
-            res.pipeline.init(std::move(partition_commands_pipes));
+        auto partition_commands_pipe = table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
+        if (!partition_commands_pipe.empty())
+            res.pipeline.init(std::move(partition_commands_pipe));
     }
 
diff --git a/src/Processors/Pipe.cpp b/src/Processors/Pipe.cpp
index 65c2e5f2a5a..7a0c7184882 100644
--- a/src/Processors/Pipe.cpp
+++ b/src/Processors/Pipe.cpp
@@ -6,6 +6,8 @@
 #include
 #include
 #include
+#include
+#include
 
 namespace DB
 {
@@ -374,7 +376,37 @@ void Pipe::addExtremesSource(ProcessorPtr source)
     processors.emplace_back(std::move(source));
 }
 
+static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors)
+{
+    if (port == nullptr)
+        return;
+
+    auto null_sink = std::make_shared<NullSink>(port->getHeader());
+    connect(*port, null_sink->getPort());
+
+    if (collected_processors)
+        collected_processors->emplace_back(null_sink.get());
+
+    processors.emplace_back(std::move(null_sink));
+    port = nullptr;
+}
+
+void Pipe::dropTotals()
+{
+    dropPort(totals_port, processors, collected_processors);
+}
+
+void Pipe::dropExtremes()
+{
+    dropPort(extremes_port, processors, collected_processors);
+}
+
 void Pipe::addTransform(ProcessorPtr transform)
+{
+    addTransform(std::move(transform), nullptr, nullptr);
+}
+
+void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes)
 {
     if (output_ports.empty())
         throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
@@ -385,6 +417,19 @@ void Pipe::addTransform(ProcessorPtr transform)
             "Processor has " + std::to_string(inputs.size()) + " input ports, "
             "but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
 
+    if (totals && totals_port)
+        throw Exception("Cannot add transform with totals to Pipe because it already has totals.",
+            ErrorCodes::LOGICAL_ERROR);
+
+    if (extremes && extremes_port)
+        throw Exception("Cannot add transform with extremes to Pipe because it already has extremes.",
+            ErrorCodes::LOGICAL_ERROR);
+
+    if (totals)
+        totals_port = totals;
+    if (extremes)
+        extremes_port = extremes;
+
     size_t next_output = 0;
     for (auto & input : inputs)
     {
@@ -393,15 +438,34 @@ void Pipe::addTransform(ProcessorPtr transform)
     }
 
     auto & outputs = transform->getOutputs();
-    if (outputs.empty())
-        throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
-            ErrorCodes::LOGICAL_ERROR);
 
     output_ports.clear();
     output_ports.reserve(outputs.size());
 
+    bool found_totals = false;
+    bool found_extremes = false;
+
     for (auto & output : outputs)
-        output_ports.emplace_back(&output);
+    {
+        if (&output == totals)
+            found_totals = true;
+        else if (&output == extremes)
+            found_extremes = true;
+        else
+            output_ports.emplace_back(&output);
+    }
+
+    if (totals && !found_totals)
+        throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
+            "specified totals port does not belong to it", ErrorCodes::LOGICAL_ERROR);
+
+    if (extremes && !found_extremes)
+        throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
+            "specified extremes port does not belong to it", ErrorCodes::LOGICAL_ERROR);
+
+    if (output_ports.empty())
+        throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
+            ErrorCodes::LOGICAL_ERROR);
 
     header = output_ports.front()->getHeader();
     for (size_t i = 1; i < output_ports.size(); ++i)
@@ -524,6 +588,44 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
     header.clear();
 }
 
+void Pipe::setOutputFormat(ProcessorPtr output)
+{
+    if (output_ports.empty())
+        throw Exception("Cannot set output format to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
+
+    if (output_ports.size() != 1)
+        throw Exception("Cannot set output format to Pipe because single output port is expected, "
+            "but it has " + std::to_string(output_ports.size()) + " ports", ErrorCodes::LOGICAL_ERROR);
+
+    auto * format = dynamic_cast<IOutputFormat *>(output.get());
+
+    if (!format)
+        throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.",
+            ErrorCodes::LOGICAL_ERROR);
+
+    auto & main = format->getPort(IOutputFormat::PortKind::Main);
+    auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
+    auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);
+
+    if (!totals_port)
+        addTotalsSource(std::make_shared<NullSource>(totals.getHeader()));
+
+    if (!extremes_port)
+        addExtremesSource(std::make_shared<NullSource>(extremes.getHeader()));
+
+    if (collected_processors)
+        collected_processors->emplace_back(output.get());
+
+    processors.emplace_back(std::move(output));
+
+    connect(*output_ports.front(), main);
+    connect(*totals_port, totals);
+    connect(*extremes_port, extremes);
+
+    output_ports.clear();
+    header.clear();
+}
+
 void Pipe::transform(const Transformer & transformer)
 {
     if (output_ports.empty())
@@ -620,13 +722,4 @@ void Pipe::setQuota(const std::shared_ptr<const EnabledQuota> & quota)
     }
 }
 
-void Pipe::enableQuota()
-{
-    for (auto & processor : processors)
-    {
-        if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
-            source->enableQuota();
-    }
-}
-
 }
diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h
index e6ae375da48..787bade065a 100644
--- a/src/Processors/Pipe.h
+++ b/src/Processors/Pipe.h
@@ -52,10 +52,15 @@ public:
     void addTotalsSource(ProcessorPtr source);
     void addExtremesSource(ProcessorPtr source);
 
+    /// Drop totals and extremes (create NullSink for them).
+    void dropTotals();
+    void dropExtremes();
+
     /// Add processor to list. It should have size() input ports with compatible header.
     /// Output ports should have same headers.
     /// If totals or extremes are not empty, transform shouldn't change header.
     void addTransform(ProcessorPtr transform);
+    void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes);
 
     enum class StreamType
     {
@@ -85,7 +90,6 @@ public:
     /// Specify quotas and limits for every ISourceWithProgress.
     void setLimits(const SourceWithProgress::LocalLimits & limits);
     void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
-    void enableQuota();
 
     /// Do not allow to change the table while the processors of pipe are alive.
     void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
@@ -125,6 +129,7 @@ private:
     bool isCompleted() const { return !empty() && output_ports.empty(); }
     static Pipe unitePipes(Pipes pipes, Processors * collected_processors);
     void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
+    void setOutputFormat(ProcessorPtr output);
 
     friend class QueryPipeline;
 };
diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp
index 778d5ab093d..97e96387f7f 100644
--- a/src/Processors/QueryPipeline.cpp
+++ b/src/Processors/QueryPipeline.cpp
@@ -2,9 +2,7 @@
 #include
 #include
-#include
 #include
-#include
 #include
 #include
 #include
@@ -148,8 +146,7 @@ void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
     resize(1);
 
     auto * totals_port = &transform->getOutputs().back();
-    pipe.addTransform(std::move(transform));
-    pipe.totals_port = totals_port;
+    pipe.addTransform(std::move(transform), totals_port, nullptr);
 }
 
 void QueryPipeline::addDefaultTotals()
@@ -176,46 +173,21 @@ void QueryPipeline::addDefaultTotals()
 
 void QueryPipeline::dropTotalsAndExtremes()
 {
-    auto drop_port = [&](OutputPort *& port)
-    {
-        auto null_sink = std::make_shared<NullSink>(port->getHeader());
-        connect(*port, null_sink->getPort());
-        processors.emplace(std::move(null_sink));
-        port = nullptr;
-    };
-
-    if (totals_having_port)
-        drop_port(totals_having_port);
-
-    if (extremes_port)
-        drop_port(extremes_port);
+    pipe.dropTotals();
+    pipe.dropExtremes();
 }
 
 void QueryPipeline::addExtremesTransform()
 {
     checkInitializedAndNotCompleted();
 
-    if (extremes_port)
+    if (pipe.getExtremesPort())
         throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
 
-    std::vector<OutputPort *> extremes;
-    extremes.reserve(streams.size());
-
-    for (auto & stream : streams)
-    {
-        auto transform = std::make_shared<ExtremesTransform>(current_header);
-        connect(*stream, transform->getInputPort());
-
-        stream = &transform->getOutputPort();
-        extremes.push_back(&transform->getExtremesPort());
-
-        processors.emplace(std::move(transform));
-    }
-
-    if (extremes.size() == 1)
-        extremes_port = extremes.front();
-    else
-        extremes_port = uniteExtremes(extremes, current_header, processors);
+    resize(1);
+    auto transform = std::make_shared<ExtremesTransform>(getHeader());
+    auto * port = &transform->getExtremesPort();
+    pipe.addTransform(std::move(transform), nullptr, port);
 }
 
 void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
@@ -228,94 +200,49 @@
 
     resize(1);
 
-    auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
-    connect(transform->getOutputs().front(), concat->getInputs().front());
-    connect(*streams.back(), concat->getInputs().back());
-
-    streams.assign({ &concat->getOutputs().front() });
-    processors.emplace(std::move(transform));
-    processors.emplace(std::move(concat));
+    /// Order is important for concat. Connect manually.
+    pipe.transform([&](OutputPortRawPtrs ports) -> Processors
+    {
+        auto concat = std::make_shared<ConcatProcessor>(getHeader(), 2);
+        connect(transform->getOutputs().front(), concat->getInputs().front());
+        connect(*ports.back(), concat->getInputs().back());
+        return { std::move(concat), std::move(transform) };
+    });
 }
 
 void QueryPipeline::setOutputFormat(ProcessorPtr output)
 {
     checkInitializedAndNotCompleted();
 
-    auto * format = dynamic_cast<IOutputFormat *>(output.get());
-
-    if (!format)
-        throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.", ErrorCodes::LOGICAL_ERROR);
-
     if (output_format)
         throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);
 
-    output_format = format;
-
     resize(1);
 
-    auto & main = format->getPort(IOutputFormat::PortKind::Main);
-    auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
-    auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);
-
-    if (!totals_having_port)
-    {
-        auto null_source = std::make_shared<NullSource>(totals.getHeader());
-        totals_having_port = &null_source->getPort();
-        processors.emplace(std::move(null_source));
-    }
-
-    if (!extremes_port)
-    {
-        auto null_source = std::make_shared<NullSource>(extremes.getHeader());
-        extremes_port = &null_source->getPort();
-        processors.emplace(std::move(null_source));
-    }
-
-    processors.emplace(std::move(output));
-
-    connect(*streams.front(), main);
-    connect(*totals_having_port, totals);
-    connect(*extremes_port, extremes);
-
-    streams.clear();
-    current_header.clear();
-    extremes_port = nullptr;
-    totals_having_port = nullptr;
+    output_format = dynamic_cast<IOutputFormat *>(output.get());
+    pipe.setOutputFormat(std::move(output));
 
     initRowsBeforeLimit();
 }
 
-void QueryPipeline::unitePipelines(
-    std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit)
+QueryPipeline QueryPipeline::unitePipelines(
+    std::vector<std::unique_ptr<QueryPipeline>> pipelines,
+    const Block & common_header,
+    size_t max_threads_limit,
+    Processors * collected_processors)
 {
     /// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
     /// If true, result max_threads will be sum(max_threads).
     /// Note: it may be > than settings.max_threads, so we should apply this limit again.
-    bool will_limit_max_threads = !initialized() || max_threads != 0;
-
-    if (initialized())
-    {
-        addSimpleTransform([&](const Block & header)
-        {
-            return std::make_shared<ConvertingTransform>(
-                header, common_header, ConvertingTransform::MatchColumnsMode::Position);
-        });
-    }
-
-    std::vector<OutputPort *> extremes;
-    std::vector<OutputPort *> totals;
-
-    if (extremes_port)
-        extremes.push_back(extremes_port);
-
-    if (totals_having_port)
-        totals.push_back(totals_having_port);
+    bool will_limit_max_threads = true;
+    size_t max_threads = 0;
+    Pipes pipes;
 
     for (auto & pipeline_ptr : pipelines)
     {
         auto & pipeline = *pipeline_ptr;
         pipeline.checkInitialized();
-        pipeline.processors.setCollectedProcessors(processors.getCollectedProcessors());
+        pipeline.pipe.collected_processors = collected_processors;
 
         if (!pipeline.isCompleted())
         {
@@ -326,36 +253,7 @@ void QueryPipeline::unitePipelines(
             });
         }
 
-        if (pipeline.extremes_port)
-        {
-            auto converting = std::make_shared<ConvertingTransform>(
-                pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position);
-
-            connect(*pipeline.extremes_port, converting->getInputPort());
-            extremes.push_back(&converting->getOutputPort());
-            processors.emplace(std::move(converting));
-        }
-
-        /// Take totals only from first port.
-        if (pipeline.totals_having_port)
-        {
-            auto converting = std::make_shared<ConvertingTransform>(
-                pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position);
-
-            connect(*pipeline.totals_having_port, converting->getInputPort());
-            totals.push_back(&converting->getOutputPort());
-            processors.emplace(std::move(converting));
-        }
-
-        auto * collector = processors.setCollectedProcessors(nullptr);
-        processors.emplace(pipeline.processors.detach());
-        processors.setCollectedProcessors(collector);
-
-        streams.addStreams(pipeline.streams);
-
-        table_locks.insert(table_locks.end(), std::make_move_iterator(pipeline.table_locks.begin()), std::make_move_iterator(pipeline.table_locks.end()));
-        interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end());
-        storage_holders.insert(storage_holders.end(), pipeline.storage_holders.begin(), pipeline.storage_holders.end());
+        pipes.emplace_back(std::move(pipeline.pipe));
 
         max_threads += pipeline.max_threads;
         will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0;
@@ -366,33 +264,21 @@
             max_threads_limit = pipeline.max_threads;
     }
 
-    if (!will_limit_max_threads)
-        max_threads = 0;
-    else
-        limitMaxThreads(max_threads_limit);
+    QueryPipeline pipeline;
+    pipeline.init(Pipe::unitePipes(std::move(pipes), collected_processors));
 
-    if (!extremes.empty())
+    if (will_limit_max_threads)
     {
-        if (extremes.size() == 1)
-            extremes_port = extremes.back();
-        else
-            extremes_port = uniteExtremes(extremes, common_header, processors);
+        pipeline.setMaxThreads(max_threads);
+        pipeline.limitMaxThreads(max_threads_limit);
     }
 
-    if (!totals.empty())
-    {
-        if (totals.size() == 1)
-            totals_having_port = totals.back();
-        else
-            totals_having_port = uniteTotals(totals, common_header, processors);
-    }
-
-    current_header = common_header;
+    return pipeline;
 }
 
 void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
 {
-    for (auto & processor : processors.get())
+    for (auto & processor : pipe.processors)
     {
         if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
             source->setProgressCallback(callback);
@@ -406,7 +292,7 @@ void QueryPipeline::setProcessListElement(QueryStatus * elem)
 {
     process_list_element = elem;
 
-    for (auto & processor : processors.get())
+    for (auto & processor : pipe.processors)
     {
         if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
             source->setProcessListElement(elem);
@@ -510,101 +396,28 @@ void QueryPipeline::initRowsBeforeLimit()
     output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
 }
 
-Pipe QueryPipeline::getPipe() &&
-{
-    Pipe pipe(processors.detach(), streams.at(0), totals_having_port, extremes_port);
-    pipe.max_parallel_streams = streams.maxParallelStreams();
-
-    for (auto & lock : table_locks)
-        pipe.addTableLock(lock);
-
-    for (auto & context : interpreter_context)
-        pipe.addInterpreterContext(context);
-
-    for (auto & storage : storage_holders)
-        pipe.addStorageHolder(storage);
-
-    if (totals_having_port)
-        pipe.setTotalsPort(totals_having_port);
-
-    if (extremes_port)
-        pipe.setExtremesPort(extremes_port);
-
-    Pipes pipes;
-    pipes.emplace_back(std::move(pipe));
-
-    for (size_t i = 1; i < streams.size(); ++i)
-        pipes.emplace_back(Pipe(streams[i]));
-
-    return pipes;
-}
-
 PipelineExecutorPtr QueryPipeline::execute()
 {
     if (!isCompleted())
         throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
 
-    return std::make_shared<PipelineExecutor>(processors.get(), process_list_element);
+    return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
 }
 
-QueryPipeline & QueryPipeline::operator= (QueryPipeline && rhs)
+void QueryPipeline::setCollectedProcessors(Processors * processors)
 {
-    /// Reset primitive fields
-    process_list_element = rhs.process_list_element;
-    rhs.process_list_element = nullptr;
-    max_threads = rhs.max_threads;
-    rhs.max_threads = 0;
-    output_format = rhs.output_format;
-    rhs.output_format = nullptr;
-    extremes_port = rhs.extremes_port;
-    rhs.extremes_port = nullptr;
-    totals_having_port = rhs.totals_having_port;
-    rhs.totals_having_port = nullptr;
-
-    /// Move these fields in destruction order (it's important)
-    streams = std::move(rhs.streams);
-    processors = std::move(rhs.processors);
-    current_header = std::move(rhs.current_header);
-    table_locks = std::move(rhs.table_locks);
-    storage_holders = std::move(rhs.storage_holders);
-    interpreter_context = std::move(rhs.interpreter_context);
-
-    return *this;
-}
-
-void QueryPipeline::ProcessorsContainer::emplace(ProcessorPtr processor)
-{
-    if (collected_processors)
-        collected_processors->emplace_back(processor);
-
-    processors.emplace_back(std::move(processor));
-}
-
-void QueryPipeline::ProcessorsContainer::emplace(Processors processors_)
-{
-    for (auto & processor : processors_)
-        emplace(std::move(processor));
-}
-
-Processors * QueryPipeline::ProcessorsContainer::setCollectedProcessors(Processors * collected_processors_)
-{
-    if (collected_processors && collected_processors_)
-        throw Exception("Cannot set collected processors to QueryPipeline because "
-            "another one object was already created for current pipeline.", ErrorCodes::LOGICAL_ERROR);
-
-    std::swap(collected_processors, collected_processors_);
-    return collected_processors_;
+    pipe.collected_processors = processors;
 }
 
 QueryPipelineProcessorsCollector::QueryPipelineProcessorsCollector(QueryPipeline & pipeline_, IQueryPlanStep * step_)
     : pipeline(pipeline_), step(step_)
 {
-    pipeline.processors.setCollectedProcessors(&processors);
+    pipeline.setCollectedProcessors(&processors);
 }
 
 QueryPipelineProcessorsCollector::~QueryPipelineProcessorsCollector()
 {
-    pipeline.processors.setCollectedProcessors(nullptr);
+    pipeline.setCollectedProcessors(nullptr);
 }
 
 Processors QueryPipelineProcessorsCollector::detachProcessors(size_t group)
diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h
index 61b4dc507fd..74cc692ac42 100644
--- a/src/Processors/QueryPipeline.h
+++ b/src/Processors/QueryPipeline.h
@@ -28,7 +28,7 @@ public:
     ~QueryPipeline() = default;
     QueryPipeline(QueryPipeline &&) = default;
     QueryPipeline(const QueryPipeline &) = delete;
-    QueryPipeline & operator= (QueryPipeline && rhs);
+    QueryPipeline & operator= (QueryPipeline && rhs) = default;
     QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
 
     /// All pipes must have same header.
@@ -73,7 +73,11 @@ public:
 
     /// Unite several pipelines together. Result pipeline would have common_header structure.
     /// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
-    void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit = 0);
+    static QueryPipeline unitePipelines(
+        std::vector<std::unique_ptr<QueryPipeline>> pipelines,
+        const Block & common_header,
+        size_t max_threads_limit = 0,
+        Processors * collected_processors = nullptr);
 
     PipelineExecutorPtr execute();
 
@@ -113,10 +117,9 @@ public:
     }
 
     /// Convert query pipeline to pipe.
-    Pipe getPipe() &&;
+    static Pipe getPipe(QueryPipeline pipeline) { return std::move(pipeline.pipe); }
 
 private:
-    /// Destruction order: processors, header, locks, temporary storages, local contexts
     Pipe pipe;
     IOutputFormat * output_format = nullptr;
@@ -132,6 +135,8 @@ private:
 
     void initRowsBeforeLimit();
 
+    void setCollectedProcessors(Processors * processors);
+
     friend class QueryPipelineProcessorsCollector;
 };
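
Usage sketch (editorial note, not part of the patch): after this refactoring, a caller registers totals/extremes ports directly on the Pipe via the new addTransform overload and only then wraps the Pipe into a QueryPipeline, instead of poking at QueryPipeline internals. The helper below is a minimal illustration of that flow under stated assumptions; the function name buildPipelineWithTotals is hypothetical, and the assumption that the transform's last output port carries totals mirrors addTotalsHavingTransform() in this patch.

    #include <Processors/Pipe.h>
    #include <Processors/QueryPipeline.h>

    namespace DB
    {

    /// Hypothetical helper: attach a totals-producing transform to a single-stream Pipe,
    /// register its totals port on the Pipe, and hand the result to a QueryPipeline.
    QueryPipeline buildPipelineWithTotals(Pipe pipe, ProcessorPtr totals_having)
    {
        /// Assumption: the transform's last output port is the totals port,
        /// as in QueryPipeline::addTotalsHavingTransform() above.
        auto * totals_port = &totals_having->getOutputs().back();
        pipe.addTransform(std::move(totals_having), totals_port, /* extremes = */ nullptr);

        QueryPipeline pipeline;
        pipeline.init(std::move(pipe));
        return pipeline;
    }

    }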