diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index e274bed641a..e68749638fb 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index b4d39c0738e..50bc7304465 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -8,8 +8,12 @@ #include #include #include -#include #include +#include +#include +#include +#include +#include namespace DB { @@ -442,8 +446,19 @@ void MergeJoin::mergeInMemoryRightBlocks() Blocks blocks_to_merge = blocksListToBlocks(right_blocks.blocks); right_blocks.clear(); + Pipes sources; + for (auto & block : blocks_to_merge) + { + sources.emplace_back(std::make_shared(std::make_shared(std::move(block)))); + } + + QueryPipeline pipeline; + pipeline.init(std::move(sources)); + /// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN - MergeSortingBlocksBlockInputStream sorted_input(blocks_to_merge, right_sort_description, max_rows_in_right_block); + pipeline.addPipe({std::make_shared(pipeline.getHeader(), right_sort_description, max_rows_in_right_block, 0, 0, 0, nullptr, 0)}); + + auto sorted_input = PipelineExecutingBlockInputStream(std::move(pipeline)); while (Block block = sorted_input.read()) { diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 1e8d4136308..dd435ffc6b2 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 5bf3f8a3ce2..2ac1766da36 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -1,4 +1,3 @@ -#include #include #include #include