From 54e2be4e327d1bbbc68d250a1cf2ae9065d2bc50 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov <440544+davenger@users.noreply.github.com> Date: Tue, 15 Aug 2023 22:30:50 +0200 Subject: [PATCH] Use WatchCallbackPtr --- src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp | 2 +- src/Storages/MergeTree/ReplicatedMergeTreeQueue.h | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 4 +++- src/Storages/StorageReplicatedMergeTree.h | 1 + 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 84307a3ca7a..334d47288d5 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -866,7 +866,7 @@ ActiveDataPartSet getPartNamesToMutate( } -void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) +void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallbackPtr watch_callback) { std::lock_guard lock(update_mutations_mutex); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 611866877d8..d5d85e58cb5 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -335,7 +335,7 @@ public: /// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task. /// If watch_callback is not empty, will call it when new mutations appear in ZK. - void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); + void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallbackPtr watch_callback = {}); /// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr /// if it could not be found. Called during KILL MUTATION query execution. diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 62db6d2d7b7..316723d4013 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -338,6 +338,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( mutations_updating_task->deactivate(); + mutations_watch_callback = std::make_shared(mutations_updating_task->getWatchCallback()); + merge_selecting_task = getContext()->getSchedulePool().createTask( getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); }); @@ -3217,7 +3219,7 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask() { try { - queue.updateMutations(getZooKeeper(), mutations_updating_task->getWatchCallback()); + queue.updateMutations(getZooKeeper(), mutations_watch_callback); } catch (const Coordination::Exception & e) { diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 78ef39f032f..8e9eed678c8 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -497,6 +497,7 @@ private: BackgroundSchedulePool::TaskHolder queue_updating_task; BackgroundSchedulePool::TaskHolder mutations_updating_task; + Coordination::WatchCallbackPtr mutations_watch_callback; /// A task that selects parts to merge. BackgroundSchedulePool::TaskHolder merge_selecting_task;