More simple

This commit is contained in:
Alexey Milovidov 2020-08-25 20:54:44 +03:00
parent 39730bfc30
commit b3845b10d4

View File

@ -28,13 +28,13 @@ public:
MemorySource(
Names column_names_,
BlocksList::iterator first_,
BlocksList::iterator last_,
size_t num_blocks_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot)
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(std::move(column_names_))
, current(first_)
, last(last_) /// [first, last]
, current_it(first_)
, num_blocks(num_blocks_)
{
}
@ -49,7 +49,7 @@ protected:
}
else
{
const Block & src = *current;
const Block & src = *current_it;
Columns columns;
columns.reserve(column_names.size());
@ -57,17 +57,23 @@ protected:
for (const auto & name : column_names)
columns.emplace_back(src.getByName(name).column);
if (current == last)
if (current_block_idx == num_blocks)
{
is_finished = true;
}
else
++current;
{
++current_it;
++current_block_idx;
}
return Chunk(std::move(columns), src.rows());
}
}
private:
Names column_names;
BlocksList::iterator current;
BlocksList::iterator last;
BlocksList::iterator current_it;
size_t current_block_idx = 0;
const size_t num_blocks;
bool is_finished = false;
};
@ -126,27 +132,23 @@ Pipe StorageMemory::read(
Pipes pipes;
BlocksList::iterator first = data.begin();
BlocksList::iterator it = data.begin();
size_t offset = 0;
for (size_t stream = 0; stream < num_streams; ++stream)
{
auto next = first;
while (offset < stream * size / num_streams)
size_t next_offset = stream * size / num_streams;
size_t num_blocks = next_offset - offset;
assert(num_blocks > 0);
pipes.emplace_back(std::make_shared<MemorySource>(column_names, it, num_blocks, *this, metadata_snapshot));
while (offset < next_offset)
{
++next;
++it;
++offset;
}
if (first == next)
continue;
auto last = next;
--last;
pipes.emplace_back(std::make_shared<MemorySource>(column_names, first, last, *this, metadata_snapshot));
first = next;
}
return Pipe::unitePipes(std::move(pipes));