Merge pull request #17800 from nvartolomei/nv/waitForAllReplicasToProcessLogEntry-foreign-shard

Update StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry to support waiting on foreign shards / tables
This commit is contained in:
alesapin 2020-12-05 16:15:46 +03:00 committed by GitHub
commit 27c3301083
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 16 deletions

View File

@ -4572,19 +4572,19 @@ StorageReplicatedMergeTree::allocateBlockNumber(
} }
Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( Strings StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry(
const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{ {
LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name); LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name);
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); Strings replicas = zookeeper->getChildren(table_zookeeper_path + "/replicas");
Strings unwaited; Strings unwaited;
for (const String & replica : replicas) for (const String & replica : replicas)
{ {
if (wait_for_non_active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) if (wait_for_non_active || zookeeper->exists(table_zookeeper_path + "/replicas/" + replica + "/is_active"))
{ {
if (!waitForReplicaToProcessLogEntry(replica, entry, wait_for_non_active)) if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_non_active))
unwaited.push_back(replica); unwaited.push_back(replica);
} }
else else
@ -4598,8 +4598,15 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(
} }
bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry( Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(
const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{
return waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_non_active);
}
bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{ {
String entry_str = entry.toString(); String entry_str = entry.toString();
String log_node_name; String log_node_name;
@ -4625,7 +4632,7 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
const auto & stop_waiting = [&]() const auto & stop_waiting = [&]()
{ {
bool stop_waiting_itself = waiting_itself && is_dropped; bool stop_waiting_itself = waiting_itself && is_dropped;
bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active"); bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(table_zookeeper_path + "/replicas/" + replica + "/is_active");
return stop_waiting_itself || stop_waiting_non_active; return stop_waiting_itself || stop_waiting_non_active;
}; };
constexpr auto event_wait_timeout_ms = 1000; constexpr auto event_wait_timeout_ms = 1000;
@ -4645,7 +4652,7 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
{ {
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event); String log_pointer = getZooKeeper()->get(table_zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index) if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
break; break;
@ -4661,9 +4668,9 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
* looking for a node with the same content. And if we do not find it - then the replica has already taken this entry in its queue. * looking for a node with the same content. And if we do not find it - then the replica has already taken this entry in its queue.
*/ */
String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer"); String log_pointer = getZooKeeper()->get(table_zookeeper_path + "/replicas/" + replica + "/log_pointer");
Strings log_entries = getZooKeeper()->getChildren(zookeeper_path + "/log"); Strings log_entries = getZooKeeper()->getChildren(table_zookeeper_path + "/log");
UInt64 log_index = 0; UInt64 log_index = 0;
bool found = false; bool found = false;
@ -4675,7 +4682,7 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
continue; continue;
String log_entry_str; String log_entry_str;
bool exists = getZooKeeper()->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str); bool exists = getZooKeeper()->tryGet(table_zookeeper_path + "/log/" + log_entry_name, log_entry_str);
if (exists && entry_str == log_entry_str) if (exists && entry_str == log_entry_str)
{ {
found = true; found = true;
@ -4693,7 +4700,7 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
{ {
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer_new = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event); String log_pointer_new = getZooKeeper()->get(table_zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index) if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index)
break; break;
@ -4717,13 +4724,13 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
* Therefore, we search by comparing the content. * Therefore, we search by comparing the content.
*/ */
Strings queue_entries = getZooKeeper()->getChildren(zookeeper_path + "/replicas/" + replica + "/queue"); Strings queue_entries = getZooKeeper()->getChildren(table_zookeeper_path + "/replicas/" + replica + "/queue");
String queue_entry_to_wait_for; String queue_entry_to_wait_for;
for (const String & entry_name : queue_entries) for (const String & entry_name : queue_entries)
{ {
String queue_entry_str; String queue_entry_str;
bool exists = getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str); bool exists = getZooKeeper()->tryGet(table_zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
if (exists && queue_entry_str == entry_str) if (exists && queue_entry_str == entry_str)
{ {
queue_entry_to_wait_for = entry_name; queue_entry_to_wait_for = entry_name;
@ -4741,12 +4748,19 @@ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
LOG_DEBUG(log, "Waiting for {} to disappear from {} queue", queue_entry_to_wait_for, replica); LOG_DEBUG(log, "Waiting for {} to disappear from {} queue", queue_entry_to_wait_for, replica);
/// Third - wait until the entry disappears from the replica queue or replica become inactive. /// Third - wait until the entry disappears from the replica queue or replica become inactive.
String path_to_wait_on = zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for; String path_to_wait_on = table_zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for;
return getZooKeeper()->waitForDisappear(path_to_wait_on, stop_waiting); return getZooKeeper()->waitForDisappear(path_to_wait_on, stop_waiting);
} }
bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(
const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{
return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_non_active);
}
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{ {
auto zookeeper = tryGetZooKeeper(); auto zookeeper = tryGetZooKeeper();

View File

@ -537,12 +537,16 @@ private:
* NOTE: This method must be called without table lock held. * NOTE: This method must be called without table lock held.
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock. * Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
* TODO: There are wrong usages of this method that are not fixed yet. * TODO: There are wrong usages of this method that are not fixed yet.
*
* One method for convenient use on current table, another for waiting on foregin shards.
*/ */
Strings waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
Strings waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); Strings waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
/** Wait until the specified replica executes the specified action from the log. /** Wait until the specified replica executes the specified action from the log.
* NOTE: See comment about locks above. * NOTE: See comment about locks above.
*/ */
bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
/// Throw an exception if the table is readonly. /// Throw an exception if the table is readonly.