From ab41384f63a8b6921d3c5d976e68ddc60b769fe5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 13 Sep 2021 11:00:07 +0300 Subject: [PATCH] Move queue initialization to restarting thread --- src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp | 5 +++++ .../MergeTree/ReplicatedMergeTreeRestartingThread.cpp | 2 ++ src/Storages/StorageReplicatedMergeTree.cpp | 2 -- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index b5ab13933d3..f6419ca6ef8 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -59,6 +59,11 @@ void ReplicatedMergeTreeQueue::initialize(zkutil::ZooKeeperPtr zookeeper) { std::lock_guard lock(state_mutex); + /// Clear parts before setting new one + current_parts.clear(); + virtual_parts.clear(); + + /// Get current parts state from zookeeper Strings parts = zookeeper->getChildren(replica_path + "/parts"); for (const auto & part_name : parts) { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index f9ea8c623f7..16597fc4be9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -174,6 +174,8 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() try { + storage.queue.initialize(zookeeper); + storage.queue.load(zookeeper); /// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 2b93e01c394..de5a19835c0 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4285,8 +4285,6 @@ void StorageReplicatedMergeTree::startup() try { - queue.initialize(getZooKeeper()); - InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared(*this); [[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr); assert(prev_ptr == nullptr);