fix alter and optimize hangs when replica becomes inactive

This commit is contained in:
Alexander Tokmakov 2020-05-12 17:11:09 +03:00
parent ff0ae624ef
commit 032197b015
5 changed files with 58 additions and 47 deletions

View File

@ -629,19 +629,17 @@ namespace
{ {
struct WaitForDisappearState struct WaitForDisappearState
{ {
int32_t code = 0; std::atomic_int32_t code = 0;
int32_t event_type = 0; std::atomic_int32_t event_type = 0;
Poco::Event event; Poco::Event event;
}; };
using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>; using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>;
} }
void ZooKeeper::waitForDisappear(const std::string & path) bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & condition)
{ {
WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>(); WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
while (true)
{
auto callback = [state](const Coordination::ExistsResponse & response) auto callback = [state](const Coordination::ExistsResponse & response)
{ {
state->code = response.error; state->code = response.error;
@ -660,20 +658,26 @@ void ZooKeeper::waitForDisappear(const std::string & path)
} }
}; };
while (!condition || !condition())
{
/// NOTE: if the node doesn't exist, the watch will leak. /// NOTE: if the node doesn't exist, the watch will leak.
impl->exists(path, callback, watch); impl->exists(path, callback, watch);
if (!condition)
state->event.wait(); state->event.wait();
else if (!state->event.tryWait(1000))
continue;
if (state->code == Coordination::ZNONODE) if (state->code == Coordination::ZNONODE)
return; return true;
if (state->code) if (state->code)
throw KeeperException(state->code, path); throw KeeperException(state->code, path);
if (state->event_type == Coordination::DELETED) if (state->event_type == Coordination::DELETED)
return; return true;
} }
return false;
} }
ZooKeeperPtr ZooKeeper::startNewSession() const ZooKeeperPtr ZooKeeper::startNewSession() const

View File

@ -185,8 +185,9 @@ public:
/// Remove all children nodes (non recursive). /// Remove all children nodes (non recursive).
void removeChildren(const std::string & path); void removeChildren(const std::string & path);
using WaitCondition = std::function<bool()>;
/// Wait for the node to disappear or return immediately if it doesn't exist. /// Wait for the node to disappear or return immediately if it doesn't exist.
void waitForDisappear(const std::string & path); bool waitForDisappear(const std::string & path, const WaitCondition & condition = {});
/// Async interface (a small subset of operations is implemented). /// Async interface (a small subset of operations is implemented).
/// ///

View File

@ -361,8 +361,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation
break; /// (numbers like 0000000000 and 0000000001) break; /// (numbers like 0000000000 and 0000000001)
/// We wait without timeout. /// Replica can become inactive, so wait with timeout and recheck it
wait_event->wait(); if (wait_event->tryWait(1000))
break;
} }
if (partial_shutdown_called) if (partial_shutdown_called)
@ -3837,7 +3838,8 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
{ {
if (wait_for_non_active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) if (wait_for_non_active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{ {
waitForReplicaToProcessLogEntry(replica, entry); if (!waitForReplicaToProcessLogEntry(replica, entry, wait_for_non_active))
unwaited.push_back(replica);
} }
else else
{ {
@ -3850,7 +3852,7 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
} }
void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry) bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{ {
String entry_str = entry.toString(); String entry_str = entry.toString();
String log_node_name; String log_node_name;
@ -3871,6 +3873,12 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
* To do this, check its node `log_pointer` - the maximum number of the element taken from `log` + 1. * To do this, check its node `log_pointer` - the maximum number of the element taken from `log` + 1.
*/ */
const auto & check_replica_become_inactive = [this, &replica]()
{
return !getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active");
};
constexpr auto event_wait_timeout_ms = 1000;
if (startsWith(entry.znode_name, "log-")) if (startsWith(entry.znode_name, "log-"))
{ {
/** In this case, just take the number from the node name `log-xxxxxxxxxx`. /** In this case, just take the number from the node name `log-xxxxxxxxxx`.
@ -3882,7 +3890,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue"); LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
/// Let's wait until entry gets into the replica queue. /// Let's wait until entry gets into the replica queue.
while (true) while (wait_for_non_active || !check_replica_become_inactive())
{ {
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
@ -3890,7 +3898,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index) if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break; break;
if (wait_for_non_active)
event->wait(); event->wait();
else
event->tryWait(event_wait_timeout_ms);
} }
} }
else if (startsWith(entry.znode_name, "queue-")) else if (startsWith(entry.znode_name, "queue-"))
@ -3927,7 +3938,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue"); LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
/// Let's wait until the entry gets into the replica queue. /// Let's wait until the entry gets into the replica queue.
while (true) while (wait_for_non_active || !check_replica_become_inactive())
{ {
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
@ -3935,7 +3946,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index) if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index)
break; break;
if (wait_for_non_active)
event->wait(); event->wait();
else
event->tryWait(event_wait_timeout_ms);
} }
} }
} }
@ -3970,13 +3984,17 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
if (queue_entry_to_wait_for.empty()) if (queue_entry_to_wait_for.empty())
{ {
LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes."); LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes.");
return; return true;
} }
LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue"); LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
/// Third - wait until the entry disappears from the replica queue. /// Third - wait until the entry disappears from the replica queue or replica become inactive.
getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for); String path_to_wait_on = zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for;
if (wait_for_non_active)
return getZooKeeper()->waitForDisappear(path_to_wait_on);
return getZooKeeper()->waitForDisappear(path_to_wait_on, check_replica_become_inactive);
} }

View File

@ -486,7 +486,7 @@ private:
/** Wait until the specified replica executes the specified action from the log. /** Wait until the specified replica executes the specified action from the log.
* NOTE: See comment about locks above. * NOTE: See comment about locks above.
*/ */
void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry); bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
/// Choose leader replica, send requst to it and wait. /// Choose leader replica, send requst to it and wait.
void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context); void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);

View File

@ -1,12 +0,0 @@
1725
1725
1725
1725
1725
Starting alters
Finishing alters
1
1
1
1
1