diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index 72ff7084bc6..2450be70a40 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -20,7 +20,6 @@ ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicated , log(&Logger::get(log_name)) { task = storage_.context.getSchedulePool().createTask(log_name, [this]{ run(); }); - task->schedule(); } void ReplicatedMergeTreeAlterThread::run() diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h index 257ef0a7659..c713f42ae29 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h @@ -23,6 +23,14 @@ class ReplicatedMergeTreeAlterThread public: ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_); + void start() + { + task->activate(); + task->schedule(); + } + + void stop() { task->deactivate(); } + private: void run(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index df8de692488..0d1d7d9a51f 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -21,7 +21,6 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic , log(&Logger::get(log_name)) { task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); }); - task->schedule(); } void ReplicatedMergeTreeCleanupThread::run() diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index 7d45a158c4c..cdf87357e3b 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -24,7 +24,15 @@ class ReplicatedMergeTreeCleanupThread public: ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_); - void schedule() { task->schedule(); } + void start() + { + task->activate(); + task->schedule(); + } + + void wakeup() { task->schedule(); } + + void stop() { task->deactivate(); } private: StorageReplicatedMergeTree & storage; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 88895a473ae..c0d2b902e29 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -57,7 +57,29 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread() { - completeShutdown(); + try + { + /// Stop restarting_thread before stopping other tasks - so that it won't restart them again. + need_stop = true; + task->deactivate(); + LOG_TRACE(log, "Restarting thread finished"); + + /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. + storage.fetcher.blocker.cancelForever(); + storage.merger_mutator.actions_blocker.cancelForever(); + + /// Stop other tasks. + + partialShutdown(); + + if (storage.queue_task_handle) + storage.context.getBackgroundPool().removeTask(storage.queue_task_handle); + storage.queue_task_handle.reset(); + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } } void ReplicatedMergeTreeRestartingThread::run() @@ -167,29 +189,6 @@ void ReplicatedMergeTreeRestartingThread::run() task->scheduleAfter(check_period_ms); } -void ReplicatedMergeTreeRestartingThread::completeShutdown() -{ - try - { - storage.data_parts_exchange_endpoint_holder->getBlocker().cancelForever(); - storage.data_parts_exchange_endpoint_holder = nullptr; - - /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. - storage.fetcher.blocker.cancelForever(); - storage.merger_mutator.actions_blocker.cancelForever(); - - partialShutdown(); - - if (storage.queue_task_handle) - storage.context.getBackgroundPool().removeTask(storage.queue_task_handle); - storage.queue_task_handle.reset(); - } - catch (...) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - } -} - bool ReplicatedMergeTreeRestartingThread::tryStartup() { @@ -212,9 +211,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.queue_updating_task->schedule(); storage.mutations_updating_task->activate(); storage.mutations_updating_task->schedule(); + storage.cleanup_thread.start(); + storage.alter_thread.start(); storage.part_check_thread.start(); - storage.alter_thread = std::make_unique(storage); - storage.cleanup_thread = std::make_unique(storage); if (!storage.queue_task_handle) storage.queue_task_handle = storage.context.getBackgroundPool().addTask( @@ -362,8 +361,8 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.queue_updating_task->deactivate(); storage.mutations_updating_task->deactivate(); - storage.cleanup_thread.reset(); - storage.alter_thread.reset(); + storage.cleanup_thread.stop(); + storage.alter_thread.stop(); storage.part_check_thread.stop(); LOG_TRACE(log, "Threads finished"); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h index 7747f008ddf..997dfd77c6f 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h @@ -36,12 +36,6 @@ public: return wakeup_event; } - void stop() - { - need_stop = true; - wakeup_event.set(); - } - private: StorageReplicatedMergeTree & storage; String log_name; @@ -59,7 +53,6 @@ private: bool startup_completed = false; void run(); - void completeShutdown(); /// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper. bool tryStartup(); /// Returns false if ZooKeeper is not available. diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 09fd8bbba8a..b78176c8c9d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -215,7 +215,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( [this] (const std::string & name) { enqueuePartForCheck(name); }), reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()), queue(*this), fetcher(data), - shutdown_event(false), part_check_thread(*this), + shutdown_event(false), + cleanup_thread(*this), alter_thread(*this), part_check_thread(*this), log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)")) { if (path_.empty()) @@ -1653,7 +1654,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry) /// We want to remove dropped parts from disk as soon as possible /// To be removed a partition should have zero refcount, therefore call the cleanup thread at exit parts_to_remove.clear(); - cleanup_thread->schedule(); + cleanup_thread.wakeup(); } @@ -2034,7 +2035,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) tryRemovePartsFromZooKeeperWithRetries(parts_to_remove); res_parts.clear(); parts_to_remove.clear(); - cleanup_thread->schedule(); + cleanup_thread.wakeup(); return true; } @@ -2668,7 +2669,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin { LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch"); /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. - cleanup_thread->schedule(); + cleanup_thread.wakeup(); return false; } @@ -2791,11 +2792,7 @@ void StorageReplicatedMergeTree::startup() void StorageReplicatedMergeTree::shutdown() { - if (restarting_thread) - { - restarting_thread->stop(); - restarting_thread.reset(); - } + restarting_thread.reset(); if (data_parts_exchange_endpoint_holder) { @@ -4541,7 +4538,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ /// Speedup removing of replaced parts from filesystem parts_to_remove.clear(); - cleanup_thread->schedule(); + cleanup_thread.wakeup(); /// If necessary, wait until the operation is performed on all replicas. if (context.getSettingsRef().replication_alter_partitions_sync > 1) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index e512977d4b0..c3f6a5c4925 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -283,25 +283,24 @@ private: /// A task that selects parts to merge. BackgroundSchedulePool::TaskHolder merge_selecting_task; + /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. + std::mutex merge_selecting_mutex; /// A task that marks finished mutations as done. BackgroundSchedulePool::TaskHolder mutations_finalizing_task; - /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. - std::mutex merge_selecting_mutex; - /// A thread that removes old parts, log entries, and blocks. - std::unique_ptr cleanup_thread; - - /// A thread that processes reconnection to ZooKeeper when the session expires. - std::unique_ptr restarting_thread; + ReplicatedMergeTreeCleanupThread cleanup_thread; /// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes. - std::unique_ptr alter_thread; + ReplicatedMergeTreeAlterThread alter_thread; /// A thread that checks the data of the parts, as well as the queue of the parts to be checked. ReplicatedMergeTreePartCheckThread part_check_thread; + /// A thread that processes reconnection to ZooKeeper when the session expires. + std::unique_ptr restarting_thread; + /// An event that awakens `alter` method from waiting for the completion of the ALTER query. zkutil::EventPtr alter_query_event = std::make_shared();