diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 7a5b82979bd..ac952530dae 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -209,6 +209,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.queue_updating_task->activateAndSchedule(); storage.mutations_updating_task->activateAndSchedule(); storage.mutations_finalizing_task->activateAndSchedule(); + storage.merge_selecting_task->activateAndSchedule(); storage.cleanup_thread.start(); storage.part_check_thread.start(); @@ -375,6 +376,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() LOG_TRACE(log, "Waiting for threads to finish"); + storage.merge_selecting_task->deactivate(); storage.queue_updating_task->deactivate(); storage.mutations_updating_task->deactivate(); storage.mutations_finalizing_task->deactivate(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7743736724f..5760e38a0b3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3365,7 +3365,6 @@ void StorageReplicatedMergeTree::startBeingLeader() LOG_INFO(log, "Became leader"); is_leader = true; - merge_selecting_task->activateAndSchedule(); } void StorageReplicatedMergeTree::stopBeingLeader() @@ -3375,7 +3374,6 @@ void StorageReplicatedMergeTree::stopBeingLeader() LOG_INFO(log, "Stopped being leader"); is_leader = false; - merge_selecting_task->deactivate(); } ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(ContextPtr local_context) @@ -4038,15 +4036,14 @@ void StorageReplicatedMergeTree::startup() assert(prev_ptr == nullptr); getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr); + startBeingLeader(); + /// In this thread replica will be activated. restarting_thread.start(); /// Wait while restarting_thread finishing initialization startup_event.wait(); - /// Restarting thread has initialized replication queue, replica can become leader now - startBeingLeader(); - startBackgroundMovesIfNeeded(); part_moves_between_shards_orchestrator.start();