From d7aa7412aee5d2b5c11cdb8510c6bfd07e524f3e Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 10 Apr 2019 19:28:37 +0300 Subject: [PATCH] Fix totals fro single remote source. --- .../Interpreters/InterpreterSelectQuery.cpp | 4 ++ dbms/src/Processors/QueryPipeline.cpp | 14 +++++++ dbms/src/Processors/QueryPipeline.h | 3 ++ .../src/Processors/Sources/SourceFromTotals.h | 39 +++++++++++++++++++ 4 files changed, 60 insertions(+) create mode 100644 dbms/src/Processors/Sources/SourceFromTotals.h diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 8c17c0217f2..6d351099d44 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -59,6 +59,7 @@ #include #include +#include #include #include #include @@ -1324,6 +1325,9 @@ void InterpreterSelectQuery::executeFetchColumns( sources.emplace_back(std::make_shared(stream->getHeader(), stream)); pipeline.init(std::move(sources)); + + if (options.to_stage == QueryProcessingStage::Complete) + pipeline.addTotals(std::make_shared(streams)); } else pipeline.streams = std::move(streams); diff --git a/dbms/src/Processors/QueryPipeline.cpp b/dbms/src/Processors/QueryPipeline.cpp index 65e2e411e6d..379f6e18c07 100644 --- a/dbms/src/Processors/QueryPipeline.cpp +++ b/dbms/src/Processors/QueryPipeline.cpp @@ -270,6 +270,20 @@ void QueryPipeline::addDefaultTotals() processors.emplace_back(source); } +void QueryPipeline::addTotals(ProcessorPtr source) +{ + checkInitialized(); + + if (totals_having_port) + throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR); + + checkSource(source); + assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline"); + + totals_having_port = &source->getOutputs().front(); + processors.emplace_back(source); +} + void QueryPipeline::addExtremesTransform(ProcessorPtr transform) { checkInitialized(); diff --git a/dbms/src/Processors/QueryPipeline.h b/dbms/src/Processors/QueryPipeline.h index 2c51b6be7b7..dfbdec6107d 100644 --- a/dbms/src/Processors/QueryPipeline.h +++ b/dbms/src/Processors/QueryPipeline.h @@ -47,6 +47,9 @@ public: /// Add totals which returns one chunk with single row with defaults. void addDefaultTotals(); + /// Add already calculated totals. + void addTotals(ProcessorPtr source); + /// Will read from this stream after all data was read from other streams. void addDelayedStream(ProcessorPtr source); bool hasDelayedStream() const { return delayed_stream_port; } diff --git a/dbms/src/Processors/Sources/SourceFromTotals.h b/dbms/src/Processors/Sources/SourceFromTotals.h new file mode 100644 index 00000000000..86f3aaab446 --- /dev/null +++ b/dbms/src/Processors/Sources/SourceFromTotals.h @@ -0,0 +1,39 @@ +#pragma once +#include +#include + +namespace DB +{ + +class IBlockInputStream; +using BlockInputStreamPtr = std::shared_ptr; +using BlockInputStreams = std::vector; + +class SourceFromTotals : public ISource +{ +public: + explicit SourceFromTotals(BlockInputStreams streams_) + : ISource(streams_.at(0)->getHeader()), streams(std::move(streams_)) {} + + String getName() const override { return "SourceFromTotals"; } + + Chunk generate() override + { + if (generated) + return {}; + + generated = true; + + for (auto & stream : streams) + if (auto block = stream->getTotals()) + return Chunk(block.getColumns(), 1); + + return {}; + } + +private: + bool generated = false; + BlockInputStreams streams; +}; + +}