From e32f153328bd5fa76aaa50f5e9a8233fcf974d5b Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 4 Feb 2019 15:53:25 +0300 Subject: [PATCH] KILL MUTATION for StorageReplicatedMergeTree [#CLICKHOUSE-3912] --- dbms/src/Storages/MergeTree/MergeList.h | 5 ++- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 37 +++++++++++++++++-- .../MergeTree/ReplicatedMergeTreeQueue.h | 4 ++ dbms/src/Storages/StorageMergeTree.cpp | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 21 ++++++++++- 5 files changed, 62 insertions(+), 7 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeList.h b/dbms/src/Storages/MergeTree/MergeList.h index bb41998b391..2d800c85896 100644 --- a/dbms/src/Storages/MergeTree/MergeList.h +++ b/dbms/src/Storages/MergeTree/MergeList.h @@ -146,12 +146,13 @@ public: return res; } - void cancelMutation(Int64 mutation_version) + void cancelPartMutations(const String & partition_id, Int64 mutation_version) { std::lock_guard lock{mutex}; for (auto & merge_element : merges) { - if (merge_element.source_data_version < mutation_version + if ((partition_id.empty() || merge_element.partition_id == partition_id) + && merge_element.source_data_version < mutation_version && merge_element.result_data_version >= mutation_version) merge_element.is_cancelled = true; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 1ca0564cb7f..4509651d852 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -651,6 +651,37 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C } +ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( + zkutil::ZooKeeperPtr zookeeper, const String & mutation_id) +{ + std::lock_guard lock(update_mutations_mutex); + + auto rc = zookeeper->tryRemove(zookeeper_path + "/mutations/" + mutation_id); + if (rc == Coordination::ZOK) + LOG_DEBUG(log, "Removed mutation " + mutation_id + " from ZooKeeper."); + + std::lock_guard state_lock(state_mutex); + + auto it = mutations_by_znode.find(mutation_id); + if (it == mutations_by_znode.end()) + return nullptr; + + auto entry = it->second.entry; + for (const auto & partition_and_block_num : entry->block_numbers) + { + auto & in_partition = mutations_by_partition[partition_and_block_num.first]; + in_partition.erase(partition_and_block_num.second); + if (in_partition.empty()) + mutations_by_partition.erase(partition_and_block_num.first); + } + + mutations_by_znode.erase(it); + LOG_DEBUG(log, "Removed mutation " + entry->znode_name + " from local state."); + + return entry; +} + + ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name) { std::lock_guard lock(state_mutex); @@ -1167,8 +1198,8 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( auto in_partition = mutations_by_partition.find(part->info.partition_id); if (in_partition == mutations_by_partition.end()) { - LOG_ERROR(log, "There are no mutations for partition ID " << part->info.partition_id - << " (trying to mutate part " << part->name << "to " << toString(desired_mutation_version) << ")"); + LOG_WARNING(log, "There are no mutations for partition ID " << part->info.partition_id + << " (trying to mutate part " << part->name << " to " << toString(desired_mutation_version) << ")"); return MutationCommands{}; } @@ -1176,7 +1207,7 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( auto end = in_partition->second.lower_bound(desired_mutation_version); if (end == in_partition->second.end() || end->first != desired_mutation_version) - LOG_ERROR(log, "Mutation with version " << desired_mutation_version + LOG_WARNING(log, "Mutation with version " << desired_mutation_version << " not found in partition ID " << part->info.partition_id << " (trying to mutate part " << part->name + ")"); else diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 4acf23a0562..0d439d7b610 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -262,6 +262,10 @@ public: /// If watch_callback is not empty, will call it when new mutations appear in ZK. void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); + /// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr + /// if it could not be found. + ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id); + /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM). * And also wait for the completion of their execution, if they are now being executed. */ diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index d158a23425e..68a7bd71d0a 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -406,7 +406,7 @@ void StorageMergeTree::killMutation(const String & mutation_id) if (to_kill) { - global_context.getMergeList().cancelMutation(to_kill->block_number); + global_context.getMergeList().cancelPartMutations({}, to_kill->block_number); to_kill->removeFile(); LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 98e9c827f03..8c8e01ccfc9 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -4403,7 +4403,26 @@ std::vector StorageReplicatedMergeTree::getMutationsSta void StorageReplicatedMergeTree::killMutation(const String & mutation_id) { - LOG_TRACE(log, "KILL MUTATION " << mutation_id); + assertNotReadonly(); + + zkutil::ZooKeeperPtr zookeeper = getZooKeeper(); + + LOG_TRACE(log, "Killing mutation " << mutation_id); + + auto mutation_entry = queue.removeMutation(zookeeper, mutation_id); + if (!mutation_entry) + return; + + /// After this point no new part mutations will start and part mutations that still exist + /// in the queue will be skipped. + + /// Cancel already running part mutations. + for (const auto & pair : mutation_entry->block_numbers) + { + const String & partition_id = pair.first; + Int64 block_number = pair.second; + global_context.getMergeList().cancelPartMutations(partition_id, block_number); + } }