Fix totals and extremes header for views.

This commit is contained in:
Nikolai Kochetov 2020-05-26 16:19:24 +03:00
parent 5a0f356cd6
commit 12384551da
4 changed files with 41 additions and 14 deletions

View File

@ -23,7 +23,7 @@ public:
/// Will connect pipes outputs with transform inputs automatically.
Pipe(Pipes && pipes, ProcessorPtr transform);
/// Create pipe from output port. If pipe was created that way, it possibly will not have tree shape.
Pipe(OutputPort * port);
explicit Pipe(OutputPort * port);
Pipe(const Pipe & other) = delete;
Pipe(Pipe && other) = default;

View File

@ -703,6 +703,11 @@ void QueryPipeline::initRowsBeforeLimit()
Pipe QueryPipeline::getPipe() &&
{
resize(1);
return std::move(std::move(*this).getPipes()[0]);
}
Pipes QueryPipeline::getPipes() &&
{
Pipe pipe(std::move(processors), streams.at(0), totals_having_port, extremes_port);
pipe.max_parallel_streams = streams.maxParallelStreams();
@ -721,7 +726,13 @@ Pipe QueryPipeline::getPipe() &&
if (extremes_port)
pipe.setExtremesPort(extremes_port);
return pipe;
Pipes pipes;
pipes.emplace_back(std::move(pipe));
for (size_t i = 1; i < streams.size(); ++i)
pipes.emplace_back(Pipe(streams[i]));
return pipes;
}
PipelineExecutorPtr QueryPipeline::execute()

View File

@ -155,8 +155,9 @@ public:
/// Set upper limit for the recommend number of threads
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
/// Convert query pipeline to single pipe.
/// Convert query pipeline to single or several pipes.
Pipe getPipe() &&;
Pipes getPipes() &&;
private:
/// Destruction order: processors, header, locks, temporary storages, local contexts

View File

@ -19,6 +19,8 @@
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
namespace DB
@ -62,29 +64,42 @@ Pipes StorageView::read(
if (context.getSettings().enable_optimize_predicate_expression)
current_inner_query = getRuntimeViewQuery(*query_info.query->as<const ASTSelectQuery>(), context);
QueryPipeline pipeline;
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);
/// FIXME res may implicitly use some objects owned be pipeline, but them will be destructed after return
if (query_info.force_tree_shaped_pipeline)
{
QueryPipeline pipeline;
BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
for (auto & stream : streams)
{
stream = std::make_shared<MaterializingBlockInputStream>(stream);
stream = std::make_shared<ConvertingBlockInputStream>(stream, getSampleBlockForColumns(column_names),
ConvertingBlockInputStream::MatchColumnsMode::Name);
}
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
}
else
/// TODO: support multiple streams here. Need more general interface than pipes.
pipes.emplace_back(interpreter.executeWithProcessors().getPipe());
/// It's expected that the columns read from storage are not constant.
/// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
for (auto & pipe : pipes)
{
pipe.addSimpleTransform(std::make_shared<MaterializingTransform>(pipe.getHeader()));
auto pipeline = interpreter.executeWithProcessors();
/// It's expected that the columns read from storage are not constant.
/// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
});
/// And also convert to expected structure.
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
pipe.getHeader(), getSampleBlockForColumns(column_names),
ConvertingTransform::MatchColumnsMode::Name));
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names),
ConvertingTransform::MatchColumnsMode::Name);
});
pipes = std::move(pipeline).getPipes();
}
return pipes;