From c830397317fabbea56bc329b3346f3bcdc3f986f Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 5 Apr 2021 18:08:43 +0300 Subject: [PATCH] Fix two stupid bugs in mutation wait --- src/Storages/StorageReplicatedMergeTree.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index f9d63132a1b..1cc7c7299fa 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -455,12 +455,12 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( if (replicas.empty()) return; - zkutil::EventPtr wait_event = std::make_shared(); std::set inactive_replicas; for (const String & replica : replicas) { LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id); + zkutil::EventPtr wait_event = std::make_shared(); while (!partial_shutdown_called) { @@ -484,9 +484,8 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( String mutation_pointer = zookeeper_path + "/replicas/" + replica + "/mutation_pointer"; std::string mutation_pointer_value; - Coordination::Stat get_stat; /// Replica could be removed - if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, &get_stat, wait_event)) + if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, nullptr, wait_event)) { LOG_WARNING(log, "Replica {} was removed", replica); break; @@ -496,8 +495,10 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( /// Replica can become inactive, so wait with timeout and recheck it if (wait_event->tryWait(1000)) - break; + continue; + /// Here we check mutation for errors or kill on local replica. If they happen on this replica + /// they will happen on each replica, so we can check only in-memory info. auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id); if (!mutation_status || !mutation_status->latest_fail_reason.empty()) break; @@ -514,6 +515,8 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( std::set mutation_ids; mutation_ids.insert(mutation_id); + /// Here we check mutation for errors or kill on local replica. If they happen on this replica + /// they will happen on each replica, so we can check only in-memory info. auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id, &mutation_ids); checkMutationStatus(mutation_status, mutation_ids);