From 032197b015d1b306ed535fbdd9ba5ea1ea527d10 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 12 May 2020 17:11:09 +0300 Subject: [PATCH 1/4] fix alter and optimize hangs when replica becomes inactive --- src/Common/ZooKeeper/ZooKeeper.cpp | 48 ++++++++++--------- src/Common/ZooKeeper/ZooKeeper.h | 3 +- src/Storages/StorageReplicatedMergeTree.cpp | 40 +++++++++++----- src/Storages/StorageReplicatedMergeTree.h | 2 +- ...9_parallel_alter_modify_zookeeper.referece | 12 ----- 5 files changed, 58 insertions(+), 47 deletions(-) delete mode 100644 tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 032d1e90ff5..3bd102cb10d 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -629,51 +629,55 @@ namespace { struct WaitForDisappearState { - int32_t code = 0; - int32_t event_type = 0; + std::atomic_int32_t code = 0; + std::atomic_int32_t event_type = 0; Poco::Event event; }; using WaitForDisappearStatePtr = std::shared_ptr; } -void ZooKeeper::waitForDisappear(const std::string & path) +bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & condition) { WaitForDisappearStatePtr state = std::make_shared(); - while (true) + auto callback = [state](const Coordination::ExistsResponse & response) { - auto callback = [state](const Coordination::ExistsResponse & response) + state->code = response.error; + if (state->code) + state->event.set(); + }; + + auto watch = [state](const Coordination::WatchResponse & response) + { + if (!state->code) { state->code = response.error; - if (state->code) - state->event.set(); - }; - - auto watch = [state](const Coordination::WatchResponse & response) - { if (!state->code) - { - state->code = response.error; - if (!state->code) - state->event_type = response.type; - state->event.set(); - } - }; + state->event_type = response.type; + state->event.set(); + } + }; + while (!condition || !condition()) + { /// NOTE: if the node doesn't exist, the watch will leak. - impl->exists(path, callback, watch); - state->event.wait(); + if (!condition) + state->event.wait(); + else if (!state->event.tryWait(1000)) + continue; if (state->code == Coordination::ZNONODE) - return; + return true; if (state->code) throw KeeperException(state->code, path); if (state->event_type == Coordination::DELETED) - return; + return true; } + + return false; } ZooKeeperPtr ZooKeeper::startNewSession() const diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index db166314a07..74209ce2f4f 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -185,8 +185,9 @@ public: /// Remove all children nodes (non recursive). void removeChildren(const std::string & path); + using WaitCondition = std::function; /// Wait for the node to disappear or return immediately if it doesn't exist. - void waitForDisappear(const std::string & path); + bool waitForDisappear(const std::string & path, const WaitCondition & condition = {}); /// Async interface (a small subset of operations is implemented). /// diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 2bd0a111f0d..76f8a560a18 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -361,8 +361,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation break; /// (numbers like 0000000000 and 0000000001) - /// We wait without timeout. - wait_event->wait(); + /// Replica can become inactive, so wait with timeout and recheck it + if (wait_event->tryWait(1000)) + break; } if (partial_shutdown_called) @@ -3837,7 +3838,8 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re { if (wait_for_non_active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) { - waitForReplicaToProcessLogEntry(replica, entry); + if (!waitForReplicaToProcessLogEntry(replica, entry, wait_for_non_active)) + unwaited.push_back(replica); } else { @@ -3850,7 +3852,7 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re } -void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry) +bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) { String entry_str = entry.toString(); String log_node_name; @@ -3871,6 +3873,12 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & * To do this, check its node `log_pointer` - the maximum number of the element taken from `log` + 1. */ + const auto & check_replica_become_inactive = [this, &replica]() + { + return !getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active"); + }; + constexpr auto event_wait_timeout_ms = 1000; + if (startsWith(entry.znode_name, "log-")) { /** In this case, just take the number from the node name `log-xxxxxxxxxx`. @@ -3882,7 +3890,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue"); /// Let's wait until entry gets into the replica queue. - while (true) + while (wait_for_non_active || !check_replica_become_inactive()) { zkutil::EventPtr event = std::make_shared(); @@ -3890,7 +3898,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & if (!log_pointer.empty() && parse(log_pointer) > log_index) break; - event->wait(); + if (wait_for_non_active) + event->wait(); + else + event->tryWait(event_wait_timeout_ms); } } else if (startsWith(entry.znode_name, "queue-")) @@ -3927,7 +3938,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue"); /// Let's wait until the entry gets into the replica queue. - while (true) + while (wait_for_non_active || !check_replica_become_inactive()) { zkutil::EventPtr event = std::make_shared(); @@ -3935,7 +3946,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & if (!log_pointer_new.empty() && parse(log_pointer_new) > log_index) break; - event->wait(); + if (wait_for_non_active) + event->wait(); + else + event->tryWait(event_wait_timeout_ms); } } } @@ -3970,13 +3984,17 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & if (queue_entry_to_wait_for.empty()) { LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes."); - return; + return true; } LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue"); - /// Third - wait until the entry disappears from the replica queue. - getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for); + /// Third - wait until the entry disappears from the replica queue or replica become inactive. + String path_to_wait_on = zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for; + if (wait_for_non_active) + return getZooKeeper()->waitForDisappear(path_to_wait_on); + + return getZooKeeper()->waitForDisappear(path_to_wait_on, check_replica_become_inactive); } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 70fb48e9b35..f01e51bd769 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -486,7 +486,7 @@ private: /** Wait until the specified replica executes the specified action from the log. * NOTE: See comment about locks above. */ - void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry); + bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); /// Choose leader replica, send requst to it and wait. void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context); diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece deleted file mode 100644 index be716831212..00000000000 --- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece +++ /dev/null @@ -1,12 +0,0 @@ -1725 -1725 -1725 -1725 -1725 -Starting alters -Finishing alters -1 -1 -1 -1 -1 From cd8659a6d4d42476bd11dcee106ce8261ea47e86 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Wed, 13 May 2020 13:51:02 +0300 Subject: [PATCH 2/4] trigger CI From 39518fe725da8a571fc48b4058f327c688308241 Mon Sep 17 00:00:00 2001 From: tavplubix Date: Wed, 13 May 2020 16:45:39 +0300 Subject: [PATCH 3/4] trigger CI --- src/Common/ZooKeeper/ZooKeeper.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 3bd102cb10d..2e94cfe9992 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -676,7 +676,6 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & if (state->event_type == Coordination::DELETED) return true; } - return false; } From 63c6eb1a37b4f33f0759fe48f67bee9a84aa3e86 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 13 May 2020 23:40:54 +0300 Subject: [PATCH 4/4] Update ZooKeeper.h --- src/Common/ZooKeeper/ZooKeeper.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 74209ce2f4f..e8ab06c2182 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -187,6 +187,8 @@ public: using WaitCondition = std::function; /// Wait for the node to disappear or return immediately if it doesn't exist. + /// If condition is speficied, it is used to return early (when condition returns false) + /// The function returns true if waited and false if waiting was interrupted by condition. bool waitForDisappear(const std::string & path, const WaitCondition & condition = {}); /// Async interface (a small subset of operations is implemented).