From ef29b6178655aa33da8c2607f9357f7e05d77d9d Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Tue, 7 Feb 2023 10:02:38 +0100 Subject: [PATCH] Updated callback to return log entry ids - Updated checking of SYSTEM SYNC REPLICA --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 15 ++++++++------- src/Storages/MergeTree/ReplicatedMergeTreeQueue.h | 6 +++--- src/Storages/StorageReplicatedMergeTree.cpp | 11 +++++++++-- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index c1d949127b2..5ec95b61faa 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -545,7 +545,9 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}", entry->znode_name, entry->toString()); - notifySubscribers(queue_size, entry->log_entry_id); + /// Here we send empty log_entry_ids because the log_entries to wait for are already set + /// when a new subscriber is added + notifySubscribers(queue_size, {}, entry->log_entry_id); if (!need_remove_from_zk) return; @@ -2481,9 +2483,8 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall auto it = subscribers.emplace(subscribers.end(), std::move(callback)); - /// Notify if queue is empty - if (queue.empty()) - (*it)(0, std::nullopt); + /// Notify queue size & log entry ids to avoid waiting for removed entries + (*it)(queue.size(), getLogEntryIds(), std::nullopt); return SubscriberHandler(it, *this); } @@ -2494,16 +2495,16 @@ ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler() queue.subscribers.erase(it); } -void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::optional removed_log_entry_id) +void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::unordered_set log_entry_ids, std::optional removed_log_entry_id) { std::lock_guard lock_subscribers(subscribers_mutex); for (auto & subscriber_callback : subscribers) - subscriber_callback(new_queue_size, removed_log_entry_id); + subscriber_callback(new_queue_size, log_entry_ids, removed_log_entry_id); } ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue() { - notifySubscribers(0, std::nullopt); + notifySubscribers(0, {}, std::nullopt); } String padIndex(Int64 index) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 95a9f595203..c66c98d473f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -163,7 +163,7 @@ private: /// A subscriber callback is called when an entry queue is deleted mutable std::mutex subscribers_mutex; - using SubscriberCallBack = std::function /* removed_log_entry_id */)>; + using SubscriberCallBack = std::function /*wait_for_ids*/, std::optional /* removed_log_entry_id */)>; using Subscribers = std::list; using SubscriberIterator = Subscribers::iterator; @@ -180,8 +180,8 @@ private: Subscribers subscribers; - /// Notify subscribers about queue change (new queue size and entry that was removed) - void notifySubscribers(size_t new_queue_size, std::optional removed_log_entry_id); + /// Notify subscribers about queue change (new queue size , log entry ids in queue and entry that was removed) + void notifySubscribers(size_t new_queue_size, std::unordered_set log_entry_ids, std::optional removed_log_entry_id); /// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it bool checkReplaceRangeCanBeRemoved( diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ad444c06686..dc0cdc2bf31 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7559,13 +7559,20 @@ bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_millisec /// And we force it to be executed. background_operations_assignee.trigger(); - std::unordered_set wait_for_ids = queue.getLogEntryIds(); + std::unordered_set wait_for_ids; + bool set_ids_to_wait = true; if (!wait_for_ids.empty()) { Poco::Event target_entry_event; - auto callback = [&target_entry_event, &wait_for_ids](size_t new_queue_size, std::optional removed_log_entry_id) + auto callback = [&target_entry_event, &wait_for_ids, &set_ids_to_wait](size_t new_queue_size, std::unordered_set log_entry_ids, std::optional removed_log_entry_id) { + if (set_ids_to_wait) + { + wait_for_ids = log_entry_ids; + set_ids_to_wait = false; + } + if (removed_log_entry_id.has_value()) wait_for_ids.erase(removed_log_entry_id.value());