Updated callback to return log entry ids - Updated checking of SYSTEM SYNC REPLICA

This commit is contained in:
Smita Kulkarni 2023-02-07 10:02:38 +01:00
parent f34ef86332
commit ef29b61786
3 changed files with 20 additions and 12 deletions

View File

@ -545,7 +545,9 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
entry->znode_name, entry->toString());
notifySubscribers(queue_size, entry->log_entry_id);
/// Here we send empty log_entry_ids because the log_entries to wait for are already set
/// when a new subscriber is added
notifySubscribers(queue_size, {}, entry->log_entry_id);
if (!need_remove_from_zk)
return;
@ -2481,9 +2483,8 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
auto it = subscribers.emplace(subscribers.end(), std::move(callback));
/// Notify if queue is empty
if (queue.empty())
(*it)(0, std::nullopt);
/// Notify queue size & log entry ids to avoid waiting for removed entries
(*it)(queue.size(), getLogEntryIds(), std::nullopt);
return SubscriberHandler(it, *this);
}
@ -2494,16 +2495,16 @@ ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler()
queue.subscribers.erase(it);
}
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id)
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::unordered_set<String> log_entry_ids, std::optional<String> removed_log_entry_id)
{
std::lock_guard lock_subscribers(subscribers_mutex);
for (auto & subscriber_callback : subscribers)
subscriber_callback(new_queue_size, removed_log_entry_id);
subscriber_callback(new_queue_size, log_entry_ids, removed_log_entry_id);
}
ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue()
{
notifySubscribers(0, std::nullopt);
notifySubscribers(0, {}, std::nullopt);
}
String padIndex(Int64 index)

View File

@ -163,7 +163,7 @@ private:
/// A subscriber callback is called when an entry queue is deleted
mutable std::mutex subscribers_mutex;
using SubscriberCallBack = std::function<void(size_t /* queue_size */, std::optional<String> /* removed_log_entry_id */)>;
using SubscriberCallBack = std::function<void(size_t /* queue_size */, std::unordered_set<String> /*wait_for_ids*/, std::optional<String> /* removed_log_entry_id */)>;
using Subscribers = std::list<SubscriberCallBack>;
using SubscriberIterator = Subscribers::iterator;
@ -180,8 +180,8 @@ private:
Subscribers subscribers;
/// Notify subscribers about queue change (new queue size and entry that was removed)
void notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id);
/// Notify subscribers about queue change (new queue size , log entry ids in queue and entry that was removed)
void notifySubscribers(size_t new_queue_size, std::unordered_set<String> log_entry_ids, std::optional<String> removed_log_entry_id);
/// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
bool checkReplaceRangeCanBeRemoved(

View File

@ -7559,13 +7559,20 @@ bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_millisec
/// And we force it to be executed.
background_operations_assignee.trigger();
std::unordered_set<String> wait_for_ids = queue.getLogEntryIds();
std::unordered_set<String> wait_for_ids;
bool set_ids_to_wait = true;
if (!wait_for_ids.empty())
{
Poco::Event target_entry_event;
auto callback = [&target_entry_event, &wait_for_ids](size_t new_queue_size, std::optional<String> removed_log_entry_id)
auto callback = [&target_entry_event, &wait_for_ids, &set_ids_to_wait](size_t new_queue_size, std::unordered_set<String> log_entry_ids, std::optional<String> removed_log_entry_id)
{
if (set_ids_to_wait)
{
wait_for_ids = log_entry_ids;
set_ids_to_wait = false;
}
if (removed_log_entry_id.has_value())
wait_for_ids.erase(removed_log_entry_id.value());