add mutation support for StorageMemory

This commit is contained in:
feng lv 2020-09-22 17:23:46 +08:00
parent 3e576a29c9
commit 5d7a77c207
2 changed files with 51 additions and 1 deletions

View File

@ -2,8 +2,9 @@
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageMemory.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMemory.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceWithProgress.h>
@ -203,6 +204,53 @@ void StorageMemory::drop()
data.clear();
}
static inline void columnUpdate(Block & old_block, const Block & new_block)
{
for (const auto & it : new_block)
{
auto col_name = it.name;
auto & col_with_type_name = old_block.getByName(col_name);
col_with_type_name.column = it.column;
}
}
void StorageMemory::mutate(const MutationCommands & commands, const Context & context)
{
auto metadata_snapshot_ = getInMemoryMetadataPtr();
auto storage_id_ = getStorageID();
auto storage_ptr_ = DatabaseCatalog::instance().getTable(storage_id_, context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr_, metadata_snapshot_, commands, context, true);
auto in = interpreter->execute();
in->readPrefix();
BlocksList out;
Block block;
while (block = in->read())
{
out.push_back(block);
}
in->readSuffix();
std::lock_guard lock(mutex);
// all column affected
if (interpreter->isAffectingAllColumns())
{
std::swap(data, out);
}
else
{
auto data_it = data.begin();
auto out_it = out.begin();
while (data_it != data.end() && out_it != out.end())
{
columnUpdate(*data_it, *out_it);
++data_it;
++out_it;
}
}
}
void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{

View File

@ -43,6 +43,8 @@ public:
void drop() override;
void mutate(const MutationCommands & commands, const Context & context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
std::optional<UInt64> totalRows() const override;