Fix totals fro single remote source.

This commit is contained in:
Nikolai Kochetov 2019-04-10 19:28:37 +03:00
parent 3d1be12b79
commit d7aa7412ae
4 changed files with 60 additions and 0 deletions

View File

@ -59,6 +59,7 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sources/SourceFromTotals.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
@ -1324,6 +1325,9 @@ void InterpreterSelectQuery::executeFetchColumns(
sources.emplace_back(std::make_shared<SourceFromInputStream>(stream->getHeader(), stream));
pipeline.init(std::move(sources));
if (options.to_stage == QueryProcessingStage::Complete)
pipeline.addTotals(std::make_shared<SourceFromTotals>(streams));
}
else
pipeline.streams = std::move(streams);

View File

@ -270,6 +270,20 @@ void QueryPipeline::addDefaultTotals()
processors.emplace_back(source);
}
void QueryPipeline::addTotals(ProcessorPtr source)
{
checkInitialized();
if (totals_having_port)
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
checkSource(source);
assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline");
totals_having_port = &source->getOutputs().front();
processors.emplace_back(source);
}
void QueryPipeline::addExtremesTransform(ProcessorPtr transform)
{
checkInitialized();

View File

@ -47,6 +47,9 @@ public:
/// Add totals which returns one chunk with single row with defaults.
void addDefaultTotals();
/// Add already calculated totals.
void addTotals(ProcessorPtr source);
/// Will read from this stream after all data was read from other streams.
void addDelayedStream(ProcessorPtr source);
bool hasDelayedStream() const { return delayed_stream_port; }

View File

@ -0,0 +1,39 @@
#pragma once
#include <Processors/ISource.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
class SourceFromTotals : public ISource
{
public:
explicit SourceFromTotals(BlockInputStreams streams_)
: ISource(streams_.at(0)->getHeader()), streams(std::move(streams_)) {}
String getName() const override { return "SourceFromTotals"; }
Chunk generate() override
{
if (generated)
return {};
generated = true;
for (auto & stream : streams)
if (auto block = stream->getTotals())
return Chunk(block.getColumns(), 1);
return {};
}
private:
bool generated = false;
BlockInputStreams streams;
};
}