Minor fixes

This commit is contained in:
Alexander Kazakov 2020-10-06 17:04:08 +03:00
parent e54ff6e60a
commit ae2c106f94

View File

@ -21,7 +21,7 @@ namespace ErrorCodes
class MemorySource : public SourceWithProgress class MemorySource : public SourceWithProgress
{ {
using InitializerFunc = std::function<void(BlocksList::iterator &, size_t &)>; using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &)>;
public: public:
/// Blocks are stored in std::list which may be appended in another thread. /// Blocks are stored in std::list which may be appended in another thread.
/// We use pointer to the beginning of the list and its current size. /// We use pointer to the beginning of the list and its current size.
@ -34,7 +34,7 @@ public:
size_t num_blocks_, size_t num_blocks_,
const StorageMemory & storage, const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
InitializerFunc initializer_func_ = [](size_t &, size_t &) {}) InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &) {})
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID())) : SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(column_names_) , column_names(column_names_)
, current_it(first_) , current_it(first_)
@ -146,17 +146,18 @@ Pipe StorageMemory::read(
/// set for IN or hash table for JOIN, which can't be done concurrently. /// set for IN or hash table for JOIN, which can't be done concurrently.
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit. /// Since no other manipulation with data is done, multiple sources shouldn't give any profit.
return {std::make_shared<MemorySource>( return Pipe(
std::make_shared<MemorySource>(
column_names, data.end(), 0, *this, metadata_snapshot, column_names, data.end(), 0, *this, metadata_snapshot,
/// This hack is needed for global subqueries. /// This hack is needed for global subqueries.
/// It allows to set up this Source for read AFTER Storage::read() has been called and just before actual reading /// It allows to set up this Source for read AFTER Storage::read() has been called and just before actual reading
[this](size_t & current_it, size_t & num_blocks) [this](BlocksList::const_iterator & current_it, size_t & num_blocks)
{ {
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
current_it = data.begin(); current_it = data.begin();
num_blocks = data.size(); num_blocks = data.size();
} }
)}; ));
} }
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -168,7 +169,7 @@ Pipe StorageMemory::read(
Pipes pipes; Pipes pipes;
BlocksList::iterator it = data.begin(); BlocksList::const_iterator it = data.begin();
size_t offset = 0; size_t offset = 0;
for (size_t stream = 0; stream < num_streams; ++stream) for (size_t stream = 0; stream < num_streams; ++stream)
@ -178,7 +179,7 @@ Pipe StorageMemory::read(
assert(num_blocks > 0); assert(num_blocks > 0);
pipes.push_back(std::make_shared<MemorySource>(column_names, it, num_blocks, *this, metadata_snapshot)); pipes.emplace_back(std::make_shared<MemorySource>(column_names, it, num_blocks, *this, metadata_snapshot));
while (offset < next_offset) while (offset < next_offset)
{ {