Changed BlocksList to Blocks

This commit is contained in:
Maksim Kita 2020-12-14 00:21:25 +03:00
parent 3c6df61e5c
commit 69127ca0ed
2 changed files with 43 additions and 52 deletions

View File

@ -23,7 +23,7 @@ namespace ErrorCodes
class MemorySource : public SourceWithProgress
{
using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &, std::shared_ptr<const BlocksList> &)>;
using InitializerFunc = std::function<void(std::shared_ptr<const Blocks> &)>;
public:
/// Blocks are stored in std::list which may be appended in another thread.
/// We use pointer to the beginning of the list and its current size.
@ -32,19 +32,15 @@ public:
MemorySource(
Names column_names_,
BlocksList::const_iterator first_,
size_t num_blocks_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot,
std::shared_ptr<const BlocksList> data_,
std::shared_ptr<std::mutex> parallel_execution_lock_,
InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &, std::shared_ptr<const BlocksList> &) {})
std::shared_ptr<const Blocks> data_,
std::shared_ptr<std::atomic<size_t>> parallel_execution_index_,
InitializerFunc initializer_func_ = {})
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(std::move(column_names_))
, current_it(first_)
, num_blocks(num_blocks_)
, data(data_)
, parallel_execution_lock(parallel_execution_lock_)
, parallel_execution_index(parallel_execution_index_)
, initializer_func(std::move(initializer_func_))
{
}
@ -54,16 +50,19 @@ public:
protected:
Chunk generate() override
{
if (!postponed_init_done)
if (initializer_func)
{
initializer_func(current_it, num_blocks, data);
postponed_init_done = true;
initializer_func(data);
initializer_func = {};
}
if (current_block_idx == num_blocks)
size_t current_index = getAndIncrementExecutionIndex();
if (current_index >= data->size()) {
return {};
}
const Block & src = *current_it;
const Block & src = (*data)[current_index];
Columns columns;
columns.reserve(column_names.size());
@ -71,28 +70,26 @@ protected:
for (const auto & name : column_names)
columns.push_back(src.getByName(name).column);
if (++current_block_idx < num_blocks)
{
if (parallel_execution_lock) {
std::lock_guard lock(*parallel_execution_lock);
++current_it;
} else {
++current_it;
}
}
return Chunk(std::move(columns), src.rows());
}
private:
const Names column_names;
BlocksList::const_iterator current_it;
size_t num_blocks;
size_t current_block_idx = 0;
size_t getAndIncrementExecutionIndex()
{
if (parallel_execution_index)
{
return (*parallel_execution_index)++;
}
else
{
return execution_index;
}
}
std::shared_ptr<const BlocksList> data;
bool postponed_init_done = false;
std::shared_ptr<std::mutex> parallel_execution_lock;
const Names column_names;
size_t execution_index = 0;
std::shared_ptr<const Blocks> data;
std::shared_ptr<std::atomic<size_t>> parallel_execution_index;
InitializerFunc initializer_func;
};
@ -117,7 +114,7 @@ public:
metadata_snapshot->check(block, true);
{
std::lock_guard lock(storage.mutex);
auto new_data = std::make_unique<BlocksList>(*(storage.data.get()));
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
new_data->push_back(block);
storage.data.set(std::move(new_data));
@ -133,7 +130,7 @@ private:
StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: IStorage(table_id_), data(std::make_unique<const BlocksList>())
: IStorage(table_id_), data(std::make_unique<const Blocks>())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
@ -165,22 +162,17 @@ Pipe StorageMemory::read(
return Pipe(std::make_shared<MemorySource>(
column_names,
data.get()->end(),
0,
*this,
metadata_snapshot,
data.get(),
nullptr,
[this](BlocksList::const_iterator & current_it, size_t & num_blocks, std::shared_ptr<const BlocksList> & current_data)
nullptr /* data */,
nullptr /* parallel execution index */,
[this](std::shared_ptr<const Blocks> & data_to_initialize)
{
current_data = data.get();
current_it = current_data->begin();
num_blocks = current_data->size();
data_to_initialize = data.get();
}));
}
auto current_data = data.get();
size_t size = current_data->size();
if (num_streams > size)
@ -188,12 +180,11 @@ Pipe StorageMemory::read(
Pipes pipes;
auto it = current_data->begin();
auto parallel_execution_lock = std::make_shared<std::mutex>();
auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);
for (size_t stream = 0; stream < num_streams; ++stream)
{
pipes.emplace_back(std::make_shared<MemorySource>(column_names, it, size, *this, metadata_snapshot, current_data, parallel_execution_lock));
pipes.emplace_back(std::make_shared<MemorySource>(column_names, *this, metadata_snapshot, current_data, parallel_execution_index));
}
return Pipe::unitePipes(std::move(pipes));
@ -208,7 +199,7 @@ BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const Storag
void StorageMemory::drop()
{
data.set(std::make_unique<BlocksList>());
data.set(std::make_unique<Blocks>());
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
@ -233,7 +224,7 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
auto in = interpreter->execute();
in->readPrefix();
BlocksList out;
Blocks out;
Block block;
while ((block = in->read()))
{
@ -241,17 +232,17 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
}
in->readSuffix();
std::unique_ptr<BlocksList> new_data;
std::unique_ptr<Blocks> new_data;
// all column affected
if (interpreter->isAffectingAllColumns())
{
new_data = std::make_unique<BlocksList>(out);
new_data = std::make_unique<Blocks>(out);
}
else
{
/// just some of the column affected, we need update it with new column
new_data = std::make_unique<BlocksList>(*(data.get()));
new_data = std::make_unique<Blocks>(*(data.get()));
auto data_it = new_data->begin();
auto out_it = out.begin();
@ -284,7 +275,7 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
data.set(std::make_unique<BlocksList>());
data.set(std::make_unique<Blocks>());
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}

View File

@ -91,7 +91,7 @@ public:
private:
/// MultiVersion data storage, so that we can copy the list of blocks to readers.
MultiVersion<BlocksList> data;
MultiVersion<Blocks> data;
mutable std::mutex mutex;