Added InterpreterSelectQuery::unifyStreams

This commit is contained in:
Nikolai Kochetov 2018-11-06 14:44:45 +03:00
parent d7992b11d8
commit 8df77930ec
2 changed files with 22 additions and 20 deletions

View File

@ -1236,16 +1236,7 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
/// If there are several streams, then we merge them into one
if (pipeline.hasMoreThanOneStream())
{
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
unifyStreams(pipeline);
/** MergingSortedBlockInputStream reads the sources sequentially.
* To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream.
@ -1301,16 +1292,7 @@ void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
/// If there are still several streams, then we combine them into one
if (pipeline.hasMoreThanOneStream())
{
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
unifyStreams(pipeline);
pipeline.firstStream() = std::make_shared<UnionBlockInputStream<>>(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams);
pipeline.stream_with_non_joined_data = nullptr;
@ -1440,6 +1422,23 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode));
}
void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline)
{
if (pipeline.hasMoreThanOneStream())
{
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
}
}
void InterpreterSelectQuery::ignoreWithTotals()
{

View File

@ -190,6 +190,9 @@ private:
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
/// If pipeline has several streams with different headers, add ConvertingBlockInputStream to first header.
void unifyStreams(Pipeline & pipeline);
enum class Modificator
{
ROLLUP = 0,