From 2cd1b37f9b5b6576478a44846f5fa4b78de8f381 Mon Sep 17 00:00:00 2001 From: alexX512 Date: Tue, 2 May 2023 07:26:36 +0000 Subject: [PATCH] Add support for connection of partial result processors with multiple ports --- .../PullingAsyncPipelineExecutor.cpp | 5 +- .../Executors/PullingAsyncPipelineExecutor.h | 2 +- src/Processors/Formats/IOutputFormat.cpp | 2 +- src/Processors/Formats/IOutputFormat.h | 4 + .../Transforms/LimitPartialResultTransform.h | 1 + src/QueryPipeline/Pipe.cpp | 82 +++++++++++++++---- src/QueryPipeline/Pipe.h | 4 +- src/Server/TCPHandler.cpp | 3 +- 8 files changed, 82 insertions(+), 21 deletions(-) diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp index 73a3142d459..0b28ed67cd1 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.cpp @@ -41,12 +41,15 @@ struct PullingAsyncPipelineExecutor::Data } }; -PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_) +PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool /*has_partial_result_setting*/) : pipeline(pipeline_) { if (!pipeline.pulling()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PullingAsyncPipelineExecutor must be pulling"); lazy_format = std::make_shared(pipeline.output->getHeader()); + // if (has_partial_result_setting) + // lazy_format->activatePartialResultProtocol(); + pipeline.complete(lazy_format); } diff --git a/src/Processors/Executors/PullingAsyncPipelineExecutor.h b/src/Processors/Executors/PullingAsyncPipelineExecutor.h index 361bcc0155c..202ecbf281b 100644 --- a/src/Processors/Executors/PullingAsyncPipelineExecutor.h +++ b/src/Processors/Executors/PullingAsyncPipelineExecutor.h @@ -21,7 +21,7 @@ struct ProfileInfo; class PullingAsyncPipelineExecutor { public: - explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_); + explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_, bool has_partial_result_setting = false); ~PullingAsyncPipelineExecutor(); /// Get structure of returned block or chunk. diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index d3e0ed2fcc6..6496f5357e7 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -129,7 +129,7 @@ void IOutputFormat::work() case Main: result_rows += current_chunk.getNumRows(); result_bytes += current_chunk.allocatedBytes(); - if (!was_main_input && current_chunk.hasRows()) + if (is_partial_result_protocol_active && !was_main_input && current_chunk.hasRows()) { consume(Chunk(current_chunk.cloneEmptyColumns(), 0)); was_main_input = true; diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index 2c598e2620b..6e5e21055a3 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -105,6 +105,8 @@ public: void clearLastLines(size_t lines_number); + void activatePartialResultProtocol() { is_partial_result_protocol_active = true; } + protected: friend class ParallelFormattingOutputFormat; @@ -192,6 +194,8 @@ private: size_t rows_read_before = 0; bool are_totals_written = false; + bool is_partial_result_protocol_active = false; + /// Counters for consumed chunks. Are used for QueryLog. size_t result_rows = 0; size_t result_bytes = 0; diff --git a/src/Processors/Transforms/LimitPartialResultTransform.h b/src/Processors/Transforms/LimitPartialResultTransform.h index e5d772db289..8ed60d20da3 100644 --- a/src/Processors/Transforms/LimitPartialResultTransform.h +++ b/src/Processors/Transforms/LimitPartialResultTransform.h @@ -5,6 +5,7 @@ namespace DB { +/// Currently support only single thread implementation with one input and one output ports class LimitPartialResultTransform : public PartialResultTransform { public: diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index b1a740b3390..ff86d8df776 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -483,8 +483,7 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort if (extremes) extremes_port = extremes; - /// TODO: Add support for partial result in multithreading mode - dropPartialResult(); + addPartialResultTransform(transform); size_t next_output = 0; for (auto & input : inputs) @@ -573,8 +572,7 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes_port = nullptr; } - /// TODO: Add support for partial result in multithreading mode - dropPartialResult(); + addPartialResultTransform(transform); bool found_totals = false; bool found_extremes = false; @@ -627,11 +625,10 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * addProcessor(std::move(transform)); - max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } -void Pipe::addPartialResultTransformIfNeeded(ProcessorPtr transform, size_t partial_result_port_id) +void Pipe::addPartialResultSimpleTransform(ProcessorPtr transform, size_t partial_result_port_id) { if (isPartialResultActive()) { @@ -643,23 +640,75 @@ void Pipe::addPartialResultTransformIfNeeded(ProcessorPtr transform, size_t part return; } - if (partial_result_port == nullptr) - { - auto source = std::make_shared(getHeader()); - partial_result_port = &source->getPort(); - - addProcessor(std::move(source)); - } - auto partial_result_transform = transform->getPartialResultProcessor(std::move(transform), partial_result_limit, partial_result_duration_ms); - connect(*partial_result_port, partial_result_transform->getInputs().front()); + connectPartialResultPort(partial_result_port, partial_result_transform->getInputs().front()); + partial_result_port = &partial_result_transform->getOutputs().front(); addProcessor(std::move(partial_result_transform)); } } +void Pipe::addPartialResultTransform(ProcessorPtr transform) +{ + if (isPartialResultActive()) + { + size_t new_outputs_size = transform->getOutputs().size(); + + if (!transform->supportPartialResultProcessor()) + { + for (auto & partial_result_port : partial_result_ports) + dropPort(partial_result_port, *processors, collected_processors); + + partial_result_ports.assign(new_outputs_size, nullptr); + return; + } + + auto partial_result_transform = transform->getPartialResultProcessor(std::move(transform), partial_result_limit, partial_result_duration_ms); + auto & inputs = partial_result_transform->getInputs(); + + if (inputs.size() != partial_result_ports.size()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot add transform {} to Pipe because it has {} input ports, but {} expected", + partial_result_transform->getName(), + inputs.size(), + partial_result_ports.size()); + + size_t next_port = 0; + for (auto & input : inputs) + { + connectPartialResultPort(partial_result_ports[next_port], input); + ++next_port; + } + + partial_result_ports.assign(new_outputs_size, nullptr); + + next_port = 0; + for (auto & new_partial_result_port : partial_result_transform->getOutputs()) + { + partial_result_ports[next_port] = &new_partial_result_port; + ++next_port; + } + + addProcessor(std::move(partial_result_transform)); + } +} + +void Pipe::connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port) +{ + if (partial_result_port == nullptr) + { + auto source = std::make_shared(getHeader()); + partial_result_port = &source->getPort(); + + addProcessor(std::move(source)); + } + + connect(*partial_result_port, partial_result_transform_port); +} + void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) { if (output_ports.empty()) @@ -704,7 +753,7 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) connect(*port, transform->getInputs().front()); port = &transform->getOutputs().front(); if (stream_type == StreamType::Main) - addPartialResultTransformIfNeeded(transform, partial_result_port_id); + addPartialResultSimpleTransform(transform, partial_result_port_id); addProcessor(std::move(transform)); } @@ -832,6 +881,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter) add_transform(totals_port, StreamType::Totals); add_transform(extremes_port, StreamType::Extremes); + dropPartialResult(); output_ports.clear(); header.clear(); diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index 7200cf4944a..650e4d664a5 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -76,7 +76,9 @@ public: void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes); void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); - void addPartialResultTransformIfNeeded(ProcessorPtr transform, size_t partial_result_port_id); + void addPartialResultTransform(ProcessorPtr transform); + void addPartialResultSimpleTransform(ProcessorPtr transform, size_t partial_result_port_id); + void connectPartialResultPort(OutputPort * partial_result_port, InputPort & partial_result_transform_port); enum class StreamType { diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index a608219ed63..67ed1d071e4 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -815,7 +815,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors() std::unique_lock progress_lock(task_callback_mutex, std::defer_lock); { - PullingAsyncPipelineExecutor executor(pipeline); + bool has_partial_result_setting = query_context->getSettingsRef().partial_result_update_duration_ms.totalMilliseconds() > 0; + PullingAsyncPipelineExecutor executor(pipeline, has_partial_result_setting); CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread}; Block block;