Add support for delete in RocksDB

This commit is contained in:
Antonio Andelic 2022-08-31 13:08:27 +00:00
parent d4a1b71b18
commit 0e6b3b870a
7 changed files with 126 additions and 14 deletions

View File

@ -34,11 +34,6 @@ InterpreterDeleteQuery::InterpreterDeleteQuery(const ASTPtr & query_ptr_, Contex
BlockIO InterpreterDeleteQuery::execute()
{
if (!getContext()->getSettingsRef().allow_experimental_lightweight_delete)
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Lightweight delete mutate is experimental. Set `allow_experimental_lightweight_delete` setting to enable it");
}
FunctionNameNormalizer().visit(query_ptr.get());
const ASTDeleteQuery & delete_query = query_ptr->as<ASTDeleteQuery &>();
auto table_id = getContext()->resolveStorageID(delete_query, Context::ResolveOrdinary);
@ -49,10 +44,6 @@ BlockIO InterpreterDeleteQuery::execute()
/// First check table storage for validations.
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
auto merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table);
if (!merge_tree)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only MergeTree family tables are supported");
checkStorageSupportsTransactionsIfNeeded(table, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
@ -69,6 +60,31 @@ BlockIO InterpreterDeleteQuery::execute()
auto table_lock = table->lockForShare(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table);
if (!merge_tree)
{
/// Convert to MutationCommand
MutationCommands mutation_commands;
MutationCommand mut_command;
mut_command.type = MutationCommand::Type::DELETE;
mut_command.predicate = delete_query.predicate;
auto command = std::make_shared<ASTAlterCommand>();
command->type = ASTAlterCommand::DELETE;
command->predicate = delete_query.predicate;
mutation_commands.emplace_back(mut_command);
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
table->mutate(mutation_commands, getContext());
return {};
}
if (!getContext()->getSettingsRef().allow_experimental_lightweight_delete)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Lightweight delete mutate is experimental. Set `allow_experimental_lightweight_delete` setting to enable it");
/// Convert to MutationCommand
MutationCommands mutation_commands;
MutationCommand mut_command;

View File

@ -226,7 +226,7 @@ bool isStorageTouchedByMutations(
ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);
/// Interpreter must be alive, when we use result of execute() method.
/// For some reason it may copy context and and give it into ExpressionTransform
/// For some reason it may copy context and give it into ExpressionTransform
/// after that we will use context from destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(
select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
@ -288,13 +288,15 @@ MutationsInterpreter::MutationsInterpreter(
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_)
bool can_execute_,
bool return_deleted_rows_)
: storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, commands(std::move(commands_))
, context(Context::createCopy(context_))
, can_execute(can_execute_)
, select_limits(SelectQueryOptions().analyze(!can_execute).ignoreLimits().ignoreProjections())
, return_deleted_rows(return_deleted_rows_)
{
mutation_ast = prepare(!can_execute);
}
@ -478,8 +480,12 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
auto negated_predicate = makeASTFunction("isZeroOrNull", getPartitionAndPredicateExpressionForMutationCommand(command));
stages.back().filters.push_back(negated_predicate);
auto predicate = getPartitionAndPredicateExpressionForMutationCommand(command);
if (!return_deleted_rows)
predicate = makeASTFunction("isZeroOrNull", predicate);
stages.back().filters.push_back(predicate);
}
else if (command.type == MutationCommand::UPDATE)
{

View File

@ -43,7 +43,8 @@ public:
const StorageMetadataPtr & metadata_snapshot_,
MutationCommands commands_,
ContextPtr context_,
bool can_execute_);
bool can_execute_,
bool return_deleted_rows_ = false);
void validate();
@ -156,6 +157,9 @@ private:
/// Columns, that we need to read for calculation of skip indices, projections or TTL expressions.
ColumnDependencies dependencies;
// Whether we should return deleted or non-deleted rows on DELETE mutation.
bool return_deleted_rows;
};
}

View File

@ -1,6 +1,7 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBSink.h>
#include <Storages/MutationCommands.h>
#include <DataTypes/DataTypesNumber.h>
@ -10,11 +11,15 @@
#include <Parsers/ASTCreateQuery.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ISource.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/Context.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/Logger.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -192,6 +197,54 @@ void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr &
initDB();
}
/// Verifies that every requested mutation can be applied to this storage.
/// Only DELETE commands are accepted; the first command of any other type
/// aborts the check with a BAD_ARGUMENTS exception. The settings argument is unused.
void StorageEmbeddedRocksDB::checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const
{
    for (const auto & mutation : commands)
    {
        const bool is_delete = (mutation.type == MutationCommand::Type::DELETE);
        if (is_delete)
            continue;

        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only DELETE mutation supported for EmbeddedRocksDB");
    }
}
void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPtr context_)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
auto storage = getStorageID();
auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context_);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context_, true, /*return_deleted_row*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
auto column_it = std::find_if(block.begin(), block.end(), [&](const auto & column) { return column.name == primary_key; });
assert(column_it != block.end());
auto column = column_it->column;
auto size = column->size();
rocksdb::WriteBatch batch;
WriteBufferFromOwnString wb_key;
for (size_t i = 0; i < size; ++i)
{
wb_key.restart();
column_it->type->getDefaultSerialization()->serializeBinary(*column, i, wb_key);
auto status = batch.Delete(wb_key.str());
if (!status.ok())
throw Exception("RocksDB write error: " + status.ToString(), ErrorCodes::ROCKSDB_ERROR);
}
auto status = rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok())
throw Exception("RocksDB write error: " + status.ToString(), ErrorCodes::ROCKSDB_ERROR);
}
}
void StorageEmbeddedRocksDB::initDB()
{
rocksdb::Status status;

View File

@ -49,6 +49,9 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
void mutate(const MutationCommands &, ContextPtr) override;
bool supportsParallelInsert() const override { return true; }
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(

View File

@ -0,0 +1,9 @@
1 Some string
2 Some other string
3 random
4 random2
-----------
3 random
4 random2
-----------
3 random

View File

@ -0,0 +1,21 @@
-- Tags: no-ordinary-database, no-fasttest
-- Checks that rows can be removed from an EmbeddedRocksDB table with both DELETE FROM and ALTER TABLE ... DELETE.
DROP TABLE IF EXISTS 02416_rocksdb_delete;
CREATE TABLE 02416_rocksdb_delete (key UInt64, value String) Engine=EmbeddedRocksDB PRIMARY KEY(key);
INSERT INTO 02416_rocksdb_delete VALUES (1, 'Some string'), (2, 'Some other string'), (3, 'random'), (4, 'random2');
-- Initial contents: all four inserted rows.
SELECT * FROM 02416_rocksdb_delete ORDER BY key;
SELECT '-----------';
-- DELETE FROM with a predicate on the non-key column `value`; it matches the rows with keys 1 and 2.
DELETE FROM 02416_rocksdb_delete WHERE value LIKE 'Some%string';
SELECT * FROM 02416_rocksdb_delete ORDER BY key;
SELECT '-----------';
-- ALTER TABLE ... DELETE with a predicate on the primary key; it matches the row with key 4.
ALTER TABLE 02416_rocksdb_delete DELETE WHERE key >= 4;
SELECT * FROM 02416_rocksdb_delete ORDER BY key;
DROP TABLE IF EXISTS 02416_rocksdb_delete;