diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 5ef4ab24914..b232ddddd33 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -28,13 +28,13 @@ public: MemorySource( Names column_names_, BlocksList::iterator first_, - BlocksList::iterator last_, + size_t num_blocks_, const StorageMemory & storage, const StorageMetadataPtr & metadata_snapshot) : SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID())) , column_names(std::move(column_names_)) - , current(first_) - , last(last_) /// [first, last] + , current_it(first_) + , num_blocks(num_blocks_) { } @@ -49,7 +49,7 @@ protected: } else { - const Block & src = *current; + const Block & src = *current_it; Columns columns; columns.reserve(column_names.size()); @@ -57,17 +57,23 @@ protected: for (const auto & name : column_names) columns.emplace_back(src.getByName(name).column); - if (current == last) + if (current_block_idx == num_blocks) + { is_finished = true; + } else - ++current; + { + ++current_it; + ++current_block_idx; + } return Chunk(std::move(columns), src.rows()); } } private: Names column_names; - BlocksList::iterator current; - BlocksList::iterator last; + BlocksList::iterator current_it; + size_t current_block_idx = 0; + const size_t num_blocks; bool is_finished = false; }; @@ -126,27 +132,23 @@ Pipe StorageMemory::read( Pipes pipes; - BlocksList::iterator first = data.begin(); + BlocksList::iterator it = data.begin(); size_t offset = 0; for (size_t stream = 0; stream < num_streams; ++stream) { - auto next = first; - while (offset < stream * size / num_streams) + size_t next_offset = stream * size / num_streams; + size_t num_blocks = next_offset - offset; + + assert(num_blocks > 0); + + pipes.emplace_back(std::make_shared(column_names, it, num_blocks, *this, metadata_snapshot)); + + while (offset < next_offset) { - ++next; + ++it; ++offset; } - - if (first == next) - continue; - - auto last = next; - --last; - - pipes.emplace_back(std::make_shared(column_names, first, last, *this, metadata_snapshot)); - - first = next; } return Pipe::unitePipes(std::move(pipes));