Merge pull request #22669 from ClickHouse/fix_two_bugs_with_mutation_wait

Fix two stupid bugs in mutation wait
This commit is contained in:
alesapin 2021-04-06 11:04:16 +03:00 committed by GitHub
commit 3cd571f92b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -455,12 +455,12 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
if (replicas.empty()) if (replicas.empty())
return; return;
zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();
std::set<String> inactive_replicas; std::set<String> inactive_replicas;
for (const String & replica : replicas) for (const String & replica : replicas)
{ {
LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id); LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id);
zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();
while (!partial_shutdown_called) while (!partial_shutdown_called)
{ {
@ -484,9 +484,8 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
String mutation_pointer = zookeeper_path + "/replicas/" + replica + "/mutation_pointer"; String mutation_pointer = zookeeper_path + "/replicas/" + replica + "/mutation_pointer";
std::string mutation_pointer_value; std::string mutation_pointer_value;
Coordination::Stat get_stat;
/// Replica could be removed /// Replica could be removed
if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, &get_stat, wait_event)) if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, nullptr, wait_event))
{ {
LOG_WARNING(log, "Replica {} was removed", replica); LOG_WARNING(log, "Replica {} was removed", replica);
break; break;
@ -496,8 +495,10 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
/// Replica can become inactive, so wait with timeout and recheck it /// Replica can become inactive, so wait with timeout and recheck it
if (wait_event->tryWait(1000)) if (wait_event->tryWait(1000))
break; continue;
/// Here we check mutation for errors or kill on local replica. If they happen on this replica
/// they will happen on each replica, so we can check only in-memory info.
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id); auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id);
if (!mutation_status || !mutation_status->latest_fail_reason.empty()) if (!mutation_status || !mutation_status->latest_fail_reason.empty())
break; break;
@ -514,6 +515,8 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
std::set<String> mutation_ids; std::set<String> mutation_ids;
mutation_ids.insert(mutation_id); mutation_ids.insert(mutation_id);
/// Here we check mutation for errors or kill on local replica. If they happen on this replica
/// they will happen on each replica, so we can check only in-memory info.
auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id, &mutation_ids); auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id, &mutation_ids);
checkMutationStatus(mutation_status, mutation_ids); checkMutationStatus(mutation_status, mutation_ids);