Convert streams structure to sample block in InterpreterSelectQuery.

This commit is contained in:
Nikolai Kochetov 2019-08-09 17:57:15 +03:00
parent 78516083ac
commit 9bd949fff5
2 changed files with 21 additions and 11 deletions

View File

@ -380,7 +380,7 @@ BlockIO InterpreterSelectQuery::execute()
{
Pipeline pipeline;
executeImpl(pipeline, input);
executeUnion(pipeline);
executeUnion(pipeline, getSampleBlock());
BlockIO res;
res.in = pipeline.firstStream();
@ -391,6 +391,7 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
{
Pipeline pipeline;
executeImpl(pipeline, input);
unifyStreams(pipeline, getSampleBlock());
return pipeline.streams;
}
@ -2221,12 +2222,12 @@ void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool befo
}
void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
void InterpreterSelectQuery::executeUnion(Pipeline & pipeline, Block header)
{
/// If there are still several streams, then we combine them into one
if (pipeline.hasMoreThanOneStream())
{
unifyStreams(pipeline);
unifyStreams(pipeline, header);
pipeline.firstStream() = std::make_shared<UnionBlockInputStream>(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams);
pipeline.stream_with_non_joined_data = nullptr;
@ -2454,18 +2455,27 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pip
}
void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline)
void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline, Block header)
{
if (pipeline.hasMoreThanOneStream())
if (pipeline.hasMoreThanOneStream() || header)
{
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
size_t start = 0;
if (!header)
{
header = pipeline.streams.at(0)->getHeader();
start = 1;
}
for (size_t i = start; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto stream_header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
if (!blocksHaveEqualStructure(header, stream_header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
}

View File

@ -206,7 +206,7 @@ private:
void executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline, Block header = {});
void executeLimitBy(Pipeline & pipeline);
void executeLimit(Pipeline & pipeline);
void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
@ -231,7 +231,7 @@ private:
void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
/// If pipeline has several streams with different headers, add ConvertingBlockInputStream to first header.
void unifyStreams(Pipeline & pipeline);
void unifyStreams(Pipeline & pipeline, Block header);
enum class Modificator
{