Use WatchCallbackPtr

This commit is contained in:
Alexander Gololobov 2023-08-15 22:30:50 +02:00
parent 80b767316d
commit 54e2be4e32
4 changed files with 6 additions and 3 deletions

View File

@ -866,7 +866,7 @@ ActiveDataPartSet getPartNamesToMutate(
}
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallbackPtr watch_callback)
{
std::lock_guard lock(update_mutations_mutex);

View File

@ -335,7 +335,7 @@ public:
/// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task.
/// If watch_callback is not empty, will call it when new mutations appear in ZK.
void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallbackPtr watch_callback = {});
/// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr
/// if it could not be found. Called during KILL MUTATION query execution.

View File

@ -338,6 +338,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
mutations_updating_task->deactivate();
mutations_watch_callback = std::make_shared<Coordination::WatchCallback>(mutations_updating_task->getWatchCallback());
merge_selecting_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
@ -3217,7 +3219,7 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask()
{
try
{
queue.updateMutations(getZooKeeper(), mutations_updating_task->getWatchCallback());
queue.updateMutations(getZooKeeper(), mutations_watch_callback);
}
catch (const Coordination::Exception & e)
{

View File

@ -497,6 +497,7 @@ private:
BackgroundSchedulePool::TaskHolder queue_updating_task;
BackgroundSchedulePool::TaskHolder mutations_updating_task;
Coordination::WatchCallbackPtr mutations_watch_callback;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task;