KILL MUTATION for StorageReplicatedMergeTree [#CLICKHOUSE-3912]

This commit is contained in:
Alexey Zatelepin 2019-02-04 15:53:25 +03:00
parent 059c1b3589
commit e32f153328
5 changed files with 62 additions and 7 deletions

View File

@ -146,12 +146,13 @@ public:
return res;
}
void cancelMutation(Int64 mutation_version)
void cancelPartMutations(const String & partition_id, Int64 mutation_version)
{
std::lock_guard lock{mutex};
for (auto & merge_element : merges)
{
if (merge_element.source_data_version < mutation_version
if ((partition_id.empty() || merge_element.partition_id == partition_id)
&& merge_element.source_data_version < mutation_version
&& merge_element.result_data_version >= mutation_version)
merge_element.is_cancelled = true;
}

View File

@ -651,6 +651,37 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
}
ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
zkutil::ZooKeeperPtr zookeeper, const String & mutation_id)
{
std::lock_guard lock(update_mutations_mutex);
auto rc = zookeeper->tryRemove(zookeeper_path + "/mutations/" + mutation_id);
if (rc == Coordination::ZOK)
LOG_DEBUG(log, "Removed mutation " + mutation_id + " from ZooKeeper.");
std::lock_guard state_lock(state_mutex);
auto it = mutations_by_znode.find(mutation_id);
if (it == mutations_by_znode.end())
return nullptr;
auto entry = it->second.entry;
for (const auto & partition_and_block_num : entry->block_numbers)
{
auto & in_partition = mutations_by_partition[partition_and_block_num.first];
in_partition.erase(partition_and_block_num.second);
if (in_partition.empty())
mutations_by_partition.erase(partition_and_block_num.first);
}
mutations_by_znode.erase(it);
LOG_DEBUG(log, "Removed mutation " + entry->znode_name + " from local state.");
return entry;
}
ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name)
{
std::lock_guard lock(state_mutex);
@ -1167,8 +1198,8 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
auto in_partition = mutations_by_partition.find(part->info.partition_id);
if (in_partition == mutations_by_partition.end())
{
LOG_ERROR(log, "There are no mutations for partition ID " << part->info.partition_id
<< " (trying to mutate part " << part->name << "to " << toString(desired_mutation_version) << ")");
LOG_WARNING(log, "There are no mutations for partition ID " << part->info.partition_id
<< " (trying to mutate part " << part->name << " to " << toString(desired_mutation_version) << ")");
return MutationCommands{};
}
@ -1176,7 +1207,7 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
auto end = in_partition->second.lower_bound(desired_mutation_version);
if (end == in_partition->second.end() || end->first != desired_mutation_version)
LOG_ERROR(log, "Mutation with version " << desired_mutation_version
LOG_WARNING(log, "Mutation with version " << desired_mutation_version
<< " not found in partition ID " << part->info.partition_id
<< " (trying to mutate part " << part->name + ")");
else

View File

@ -262,6 +262,10 @@ public:
/// If watch_callback is not empty, will call it when new mutations appear in ZK.
void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
/// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr
/// if it could not be found.
ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id);
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
* And also wait for the completion of their execution, if they are now being executed.
*/

View File

@ -406,7 +406,7 @@ void StorageMergeTree::killMutation(const String & mutation_id)
if (to_kill)
{
global_context.getMergeList().cancelMutation(to_kill->block_number);
global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
to_kill->removeFile();
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
}

View File

@ -4403,7 +4403,26 @@ std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsSta
void StorageReplicatedMergeTree::killMutation(const String & mutation_id)
{
LOG_TRACE(log, "KILL MUTATION " << mutation_id);
assertNotReadonly();
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
LOG_TRACE(log, "Killing mutation " << mutation_id);
auto mutation_entry = queue.removeMutation(zookeeper, mutation_id);
if (!mutation_entry)
return;
/// After this point no new part mutations will start and part mutations that still exist
/// in the queue will be skipped.
/// Cancel already running part mutations.
for (const auto & pair : mutation_entry->block_numbers)
{
const String & partition_id = pair.first;
Int64 block_number = pair.second;
global_context.getMergeList().cancelPartMutations(partition_id, block_number);
}
}