Merge pull request #44718 from ClickHouse/lwd_mutation_always_sync

Make lightweight deletes always synchronous
Alexander Gololobov 2023-01-09 15:58:02 +01:00 committed by GitHub
commit 742887ce53
23 changed files with 50 additions and 49 deletions
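
The core of the change is visible in the first hunks: IStorage::mutate() gains a third force_wait argument that callers use to demand a synchronous wait, so lightweight DELETEs block until the mutation is applied regardless of the mutations_sync setting. Below is a minimal standalone sketch of that new shape, using stand-in types (MutationCommands, Context, IStorageSketch) rather than the real ClickHouse headers.

```cpp
// Minimal sketch of the new interface shape; stand-in types, not ClickHouse code.
#include <stdexcept>
#include <string>

struct MutationCommands {};      // stand-in for the real list of mutation commands
struct Context {};               // stand-in for the query context
using ContextPtr = const Context *;

class IStorageSketch
{
public:
    virtual ~IStorageSketch() = default;
    virtual std::string getName() const { return "Sketch"; }

    /// Previously: mutate(const MutationCommands &, ContextPtr).
    /// The extra flag lets callers force a synchronous wait even when the
    /// mutations_sync setting is 0 (asynchronous).
    virtual void mutate(const MutationCommands &, ContextPtr, bool /*force_wait*/)
    {
        throw std::runtime_error("Mutations are not supported by storage " + getName());
    }
};

int main()
{
    IStorageSketch storage;
    Context ctx;
    try
    {
        storage.mutate(MutationCommands{}, &ctx, /*force_wait=*/true);
    }
    catch (const std::runtime_error &)
    {
        // Storages that do not override mutate() still reject mutations, as before.
    }
}
```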


@@ -147,7 +147,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
{
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
- table->mutate(mutation_commands, getContext());
+ table->mutate(mutation_commands, getContext(), false);
}
if (!partition_commands.empty())


@@ -72,7 +72,7 @@ BlockIO InterpreterDeleteQuery::execute()
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
- table->mutate(mutation_commands, getContext());
+ table->mutate(mutation_commands, getContext(), false);
return {};
}
else if (table->supportsLightweightDelete())
@@ -106,7 +106,7 @@ BlockIO InterpreterDeleteQuery::execute()
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
- table->mutate(mutation_commands, getContext());
+ table->mutate(mutation_commands, getContext(), true);
return {};
}
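
The two hunks above differ only in the flag they pass: the first path, which rewrites the DELETE into an ordinary mutation, keeps force_wait = false, while the path guarded by supportsLightweightDelete() passes true. A rough standalone model of that choice follows; TableCapabilities and forceWaitForDelete are made-up names for illustration, not ClickHouse APIs.

```cpp
// Illustrative model only: which force_wait value each DELETE path passes.
#include <cassert>

struct TableCapabilities
{
    bool supports_lightweight_delete = false;   // table->supportsLightweightDelete()
    bool lightweight_delete_enabled = false;    // allow_experimental_lightweight_delete
};

bool forceWaitForDelete(const TableCapabilities & table)
{
    // The lightweight-delete path always waits so the delete is applied by the
    // time the query returns; the ordinary-mutation path keeps relying on the
    // mutations_sync setting instead.
    return table.supports_lightweight_delete && table.lightweight_delete_enabled;
}

int main()
{
    assert(forceWaitForDelete({true, true}));      // lightweight delete path -> true
    assert(!forceWaitForDelete({false, false}));   // ordinary mutation path -> false
}
```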


@@ -486,7 +486,7 @@ public:
}
/// Mutate the table contents
- virtual void mutate(const MutationCommands &, ContextPtr)
+ virtual void mutate(const MutationCommands &, ContextPtr, bool /*force_wait*/)
{
throw Exception("Mutations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}


@@ -218,7 +218,7 @@ void StorageEmbeddedRocksDB::checkMutationIsPossible(const MutationCommands & co
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only DELETE and UPDATE mutation supported for EmbeddedRocksDB");
}
- void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPtr context_)
+ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPtr context_, bool /*force_wait*/)
{
if (commands.empty())
return;


@@ -52,7 +52,7 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
- void mutate(const MutationCommands &, ContextPtr) override;
+ void mutate(const MutationCommands &, ContextPtr, bool) override;
bool supportsParallelInsert() const override { return true; }
bool supportsIndexForIn() const override { return true; }


@@ -104,7 +104,7 @@ void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, con
throw Exception("Table engine Join supports only DELETE mutations", ErrorCodes::NOT_IMPLEMENTED);
}
- void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
+ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context, bool /*force_wait*/)
{
/// Firstly acquire lock for mutation, that locks changes of data.
/// We cannot acquire rwlock here, because read lock is needed


@@ -45,7 +45,7 @@ public:
/// Only delete is supported.
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
- void mutate(const MutationCommands & commands, ContextPtr context) override;
+ void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.


@@ -320,10 +320,10 @@ void StorageMaterializedView::checkAlterPartitionIsPossible(
getTargetTable()->checkAlterPartitionIsPossible(commands, metadata_snapshot, settings);
}
- void StorageMaterializedView::mutate(const MutationCommands & commands, ContextPtr local_context)
+ void StorageMaterializedView::mutate(const MutationCommands & commands, ContextPtr local_context, bool force_wait)
{
checkStatementCanBeForwarded();
- getTargetTable()->mutate(commands, local_context);
+ getTargetTable()->mutate(commands, local_context, force_wait);
}
void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)


@@ -65,7 +65,7 @@ public:
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override;
- void mutate(const MutationCommands & commands, ContextPtr context) override;
+ void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void renameInMemory(const StorageID & new_table_id) override;


@@ -305,7 +305,7 @@ void StorageMemory::checkMutationIsPossible(const MutationCommands & /*commands*
/// Some validation will be added
}
- void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context)
+ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context, bool /*force_wait*/)
{
std::lock_guard lock(mutex);
auto metadata_snapshot = getInMemoryMetadataPtr();


@@ -67,7 +67,7 @@ public:
void drop() override;
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
- void mutate(const MutationCommands & commands, ContextPtr context) override;
+ void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;


@@ -526,14 +526,14 @@ void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn)
it->second.writeCSN(csn);
}
- void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
+ void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context, bool force_wait)
{
/// Validate partition IDs (if any) before starting mutation
getPartitionIdsAffectedByCommands(commands, query_context);
Int64 version = startMutation(commands, query_context);
- if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
+ if (force_wait || query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
waitForMutation(version);
}
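
In the non-replicated MergeTree the flag simply widens the existing wait condition shown above. A small standalone model of that predicate follows; shouldWaitForMutation is a hypothetical helper name, not a ClickHouse function.

```cpp
// Simplified model of the wait decision in StorageMergeTree::mutate().
#include <cassert>

bool shouldWaitForMutation(bool force_wait, unsigned mutations_sync, bool inside_transaction)
{
    // Mirrors: if (force_wait || mutations_sync > 0 || current transaction) waitForMutation(version);
    return force_wait || mutations_sync > 0 || inside_transaction;
}

int main()
{
    // A lightweight DELETE passes force_wait = true, so it blocks even with mutations_sync = 0.
    assert(shouldWaitForMutation(true, 0, false));
    // A plain ALTER ... DELETE with mutations_sync = 0 and no transaction stays asynchronous.
    assert(!shouldWaitForMutation(false, 0, false));
}
```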


@@ -85,7 +85,7 @@ public:
const Names & deduplicate_by_columns,
ContextPtr context) override;
- void mutate(const MutationCommands & commands, ContextPtr context) override;
+ void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
bool hasLightweightDeletedMask() const override;


@@ -132,7 +132,7 @@ public:
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context);
}
- void mutate(const MutationCommands & commands, ContextPtr context) override { getNested()->mutate(commands, context); }
+ void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override { getNested()->mutate(commands, context, force_wait); }
CancellationCode killMutation(const String & mutation_id) override { return getNested()->killMutation(mutation_id); }


@@ -6183,7 +6183,7 @@ void StorageReplicatedMergeTree::fetchPartition(
}
- void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
+ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context, bool force_wait)
{
/// Overview of the mutation algorithm.
///
@@ -6296,7 +6296,8 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
- waitMutation(mutation_entry.znode_name, query_context->getSettingsRef().mutations_sync);
+ const size_t mutations_sync = force_wait ? 2 : query_context->getSettingsRef().mutations_sync;
+ waitMutation(mutation_entry.znode_name, mutations_sync);
}
void StorageReplicatedMergeTree::waitMutation(const String & znode_name, size_t mutations_sync) const
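
On the replicated engine the flag is instead folded into the mutations_sync value handed to waitMutation(): forcing a wait behaves like mutations_sync = 2, i.e. wait for the mutation on all replicas. A sketch of that mapping, again with a hypothetical helper name:

```cpp
// Simplified model of how force_wait is translated on the replicated path.
#include <cassert>
#include <cstddef>

// mutations_sync: 0 = do not wait, 1 = wait on the current replica, 2 = wait on all replicas.
std::size_t effectiveMutationsSync(bool force_wait, std::size_t mutations_sync_setting)
{
    return force_wait ? 2 : mutations_sync_setting;
}

int main()
{
    assert(effectiveMutationsSync(true, 0) == 2);   // lightweight delete: wait on all replicas
    assert(effectiveMutationsSync(false, 1) == 1);  // otherwise respect the user's setting
}
```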


@@ -152,7 +152,7 @@ public:
void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;
- void mutate(const MutationCommands & commands, ContextPtr context) override;
+ void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void waitMutation(const String & znode_name, size_t mutations_sync) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;


@@ -4,7 +4,7 @@ CREATE TABLE merge_table_standard_delete(id Int32, name String) ENGINE = MergeTr
INSERT INTO merge_table_standard_delete select number, toString(number) from numbers(100);
- SET mutations_sync = 1;
+ SET mutations_sync = 0;
SET allow_experimental_lightweight_delete = 1;
DELETE FROM merge_table_standard_delete WHERE id = 10;
@@ -51,9 +51,9 @@ DETACH TABLE t_light;
ATTACH TABLE t_light;
CHECK TABLE t_light;
- alter table t_light MATERIALIZE INDEX i_c;
- alter table t_light update b=-1 where a<3;
- alter table t_light drop index i_c;
+ alter table t_light MATERIALIZE INDEX i_c SETTINGS mutations_sync=2;
+ alter table t_light update b=-1 where a<3 SETTINGS mutations_sync=2;
+ alter table t_light drop index i_c SETTINGS mutations_sync=2;
DETACH TABLE t_light;
ATTACH TABLE t_light;
@@ -67,7 +67,7 @@ select * from t_light order by a;
select table, partition, name, rows from system.parts where database = currentDatabase() AND active and table ='t_light' order by name;
- optimize table t_light final;
+ optimize table t_light final SETTINGS mutations_sync=2;
select count(*) from t_light;
select table, partition, name, rows from system.parts where database = currentDatabase() AND active and table ='t_light' and rows > 0 order by name;
@@ -84,8 +84,8 @@ DETACH TABLE t_large;
ATTACH TABLE t_large;
CHECK TABLE t_large;
- ALTER TABLE t_large UPDATE b = -2 WHERE a between 1000 and 1005;
- ALTER TABLE t_large DELETE WHERE a=1;
+ ALTER TABLE t_large UPDATE b = -2 WHERE a between 1000 and 1005 SETTINGS mutations_sync=2;
+ ALTER TABLE t_large DELETE WHERE a=1 SETTINGS mutations_sync=2;
DETACH TABLE t_large;
ATTACH TABLE t_large;
@@ -99,7 +99,7 @@ SELECT '----Test lighweight delete is disabled if table has projections-----';
CREATE TABLE t_proj(a UInt32, b int) ENGINE=MergeTree order BY a settings min_bytes_for_wide_part=0;
- ALTER TABLE t_proj ADD PROJECTION p_1 (SELECT avg(a), avg(b), count());
+ ALTER TABLE t_proj ADD PROJECTION p_1 (SELECT avg(a), avg(b), count()) SETTINGS mutations_sync=2;
INSERT INTO t_proj SELECT number + 1, number + 1 FROM numbers(1000);


@@ -6,7 +6,7 @@ INSERT INTO merge_table_standard_delete select number, toString(number) from num
SELECT COUNT(), part_type FROM system.parts WHERE database = currentDatabase() AND table = 'merge_table_standard_delete' AND active GROUP BY part_type ORDER BY part_type;
- SET mutations_sync = 1;
+ SET mutations_sync = 0;
SET allow_experimental_lightweight_delete = 1;
DELETE FROM merge_table_standard_delete WHERE id = 10;
@@ -57,9 +57,9 @@ DETACH TABLE t_light;
ATTACH TABLE t_light;
CHECK TABLE t_light;
- alter table t_light MATERIALIZE INDEX i_c;
- alter table t_light update b=-1 where a<3;
- alter table t_light drop index i_c;
+ alter table t_light MATERIALIZE INDEX i_c SETTINGS mutations_sync=2;
+ alter table t_light update b=-1 where a<3 SETTINGS mutations_sync=2;
+ alter table t_light drop index i_c SETTINGS mutations_sync=2;
DETACH TABLE t_light;
ATTACH TABLE t_light;
@@ -73,7 +73,7 @@ select * from t_light order by a;
select table, partition, name, rows from system.parts where database = currentDatabase() AND active and table ='t_light' order by name;
- optimize table t_light final;
+ optimize table t_light final SETTINGS mutations_sync=2;
select count(*) from t_light;
select table, partition, name, rows from system.parts where database = currentDatabase() AND active and table ='t_light' and rows > 0 order by name;
@@ -90,8 +90,8 @@ DETACH TABLE t_large;
ATTACH TABLE t_large;
CHECK TABLE t_large;
- ALTER TABLE t_large UPDATE b = -2 WHERE a between 1000 and 1005;
- ALTER TABLE t_large DELETE WHERE a=1;
+ ALTER TABLE t_large UPDATE b = -2 WHERE a between 1000 and 1005 SETTINGS mutations_sync=2;
+ ALTER TABLE t_large DELETE WHERE a=1 SETTINGS mutations_sync=2;
DETACH TABLE t_large;
ATTACH TABLE t_large;


@@ -4,7 +4,7 @@ CREATE TABLE lwd_test (id UInt64 , value String) ENGINE MergeTree() ORDER BY id;
INSERT INTO lwd_test SELECT number, randomString(10) FROM system.numbers LIMIT 1000000;
- SET mutations_sync = 1;
+ SET mutations_sync = 0;
SET allow_experimental_lightweight_delete = 1;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
@@ -22,7 +22,7 @@ SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SELECT 'Force merge to cleanup deleted rows';
- OPTIMIZE TABLE lwd_test FINAL;
+ OPTIMIZE TABLE lwd_test FINAL SETTINGS mutations_sync = 2;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test;
@@ -38,7 +38,7 @@ SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SELECT 'Do UPDATE mutation';
- ALTER TABLE lwd_test UPDATE value = 'v' WHERE id % 2 == 0;
+ ALTER TABLE lwd_test UPDATE value = 'v' WHERE id % 2 == 0 SETTINGS mutations_sync = 2;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test;
@@ -46,7 +46,7 @@ SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SELECT 'Force merge to cleanup deleted rows';
- OPTIMIZE TABLE lwd_test FINAL;
+ OPTIMIZE TABLE lwd_test FINAL SETTINGS mutations_sync = 2;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test;
@@ -62,7 +62,7 @@ SELECT 'First row', id, length(value) FROM lwd_test ORDER BY id LIMIT 1;
SELECT 'Do ALTER DELETE mutation that does a "heavyweight" delete';
- ALTER TABLE lwd_test DELETE WHERE id % 3 == 0;
+ ALTER TABLE lwd_test DELETE WHERE id % 3 == 0 SETTINGS mutations_sync = 2;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test;
@@ -73,7 +73,7 @@ DELETE FROM lwd_test WHERE id >= 300000 and id < 400000;
SELECT 'Force merge to cleanup deleted rows';
- OPTIMIZE TABLE lwd_test FINAL;
+ OPTIMIZE TABLE lwd_test FINAL SETTINGS mutations_sync = 2;
SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test' AND active;
SELECT 'Count', count() FROM lwd_test;


@@ -8,7 +8,7 @@ CREATE TABLE replicated_table_r2(id Int32, name String) ENGINE = ReplicatedMerge
INSERT INTO replicated_table_r1 select number, toString(number) FROM numbers(100);
- SET mutations_sync = 2;
+ SET mutations_sync = 0;
SET allow_experimental_lightweight_delete = 1;
DELETE FROM replicated_table_r1 WHERE id = 10;
@@ -44,7 +44,7 @@ SELECT count(*) FROM t_light_r1;
SELECT * FROM t_light_r1 ORDER BY a;
SELECT * FROM t_light_r2 ORDER BY a;
- OPTIMIZE TABLE t_light_r1 FINAL;
+ OPTIMIZE TABLE t_light_r1 FINAL SETTINGS mutations_sync = 2;
SELECT count(*) FROM t_light_r1;
DROP TABLE IF EXISTS t_light_r1 SYNC;

View File

@@ -31,7 +31,7 @@ ORDER BY name, column;
- SET mutations_sync = 2;
+ SET mutations_sync = 0;
SET allow_experimental_lightweight_delete = 1;
-- delete some rows using LWD
@@ -54,7 +54,7 @@ ORDER BY name, column;
-- optimize table to physically delete the rows
- OPTIMIZE TABLE lwd_test FINAL;
+ OPTIMIZE TABLE lwd_test FINAL SETTINGS mutations_sync = 2;
SELECT * FROM lwd_test ORDER BY id, value;
@@ -109,7 +109,7 @@ ORDER BY name, column;
-- optimize table to merge 2 parts together: the 1st has LDW rows and the 2nd doesn't have LWD rows
-- physically delete the rows
- OPTIMIZE TABLE lwd_test FINAL;
+ OPTIMIZE TABLE lwd_test FINAL SETTINGS mutations_sync = 2;
SELECT * FROM lwd_test ORDER BY id, value;
@@ -164,7 +164,7 @@ ORDER BY name, column;
-- optimize table to merge 2 parts together, both of them have LWD rows
-- physically delete the rows
- OPTIMIZE TABLE lwd_test FINAL;
+ OPTIMIZE TABLE lwd_test FINAL SETTINGS mutations_sync = 2;
SELECT * FROM lwd_test ORDER BY id, value;


@@ -2,5 +2,5 @@ drop table if exists test;
create table test (id Int32, key String) engine=MergeTree() order by tuple();
insert into test select number, toString(number) from numbers(1000000);
set allow_experimental_lightweight_delete=1;
- delete from test where id % 2 = 0 SETTINGS mutations_sync=1;
+ delete from test where id % 2 = 0 SETTINGS mutations_sync=0;
select count() from test;


@@ -126,7 +126,7 @@ class Tester:
def main():
# Set mutations to synchronous mode and enable lightweight DELETE's
- url = os.environ['CLICKHOUSE_URL'] + '&mutations_sync=2&allow_experimental_lightweight_delete=1&max_threads=1'
+ url = os.environ['CLICKHOUSE_URL'] + '&allow_experimental_lightweight_delete=1&max_threads=1'
default_index_granularity = 10;
total_rows = 8 * default_index_granularity