Uniform headers for union stream. #2156

This commit is contained in:
Nikolai Kochetov 2018-10-23 20:57:27 +03:00 committed by Alexey Zatelepin
parent 620054ea93
commit 1a6f04fe54

View File

@ -49,6 +49,7 @@
#include <Parsers/queryToString.h>
#include <ext/map.h>
#include <memory>
#include <DataStreams/ConvertingBlockInputStream.h>
namespace DB
@ -1293,6 +1294,17 @@ void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
/// If there are still several streams, then we combine them into one
if (pipeline.hasMoreThanOneStream())
{
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
pipeline.firstStream() = std::make_shared<UnionBlockInputStream<>>(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams);
pipeline.stream_with_non_joined_data = nullptr;
pipeline.streams.resize(1);