Merge pull request #61554 from ClickHouse/fix_wait_for_mutation_done

Wait for done mutation with more logs and asserts
This commit is contained in:
alesapin 2024-03-25 14:27:34 +01:00 committed by GitHub
commit d4063e378e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -604,7 +604,12 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
break;
}
/// Here we check mutation for errors on local replica. If they happen on this replica
/// they will happen on each replica, so we can check only in-memory info.
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id);
String mutation_pointer = fs::path(zookeeper_path) / "replicas" / replica / "mutation_pointer";
std::string mutation_pointer_value;
/// Replica could be removed
if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, nullptr, wait_event))
@ -613,20 +618,33 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
break;
}
else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation
{
bool mutation_killed_or_done_locally = !mutation_status || mutation_status->is_done;
if (mutation_killed_or_done_locally)
{
LOG_TRACE(log, "Mutation {} is done because mutation pointer is {}", mutation_id, mutation_pointer_value);
break; /// (numbers like 0000000000 and 0000000001)
}
else
{
LOG_TRACE(log, "Mutation {} is done because mutation pointer is {}, but state is not updated in memory, will wait", mutation_id, mutation_pointer_value);
}
}
/// Replica can become inactive, so wait with timeout and recheck it
if (wait_event->tryWait(1000))
continue;
/// Here we check mutation for errors on local replica. If they happen on this replica
/// they will happen on each replica, so we can check only in-memory info.
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id);
/// If mutation status is empty, than local replica may just not loaded it into memory.
if (mutation_status && !mutation_status->latest_fail_reason.empty())
{
LOG_DEBUG(log, "Mutation {} is done {} or failed {} (status: '{}')", mutation_id, mutation_status->is_done, !mutation_status->latest_fail_reason.empty(), mutation_status->latest_fail_reason);
break;
}
/// Replica can become inactive, so wait with timeout, if nothing happened -> recheck it
if (!wait_event->tryWait(1000))
{
LOG_TRACE(log, "Failed to wait for mutation '{}', will recheck", mutation_id);
}
}
/// This replica inactive, don't check anything
if (!inactive_replicas.empty() && inactive_replicas.contains(replica))
break;
@ -655,6 +673,8 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
/// they will happen on each replica, so we can check only in-memory info.
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id, &mutation_ids);
checkMutationStatus(mutation_status, mutation_ids);
/// Mutation should be done or we should throw exception
chassert(mutation_status->is_done);
}
if (!inactive_replicas.empty())