From 00546498b61f40a85db5fddfbdaa4699dee2228d Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 30 Jan 2020 13:26:25 +0300 Subject: [PATCH] Fix StorageBuffer/ --- dbms/src/Storages/StorageBuffer.cpp | 45 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 70be06b270b..e8bfeec86c3 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -155,6 +155,27 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context return QueryProcessingStage::FetchColumns; } +static Pipes readAsPipes( + const StoragePtr & storage, + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) +{ + if (storage->supportProcessorsPipeline()) + return storage->readWithProcessors(column_names, query_info, context, processed_stage, max_block_size, num_streams); + + auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams); + + Pipes pipes; + for (auto & stream : streams) + pipes.emplace_back(std::make_shared(stream)); + + return pipes; +}; + Pipes StorageBuffer::readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, @@ -163,26 +184,6 @@ Pipes StorageBuffer::readWithProcessors( size_t max_block_size, unsigned num_streams) { - auto read_as_pipes = [](const StoragePtr & storage, - const Names & column_names, - const SelectQueryInfo & query_info, - const Context & context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - unsigned num_streams) - { - if (storage->supportProcessorsPipeline()) - return storage->readWithProcessors(column_names, query_info, context, processed_stage, max_block_size, num_streams); - - auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams); - - Pipes pipes; - for (auto & stream : streams) - pipes.emplace_back(std::make_shared(stream)); - - return pipes; - }; - Pipes pipes_from_dst; if (!no_destination) @@ -206,7 +207,7 @@ Pipes StorageBuffer::readWithProcessors( query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination); /// The destination table has the same structure of the requested columns and we can simply read blocks from there. - pipes_from_dst = read_as_pipes(destination, column_names, query_info, context, processed_stage, max_block_size, num_streams)); + pipes_from_dst = readAsPipes(destination, column_names, query_info, context, processed_stage, max_block_size, num_streams)); } else { @@ -241,7 +242,7 @@ Pipes StorageBuffer::readWithProcessors( } else { - pipes_from_dst = read_as_pipes(destination, columns_intersection, query_info, context, processed_stage, max_block_size, num_streams); + pipes_from_dst = readAsPipes(destination, columns_intersection, query_info, context, processed_stage, max_block_size, num_streams); for (auto & pipe : pipes_from_dst) { pipe.addSimpleTransform(std::make_shared(