mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
try executing other mutations immediately after KILL MUTATION [#CLICKHOUSE-3912]
This commit is contained in:
parent
1bf4174ec1
commit
8e437b191e
@ -560,6 +560,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
|
|||||||
|
|
||||||
/// Compare with the local state, delete obsolete entries and determine which new entries to load.
|
/// Compare with the local state, delete obsolete entries and determine which new entries to load.
|
||||||
Strings entries_to_load;
|
Strings entries_to_load;
|
||||||
|
bool some_active_mutations_were_killed = false;
|
||||||
{
|
{
|
||||||
std::lock_guard state_lock(state_mutex);
|
std::lock_guard state_lock(state_mutex);
|
||||||
|
|
||||||
@ -568,7 +569,14 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
|
|||||||
const ReplicatedMergeTreeMutationEntry & entry = *it->second.entry;
|
const ReplicatedMergeTreeMutationEntry & entry = *it->second.entry;
|
||||||
if (!entries_in_zk_set.count(entry.znode_name))
|
if (!entries_in_zk_set.count(entry.znode_name))
|
||||||
{
|
{
|
||||||
|
if (!it->second.is_done)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Removing killed mutation " + entry.znode_name + " from local state.");
|
||||||
|
some_active_mutations_were_killed = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state.");
|
LOG_DEBUG(log, "Removing obsolete mutation " + entry.znode_name + " from local state.");
|
||||||
|
|
||||||
for (const auto & partition_and_block_num : entry.block_numbers)
|
for (const auto & partition_and_block_num : entry.block_numbers)
|
||||||
{
|
{
|
||||||
auto & in_partition = mutations_by_partition[partition_and_block_num.first];
|
auto & in_partition = mutations_by_partition[partition_and_block_num.first];
|
||||||
@ -590,6 +598,9 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (some_active_mutations_were_killed)
|
||||||
|
storage.queue_task_handle->wake();
|
||||||
|
|
||||||
if (!entries_to_load.empty())
|
if (!entries_to_load.empty())
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Loading " + toString(entries_to_load.size()) + " mutation entries: "
|
LOG_INFO(log, "Loading " + toString(entries_to_load.size()) + " mutation entries: "
|
||||||
@ -660,13 +671,18 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
|
|||||||
if (rc == Coordination::ZOK)
|
if (rc == Coordination::ZOK)
|
||||||
LOG_DEBUG(log, "Removed mutation " + mutation_id + " from ZooKeeper.");
|
LOG_DEBUG(log, "Removed mutation " + mutation_id + " from ZooKeeper.");
|
||||||
|
|
||||||
|
ReplicatedMergeTreeMutationEntryPtr entry;
|
||||||
|
bool mutation_was_active = false;
|
||||||
|
{
|
||||||
std::lock_guard state_lock(state_mutex);
|
std::lock_guard state_lock(state_mutex);
|
||||||
|
|
||||||
auto it = mutations_by_znode.find(mutation_id);
|
auto it = mutations_by_znode.find(mutation_id);
|
||||||
if (it == mutations_by_znode.end())
|
if (it == mutations_by_znode.end())
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
|
||||||
auto entry = it->second.entry;
|
mutation_was_active = !it->second.is_done;
|
||||||
|
|
||||||
|
entry = it->second.entry;
|
||||||
for (const auto & partition_and_block_num : entry->block_numbers)
|
for (const auto & partition_and_block_num : entry->block_numbers)
|
||||||
{
|
{
|
||||||
auto & in_partition = mutations_by_partition[partition_and_block_num.first];
|
auto & in_partition = mutations_by_partition[partition_and_block_num.first];
|
||||||
@ -677,6 +693,10 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
|
|||||||
|
|
||||||
mutations_by_znode.erase(it);
|
mutations_by_znode.erase(it);
|
||||||
LOG_DEBUG(log, "Removed mutation " + entry->znode_name + " from local state.");
|
LOG_DEBUG(log, "Removed mutation " + entry->znode_name + " from local state.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mutation_was_active)
|
||||||
|
storage.queue_task_handle->wake();
|
||||||
|
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
@ -410,6 +410,10 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
|
|||||||
global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
|
global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
|
||||||
to_kill->removeFile();
|
to_kill->removeFile();
|
||||||
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
|
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
|
||||||
|
|
||||||
|
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
|
||||||
|
background_task_handle->wake();
|
||||||
|
|
||||||
return CancellationCode::CancelSent;
|
return CancellationCode::CancelSent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user