mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #10849 from ClickHouse/fix_optimize_and_alter_hangs
Fix mutations and OPTIMIZE hangs when replica becomes inactive
This commit is contained in:
commit
623b2e5a43
@ -629,51 +629,54 @@ namespace
|
||||
{
|
||||
struct WaitForDisappearState
|
||||
{
|
||||
int32_t code = 0;
|
||||
int32_t event_type = 0;
|
||||
std::atomic_int32_t code = 0;
|
||||
std::atomic_int32_t event_type = 0;
|
||||
Poco::Event event;
|
||||
};
|
||||
using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>;
|
||||
}
|
||||
|
||||
void ZooKeeper::waitForDisappear(const std::string & path)
|
||||
bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & condition)
|
||||
{
|
||||
WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
|
||||
|
||||
while (true)
|
||||
auto callback = [state](const Coordination::ExistsResponse & response)
|
||||
{
|
||||
auto callback = [state](const Coordination::ExistsResponse & response)
|
||||
state->code = response.error;
|
||||
if (state->code)
|
||||
state->event.set();
|
||||
};
|
||||
|
||||
auto watch = [state](const Coordination::WatchResponse & response)
|
||||
{
|
||||
if (!state->code)
|
||||
{
|
||||
state->code = response.error;
|
||||
if (state->code)
|
||||
state->event.set();
|
||||
};
|
||||
|
||||
auto watch = [state](const Coordination::WatchResponse & response)
|
||||
{
|
||||
if (!state->code)
|
||||
{
|
||||
state->code = response.error;
|
||||
if (!state->code)
|
||||
state->event_type = response.type;
|
||||
state->event.set();
|
||||
}
|
||||
};
|
||||
state->event_type = response.type;
|
||||
state->event.set();
|
||||
}
|
||||
};
|
||||
|
||||
while (!condition || !condition())
|
||||
{
|
||||
/// NOTE: if the node doesn't exist, the watch will leak.
|
||||
|
||||
impl->exists(path, callback, watch);
|
||||
state->event.wait();
|
||||
if (!condition)
|
||||
state->event.wait();
|
||||
else if (!state->event.tryWait(1000))
|
||||
continue;
|
||||
|
||||
if (state->code == Coordination::ZNONODE)
|
||||
return;
|
||||
return true;
|
||||
|
||||
if (state->code)
|
||||
throw KeeperException(state->code, path);
|
||||
|
||||
if (state->event_type == Coordination::DELETED)
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
ZooKeeperPtr ZooKeeper::startNewSession() const
|
||||
|
@ -185,8 +185,11 @@ public:
|
||||
/// Remove all children nodes (non recursive).
|
||||
void removeChildren(const std::string & path);
|
||||
|
||||
using WaitCondition = std::function<bool()>;
|
||||
/// Wait for the node to disappear or return immediately if it doesn't exist.
|
||||
void waitForDisappear(const std::string & path);
|
||||
/// If condition is speficied, it is used to return early (when condition returns false)
|
||||
/// The function returns true if waited and false if waiting was interrupted by condition.
|
||||
bool waitForDisappear(const std::string & path, const WaitCondition & condition = {});
|
||||
|
||||
/// Async interface (a small subset of operations is implemented).
|
||||
///
|
||||
|
@ -361,8 +361,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
|
||||
else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation
|
||||
break; /// (numbers like 0000000000 and 0000000001)
|
||||
|
||||
/// We wait without timeout.
|
||||
wait_event->wait();
|
||||
/// Replica can become inactive, so wait with timeout and recheck it
|
||||
if (wait_event->tryWait(1000))
|
||||
break;
|
||||
}
|
||||
|
||||
if (partial_shutdown_called)
|
||||
@ -3841,7 +3842,8 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
|
||||
{
|
||||
if (wait_for_non_active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
|
||||
{
|
||||
waitForReplicaToProcessLogEntry(replica, entry);
|
||||
if (!waitForReplicaToProcessLogEntry(replica, entry, wait_for_non_active))
|
||||
unwaited.push_back(replica);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -3854,7 +3856,7 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
|
||||
}
|
||||
|
||||
|
||||
void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
|
||||
bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
|
||||
{
|
||||
String entry_str = entry.toString();
|
||||
String log_node_name;
|
||||
@ -3875,6 +3877,12 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
||||
* To do this, check its node `log_pointer` - the maximum number of the element taken from `log` + 1.
|
||||
*/
|
||||
|
||||
const auto & check_replica_become_inactive = [this, &replica]()
|
||||
{
|
||||
return !getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active");
|
||||
};
|
||||
constexpr auto event_wait_timeout_ms = 1000;
|
||||
|
||||
if (startsWith(entry.znode_name, "log-"))
|
||||
{
|
||||
/** In this case, just take the number from the node name `log-xxxxxxxxxx`.
|
||||
@ -3886,7 +3894,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
||||
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
|
||||
|
||||
/// Let's wait until entry gets into the replica queue.
|
||||
while (true)
|
||||
while (wait_for_non_active || !check_replica_become_inactive())
|
||||
{
|
||||
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
||||
|
||||
@ -3894,7 +3902,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
||||
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
|
||||
break;
|
||||
|
||||
event->wait();
|
||||
if (wait_for_non_active)
|
||||
event->wait();
|
||||
else
|
||||
event->tryWait(event_wait_timeout_ms);
|
||||
}
|
||||
}
|
||||
else if (startsWith(entry.znode_name, "queue-"))
|
||||
@ -3931,7 +3942,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
||||
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
|
||||
|
||||
/// Let's wait until the entry gets into the replica queue.
|
||||
while (true)
|
||||
while (wait_for_non_active || !check_replica_become_inactive())
|
||||
{
|
||||
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
||||
|
||||
@ -3939,7 +3950,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
||||
if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index)
|
||||
break;
|
||||
|
||||
event->wait();
|
||||
if (wait_for_non_active)
|
||||
event->wait();
|
||||
else
|
||||
event->tryWait(event_wait_timeout_ms);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3974,13 +3988,17 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
|
||||
if (queue_entry_to_wait_for.empty())
|
||||
{
|
||||
LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes.");
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
|
||||
|
||||
/// Third - wait until the entry disappears from the replica queue.
|
||||
getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
|
||||
/// Third - wait until the entry disappears from the replica queue or replica become inactive.
|
||||
String path_to_wait_on = zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for;
|
||||
if (wait_for_non_active)
|
||||
return getZooKeeper()->waitForDisappear(path_to_wait_on);
|
||||
|
||||
return getZooKeeper()->waitForDisappear(path_to_wait_on, check_replica_become_inactive);
|
||||
}
|
||||
|
||||
|
||||
|
@ -486,7 +486,7 @@ private:
|
||||
/** Wait until the specified replica executes the specified action from the log.
|
||||
* NOTE: See comment about locks above.
|
||||
*/
|
||||
void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry);
|
||||
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
|
||||
|
||||
/// Choose leader replica, send requst to it and wait.
|
||||
void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);
|
||||
|
@ -1,12 +0,0 @@
|
||||
1725
|
||||
1725
|
||||
1725
|
||||
1725
|
||||
1725
|
||||
Starting alters
|
||||
Finishing alters
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
Loading…
Reference in New Issue
Block a user