From 9bd949fff5462029601ac8e87bbd6a0c672761da Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 9 Aug 2019 17:57:15 +0300 Subject: [PATCH] Convert streams structure to sample block in InterpreterSelectQuery. --- .../Interpreters/InterpreterSelectQuery.cpp | 28 +++++++++++++------ .../src/Interpreters/InterpreterSelectQuery.h | 4 +-- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 7c1f76f99e9..5a0230e6798 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -380,7 +380,7 @@ BlockIO InterpreterSelectQuery::execute() { Pipeline pipeline; executeImpl(pipeline, input); - executeUnion(pipeline); + executeUnion(pipeline, getSampleBlock()); BlockIO res; res.in = pipeline.firstStream(); @@ -391,6 +391,7 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams() { Pipeline pipeline; executeImpl(pipeline, input); + unifyStreams(pipeline, getSampleBlock()); return pipeline.streams; } @@ -2221,12 +2222,12 @@ void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool befo } -void InterpreterSelectQuery::executeUnion(Pipeline & pipeline) +void InterpreterSelectQuery::executeUnion(Pipeline & pipeline, Block header) { /// If there are still several streams, then we combine them into one if (pipeline.hasMoreThanOneStream()) { - unifyStreams(pipeline); + unifyStreams(pipeline, header); pipeline.firstStream() = std::make_shared(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams); pipeline.stream_with_non_joined_data = nullptr; @@ -2454,18 +2455,27 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pip } -void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline) +void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline, Block header) { - if (pipeline.hasMoreThanOneStream()) + if (pipeline.hasMoreThanOneStream() || header) { /// Unify streams in case they have different headers. - auto first_header = pipeline.streams.at(0)->getHeader(); - for (size_t i = 1; i < pipeline.streams.size(); ++i) + + size_t start = 0; + + if (!header) + { + header = pipeline.streams.at(0)->getHeader(); + start = 1; + } + + for (size_t i = start; i < pipeline.streams.size(); ++i) { auto & stream = pipeline.streams[i]; - auto header = stream->getHeader(); + auto stream_header = stream->getHeader(); auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name; - if (!blocksHaveEqualStructure(first_header, header)) + + if (!blocksHaveEqualStructure(header, stream_header)) stream = std::make_shared(context, stream, first_header, mode); } } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index d71bd310ee6..7c7b657f5ce 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -206,7 +206,7 @@ private: void executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info); void executeMergeSorted(Pipeline & pipeline); void executePreLimit(Pipeline & pipeline); - void executeUnion(Pipeline & pipeline); + void executeUnion(Pipeline & pipeline, Block header = {}); void executeLimitBy(Pipeline & pipeline); void executeLimit(Pipeline & pipeline); void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression); @@ -231,7 +231,7 @@ private: void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map & subqueries_for_sets); /// If pipeline has several streams with different headers, add ConvertingBlockInputStream to first header. - void unifyStreams(Pipeline & pipeline); + void unifyStreams(Pipeline & pipeline, Block header); enum class Modificator {