diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index dbdfe650713..c6f9459d0db 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -8,9 +8,8 @@ namespace DB { -MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part) - : database{database_} - , table{table_} +MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part) + : table_id{table_id_} , partition_id{future_part.part_info.partition_id} , result_part_name{future_part.name} , result_part_path{future_part.path} @@ -60,8 +59,8 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str MergeInfo MergeListElement::getInfo() const { MergeInfo res; - res.database = database; - res.table = table; + res.database = table_id.getDatabaseName(); + res.table = table_id.getTableName(); res.result_part_name = result_part_name; res.result_part_path = result_part_path; res.partition_id = partition_id; diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 6b2af414835..9680ce6ac30 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -54,8 +55,7 @@ struct FutureMergedMutatedPart; struct MergeListElement : boost::noncopyable { - const std::string database; - const std::string table; + const StorageID table_id; std::string partition_id; const std::string result_part_name; @@ -94,7 +94,7 @@ struct MergeListElement : boost::noncopyable /// Detected after merge already started std::atomic merge_algorithm; - MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part); + MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part); MergeInfo getInfo() const; @@ -122,12 +122,13 @@ public: --merges_with_ttl_counter; } - void cancelPartMutations(const String & partition_id, Int64 mutation_version) + void cancelPartMutations(const StorageID & table_id, const String & partition_id, Int64 mutation_version) { std::lock_guard lock{mutex}; for (auto & merge_element : entries) { if ((partition_id.empty() || merge_element.partition_id == partition_id) + && merge_element.table_id == table_id && merge_element.source_data_version < mutation_version && merge_element.result_data_version >= mutation_version) merge_element.is_cancelled = true; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 3d3530825d9..584f9345423 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -625,7 +625,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) if (!to_kill) return CancellationCode::NotFound; - getContext()->getMergeList().cancelPartMutations({}, to_kill->block_number); + getContext()->getMergeList().cancelPartMutations(getStorageID(), {}, to_kill->block_number); to_kill->removeFile(); LOG_TRACE(log, "Cancelled part mutations and removed mutation file {}", mutation_id); { @@ -817,9 +817,8 @@ bool StorageMergeTree::mergeSelectedParts( auto & future_part = merge_mutate_entry.future_part; Stopwatch stopwatch; MutableDataPartPtr new_part; - auto table_id = getStorageID(); - auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part); + auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part); auto write_part_log = [&] (const ExecutionStatus & execution_status) { @@ -964,9 +963,8 @@ std::shared_ptr StorageMergeTree::se bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & merge_mutate_entry, TableLockHolder & table_lock_holder) { auto & future_part = merge_mutate_entry.future_part; - auto table_id = getStorageID(); - auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part); + auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part); Stopwatch stopwatch; MutableDataPartPtr new_part; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ea81f64659f..14dd9c20328 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1726,7 +1726,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) auto table_id = getStorageID(); /// Add merge to list - MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part); + MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_merged_part); Transaction transaction(*this); MutableDataPartPtr part; @@ -1871,9 +1871,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM future_mutated_part.updatePath(*this, reserved_space); future_mutated_part.type = source_part->getType(); - auto table_id = getStorageID(); - MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert( - table_id.database_name, table_id.table_name, future_mutated_part); + MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_mutated_part); Stopwatch stopwatch; @@ -5934,7 +5932,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio { const String & partition_id = pair.first; Int64 block_number = pair.second; - getContext()->getMergeList().cancelPartMutations(partition_id, block_number); + getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number); } return CancellationCode::CancelSent; } diff --git a/tests/queries/0_stateless/01900_kill_mutation_parallel.reference b/tests/queries/0_stateless/01900_kill_mutation_parallel.reference new file mode 100644 index 00000000000..605871f227d --- /dev/null +++ b/tests/queries/0_stateless/01900_kill_mutation_parallel.reference @@ -0,0 +1,2 @@ +1 bar_100 +2 bar_100 diff --git a/tests/queries/0_stateless/01900_kill_mutation_parallel.sh b/tests/queries/0_stateless/01900_kill_mutation_parallel.sh new file mode 100755 index 00000000000..730e4dbe776 --- /dev/null +++ b/tests/queries/0_stateless/01900_kill_mutation_parallel.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +# +# Check that KILL MUTATION can be executed in parallel for different tables. +# For this two identical tables will be created: +# - on one table ALTER + KILL MUTATION will be executed +# - on another table only ALTER, that should be succeed +# + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " + drop table if exists data_01900_1; + drop table if exists data_01900_2; + + create table data_01900_1 (k UInt64, s String) engine=MergeTree() order by k; + create table data_01900_2 (k UInt64, s String) engine=MergeTree() order by k; + + insert into data_01900_1 values (1, 'hello'), (2, 'world'); + insert into data_01900_2 values (1, 'hello'), (2, 'world'); +" + +# default finished_mutations_to_keep is 100 +# so 100 mutations will be scheduled and killed later. +for i in {1..100}; do + echo "alter table data_01900_1 update s = 'foo_$i' where 1;" +done | $CLICKHOUSE_CLIENT -nm + +# but these mutations should not be killed. +( + for i in {1..100}; do + echo "alter table data_01900_2 update s = 'bar_$i' where 1;" + done | $CLICKHOUSE_CLIENT -nm --mutations_sync=1 +) & +$CLICKHOUSE_CLIENT --format Null -nm -q "kill mutation where table = 'data_01900_1' and database = '$CLICKHOUSE_DATABASE';" +wait + +$CLICKHOUSE_CLIENT -nm -q "select * from data_01900_2"