Update only affected rows in KV storage

This commit is contained in:
Antonio Andelic 2023-04-05 16:07:59 +00:00
parent e428ed5543
commit 0260b84bc3
8 changed files with 105 additions and 90 deletions

View File

@ -373,11 +373,11 @@ MutationsInterpreter::MutationsInterpreter(
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
bool return_mutated_rows_)
: MutationsInterpreter(
Source(std::move(storage_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
can_execute_, return_all_columns_, return_mutated_rows_)
{
if (can_execute_ && dynamic_cast<const MergeTreeData *>(source.getStorage().get()))
{
@ -396,11 +396,11 @@ MutationsInterpreter::MutationsInterpreter(
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
bool return_mutated_rows_)
: MutationsInterpreter(
Source(storage_, std::move(source_part_)),
metadata_snapshot_, std::move(commands_), std::move(context_),
can_execute_, return_all_columns_, return_deleted_rows_)
can_execute_, return_all_columns_, return_mutated_rows_)
{
}
@ -411,7 +411,7 @@ MutationsInterpreter::MutationsInterpreter(
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_)
bool return_mutated_rows_)
: source(std::move(source_))
, metadata_snapshot(metadata_snapshot_)
, commands(std::move(commands_))
@ -419,7 +419,7 @@ MutationsInterpreter::MutationsInterpreter(
, can_execute(can_execute_)
, select_limits(SelectQueryOptions().analyze(!can_execute).ignoreLimits().ignoreProjections())
, return_all_columns(return_all_columns_)
, return_deleted_rows(return_deleted_rows_)
, return_mutated_rows(return_mutated_rows_)
{
prepare(!can_execute);
}
@ -600,7 +600,7 @@ void MutationsInterpreter::prepare(bool dry_run)
for (auto & command : commands)
{
// we can return deleted rows only if it's the only present command
assert(command.type == MutationCommand::DELETE || !return_deleted_rows);
assert(command.type == MutationCommand::DELETE || command.type == MutationCommand::UPDATE || !return_mutated_rows);
if (command.type == MutationCommand::DELETE)
{
@ -610,7 +610,7 @@ void MutationsInterpreter::prepare(bool dry_run)
auto predicate = getPartitionAndPredicateExpressionForMutationCommand(command);
if (!return_deleted_rows)
if (!return_mutated_rows)
predicate = makeASTFunction("isZeroOrNull", predicate);
stages.back().filters.push_back(predicate);
@ -697,6 +697,9 @@ void MutationsInterpreter::prepare(bool dry_run)
type_literal);
stages.back().column_to_updated.emplace(column, updated_column);
if (condition && return_mutated_rows)
stages.back().filters.push_back(condition);
}
if (!affected_materialized.empty())

View File

@ -48,7 +48,7 @@ public:
ContextPtr context_,
bool can_execute_,
bool return_all_columns_ = false,
bool return_deleted_rows_ = false);
bool return_mutated_rows_ = false);
/// Special case for MergeTree
MutationsInterpreter(
@ -59,7 +59,7 @@ public:
ContextPtr context_,
bool can_execute_,
bool return_all_columns_ = false,
bool return_deleted_rows_ = false);
bool return_mutated_rows_ = false);
void validate();
size_t evaluateCommandsSize();
@ -136,7 +136,7 @@ private:
ContextPtr context_,
bool can_execute_,
bool return_all_columns_,
bool return_deleted_rows_);
bool return_mutated_rows_);
void prepare(bool dry_run);
@ -210,8 +210,8 @@ private:
// whether all columns should be returned, not just updated
bool return_all_columns;
// whether we should return deleted or nondeleted rows on DELETE mutation
bool return_deleted_rows;
// whether we should return mutated or all existing rows
bool return_mutated_rows;
};
}

View File

@ -237,7 +237,7 @@ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPt
context_,
/*can_execute_*/ true,
/*return_all_columns_*/ true,
/*return_deleted_rows_*/ true);
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
@ -279,7 +279,13 @@ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPt
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated");
auto interpreter = std::make_unique<MutationsInterpreter>(
storage_ptr, metadata_snapshot, commands, context_, /*can_execute_*/ true, /*return_all_columns*/ true);
storage_ptr,
metadata_snapshot,
commands,
context_,
/*can_execute_*/ true,
/*return_all_columns*/ true,
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

View File

@ -864,7 +864,7 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
local_context,
/*can_execute_*/ true,
/*return_all_columns_*/ true,
/*return_deleted_rows_*/ true);
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
@ -927,7 +927,13 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated");
auto interpreter = std::make_unique<MutationsInterpreter>(
storage_ptr, metadata_snapshot, commands, local_context, /*can_execute_*/ true, /*return_all_columns*/ true);
storage_ptr,
metadata_snapshot,
commands,
local_context,
/*can_execute_*/ true,
/*return_all_columns*/ true,
/*return_mutated_rows*/ true);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

View File

@ -1,32 +1,32 @@
1 Some string 0
2 Some other string 0
3 random 0
4 random2 0
1 Some string 0 0
2 Some other string 0 0
3 random 0 0
4 random2 0 0
-----------
3 random 0
4 random2 0
3 random 0 0
4 random2 0 0
-----------
3 random 0
3 random 0 0
-----------
0
-----------
1 String 10
2 String 20
3 String 30
4 String 40
1 String 10 0
2 String 20 0
3 String 30 0
4 String 40 0
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 102
2 String 202
3 Another 302
4 Another 402
1 String 102 1
2 String 202 1
3 Another 302 2
4 Another 402 2
-----------

View File

@ -1,44 +1,44 @@
-- Tags: no-ordinary-database, no-fasttest
DROP TABLE IF EXISTS 02661_keepermap_delete_update;
DROP TABLE IF EXISTS 02577_keepermap_delete_update;
CREATE TABLE 02661_keepermap_delete_update (key UInt64, value String, value2 UInt64) ENGINE=KeeperMap('/' || currentDatabase() || '/test02661_keepermap_delete_update') PRIMARY KEY(key);
CREATE TABLE 02577_keepermap_delete_update (key UInt64, value String, value2 UInt64) ENGINE=KeeperMap('/' || currentDatabase() || '/test02577_keepermap_delete_update') PRIMARY KEY(key);
INSERT INTO 02661_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);
INSERT INTO 02577_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02661_keepermap_delete_update WHERE value LIKE 'Some%string';
DELETE FROM 02577_keepermap_delete_update WHERE value LIKE 'Some%string';
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02661_keepermap_delete_update DELETE WHERE key >= 4;
ALTER TABLE 02577_keepermap_delete_update DELETE WHERE key >= 4;
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02661_keepermap_delete_update WHERE 1 = 1;
SELECT count() FROM 02661_keepermap_delete_update;
DELETE FROM 02577_keepermap_delete_update WHERE 1 = 1;
SELECT count() FROM 02577_keepermap_delete_update;
SELECT '-----------';
INSERT INTO 02661_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
INSERT INTO 02577_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02661_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
ALTER TABLE 02577_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02661_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError BAD_ARGUMENTS }
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
ALTER TABLE 02577_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError BAD_ARGUMENTS }
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02661_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
ALTER TABLE 02577_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT *, _version FROM 02577_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02661_keepermap_delete_update ON CLUSTER test_shard_localhost UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100; -- { serverError BAD_ARGUMENTS }
ALTER TABLE 02577_keepermap_delete_update ON CLUSTER test_shard_localhost UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100; -- { serverError BAD_ARGUMENTS }
DROP TABLE IF EXISTS 02661_keepermap_delete_update;
DROP TABLE IF EXISTS 02577_keepermap_delete_update;

View File

@ -1,32 +1,32 @@
1 Some string 0
2 Some other string 0
3 random 0
4 random2 0
1 Some string 0 0 0
2 Some other string 0 0 0
3 random 0 0 0
4 random2 0 0 0
-----------
3 random 0
4 random2 0
3 random 0 0
4 random2 0 0
-----------
3 random 0
3 random 0 0
-----------
0
-----------
1 String 10
2 String 20
3 String 30
4 String 40
1 String 10 0
2 String 20 0
3 String 30 0
4 String 40 0
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
1 String 10 0
2 String 20 0
3 Another 30 1
4 Another 40 1
-----------
1 String 102
2 String 202
3 Another 302
4 Another 402
1 String 102 1
2 String 202 1
3 Another 302 2
4 Another 402 2
-----------

View File

@ -8,17 +8,17 @@ CREATE TABLE 02707_keepermap_delete_update (key UInt64, value String, value2 UIn
INSERT INTO 02707_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02707_keepermap_delete_update WHERE value LIKE 'Some%string';
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update DELETE WHERE key >= 4;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02707_keepermap_delete_update WHERE 1 = 1;
@ -26,19 +26,19 @@ SELECT count() FROM 02707_keepermap_delete_update;
SELECT '-----------';
INSERT INTO 02707_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError 36 }
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT *, _version FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DROP TABLE IF EXISTS 02707_keepermap_delete_update;