From d89221c4587a76d7d7dfb105e1b8040efb0537fd Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 31 May 2018 16:05:05 +0300 Subject: [PATCH] easier init and deactivation of BackgroundSchedulePool tasks --- dbms/src/Common/BackgroundSchedulePool.cpp | 59 ++++++----------- dbms/src/Common/BackgroundSchedulePool.h | 66 ++++++++++++------- dbms/src/Common/ZooKeeper/LeaderElection.h | 25 +++---- .../ReplicatedMergeTreeAlterThread.cpp | 22 +++---- .../ReplicatedMergeTreeAlterThread.h | 4 +- .../ReplicatedMergeTreeBlockOutputStream.cpp | 2 +- .../ReplicatedMergeTreeCleanupThread.cpp | 16 ++--- .../ReplicatedMergeTreeCleanupThread.h | 7 +- .../ReplicatedMergeTreePartCheckThread.cpp | 24 +++---- .../ReplicatedMergeTreePartCheckThread.h | 3 +- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 16 ++--- .../MergeTree/ReplicatedMergeTreeQueue.h | 10 +-- .../ReplicatedMergeTreeRestartingThread.cpp | 36 +++++----- .../ReplicatedMergeTreeRestartingThread.h | 5 +- .../Storages/StorageReplicatedMergeTree.cpp | 52 ++++++++------- .../src/Storages/StorageReplicatedMergeTree.h | 13 ++-- 16 files changed, 175 insertions(+), 185 deletions(-) diff --git a/dbms/src/Common/BackgroundSchedulePool.cpp b/dbms/src/Common/BackgroundSchedulePool.cpp index 6e9f830ba22..ecf6c1a3993 100644 --- a/dbms/src/Common/BackgroundSchedulePool.cpp +++ b/dbms/src/Common/BackgroundSchedulePool.cpp @@ -17,29 +17,22 @@ namespace DB { -// TaskNotification - class TaskNotification final : public Poco::Notification { public: - explicit TaskNotification(const BackgroundSchedulePool::TaskHandle & task) : task(task) {} + explicit TaskNotification(const BackgroundSchedulePool::TaskInfoPtr & task) : task(task) {} void execute() { task->execute(); } private: - BackgroundSchedulePool::TaskHandle task; + BackgroundSchedulePool::TaskInfoPtr task; }; -// BackgroundSchedulePool::TaskInfo - -BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function): - name(name), - pool(pool), - function(function) +BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_) + : pool(pool_) , log_name(log_name_) , function(function_) { } - bool BackgroundSchedulePool::TaskInfo::schedule() { std::lock_guard lock(schedule_mutex); @@ -49,7 +42,7 @@ bool BackgroundSchedulePool::TaskInfo::schedule() scheduled = true; - if(!executing) + if (!executing) { if (delayed) pool.cancelDelayedTask(shared_from_this(), lock); @@ -60,7 +53,6 @@ bool BackgroundSchedulePool::TaskInfo::schedule() return true; } - bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms) { std::lock_guard lock(schedule_mutex); @@ -72,7 +64,6 @@ bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms) return true; } - void BackgroundSchedulePool::TaskInfo::deactivate() { std::lock_guard lock_exec(exec_mutex); @@ -88,16 +79,17 @@ void BackgroundSchedulePool::TaskInfo::deactivate() pool.cancelDelayedTask(shared_from_this(), lock_schedule); } - void BackgroundSchedulePool::TaskInfo::activate() { std::lock_guard lock(schedule_mutex); deactivated = false; } - void BackgroundSchedulePool::TaskInfo::execute() { + Stopwatch watch; + CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask}; + std::lock_guard lock_exec(exec_mutex); { @@ -110,17 +102,14 @@ void BackgroundSchedulePool::TaskInfo::execute() executing = true; } - CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask}; - - Stopwatch watch; function(); UInt64 milliseconds = watch.elapsedMilliseconds(); /// If the task is executed longer than specified time, it will be logged. - static const int32_t slow_execution_threshold_ms = 50; + static const int32_t slow_execution_threshold_ms = 200; if (milliseconds >= slow_execution_threshold_ms) - LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms."); + LOG_TRACE(&Logger::get(log_name), "Execution took " << milliseconds << " ms."); { std::lock_guard lock_schedule(schedule_mutex); @@ -132,7 +121,7 @@ void BackgroundSchedulePool::TaskInfo::execute() /// will have their chance to execute if(scheduled) - pool.queue.enqueueNotification(new TaskNotification(shared_from_this())); + pool.queue.enqueueNotification(new TaskNotification(shared_from_this())); } } @@ -145,8 +134,6 @@ zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback() } -// BackgroundSchedulePool - BackgroundSchedulePool::BackgroundSchedulePool(size_t size) : size(size) { @@ -165,7 +152,7 @@ BackgroundSchedulePool::~BackgroundSchedulePool() try { { - std::unique_lock lock(delayed_tasks_lock); + std::unique_lock lock(delayed_tasks_mutex); shutdown = true; wakeup_cond.notify_all(); } @@ -184,24 +171,18 @@ BackgroundSchedulePool::~BackgroundSchedulePool() } -BackgroundSchedulePool::TaskHandle BackgroundSchedulePool::addTask(const std::string & name, const Task & task) +BackgroundSchedulePool::TaskHolder BackgroundSchedulePool::createTask(const std::string & name, const TaskFunc & function) { - return std::make_shared(*this, name, task); + return TaskHolder(std::make_shared(*this, name, function)); } -void BackgroundSchedulePool::removeTask(const TaskHandle & task) -{ - task->deactivate(); -} - - -void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard & /* schedule_mutex_lock */) +void BackgroundSchedulePool::scheduleDelayedTask(const TaskInfoPtr & task, size_t ms, std::lock_guard & /* task_schedule_mutex_lock */) { Poco::Timestamp current_time; { - std::lock_guard lock(delayed_tasks_lock); + std::lock_guard lock(delayed_tasks_mutex); if (task->delayed) delayed_tasks.erase(task->iterator); @@ -214,10 +195,10 @@ void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t } -void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard & /* schedule_mutex_lock */) +void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lock_guard & /* task_schedule_mutex_lock */) { { - std::lock_guard lock(delayed_tasks_lock); + std::lock_guard lock(delayed_tasks_mutex); delayed_tasks.erase(task->iterator); task->delayed = false; } @@ -253,11 +234,11 @@ void BackgroundSchedulePool::delayExecutionThreadFunction() while (!shutdown) { - TaskHandle task; + TaskInfoPtr task; bool found = false; { - std::unique_lock lock(delayed_tasks_lock); + std::unique_lock lock(delayed_tasks_mutex); while(!shutdown) { diff --git a/dbms/src/Common/BackgroundSchedulePool.h b/dbms/src/Common/BackgroundSchedulePool.h index 64da78f9189..2fda381b111 100644 --- a/dbms/src/Common/BackgroundSchedulePool.h +++ b/dbms/src/Common/BackgroundSchedulePool.h @@ -33,16 +33,14 @@ class BackgroundSchedulePool { public: class TaskInfo; - using TaskHandle = std::shared_ptr; - using Tasks = std::multimap; - using Task = std::function; + using TaskInfoPtr = std::shared_ptr; + using TaskFunc = std::function; + using DelayedTasks = std::multimap; class TaskInfo : public std::enable_shared_from_this, private boost::noncopyable { public: - TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function); - - /// All these methods waits for current execution of task. + TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_); /// Schedule for execution as soon as possible (if not already scheduled). /// If the task was already scheduled with delay, the delay will be ignored. @@ -53,10 +51,10 @@ public: /// Further attempts to schedule become no-op. void deactivate(); + void activate(); - /// get zkutil::WatchCallback needed for zookeeper callbacks. - + /// get zkutil::WatchCallback needed for notifications from ZooKeeper watches. zkutil::WatchCallback getWatchCallback(); private: @@ -65,28 +63,52 @@ public: void execute(); - std::mutex schedule_mutex; - std::mutex exec_mutex; + BackgroundSchedulePool & pool; + std::string log_name; + TaskFunc function; + + std::mutex exec_mutex; + std::mutex schedule_mutex; - std::string name; bool deactivated = false; bool scheduled = false; bool delayed = false; bool executing = false; - BackgroundSchedulePool & pool; - Task function; /// If the task is scheduled with delay, points to element of delayed_tasks. - Tasks::iterator iterator; + DelayedTasks::iterator iterator; }; + class TaskHolder + { + public: + TaskHolder() = default; + explicit TaskHolder(const TaskInfoPtr & task_info_) : task_info(task_info_) {} + TaskHolder(const TaskHolder & other) = delete; + TaskHolder(TaskHolder && other) noexcept = default; + TaskHolder & operator=(const TaskHolder & other) noexcept = delete; + TaskHolder & operator=(TaskHolder && other) noexcept = default; + + ~TaskHolder() + { + if (task_info) + task_info->deactivate(); + } + + TaskInfo * operator->() { return task_info.get(); } + const TaskInfo * operator->() const { return task_info.get(); } + + private: + TaskInfoPtr task_info; + }; + + TaskHolder createTask(const std::string & log_name, const TaskFunc & function); + + size_t getNumberOfThreads() const { return size; } + BackgroundSchedulePool(size_t size); ~BackgroundSchedulePool(); - TaskHandle addTask(const std::string & name, const Task & task); - void removeTask(const TaskHandle & task); - size_t getNumberOfThreads() const { return size; } - private: using Threads = std::vector; @@ -94,10 +116,10 @@ private: void delayExecutionThreadFunction(); /// Schedule task for execution after specified delay from now. - void scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard &); + void scheduleDelayedTask(const TaskInfoPtr & task_info, size_t ms, std::lock_guard & task_schedule_mutex_lock); /// Remove task, that was scheduled with delay, from schedule. - void cancelDelayedTask(const TaskHandle & task, std::lock_guard &); + void cancelDelayedTask(const TaskInfoPtr & task_info, std::lock_guard & task_schedule_mutex_lock); /// Number for worker threads. const size_t size; @@ -108,11 +130,11 @@ private: /// Delayed notifications. std::condition_variable wakeup_cond; - std::mutex delayed_tasks_lock; + std::mutex delayed_tasks_mutex; /// Thread waiting for next delayed task. std::thread delayed_thread; /// Tasks ordered by scheduled time. - Tasks delayed_tasks; + DelayedTasks delayed_tasks; }; using BackgroundSchedulePoolPtr = std::shared_ptr; diff --git a/dbms/src/Common/ZooKeeper/LeaderElection.h b/dbms/src/Common/ZooKeeper/LeaderElection.h index 891f0b0ef78..12adba37bff 100644 --- a/dbms/src/Common/ZooKeeper/LeaderElection.h +++ b/dbms/src/Common/ZooKeeper/LeaderElection.h @@ -39,8 +39,10 @@ public: */ LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "") : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_) + , log_name("LeaderElection (" + path + ")") + , log(&Logger::get(log_name)) { - task_handle = pool.addTask("LeaderElection", [this] { threadFunction(); }); + task = pool.createTask(log_name, [this] { threadFunction(); }); createNode(); } @@ -50,22 +52,23 @@ public: return; shutdown_called = true; - task_handle->deactivate(); + task->deactivate(); } ~LeaderElection() { releaseNode(); - pool.removeTask(task_handle); } private: DB::BackgroundSchedulePool & pool; - DB::BackgroundSchedulePool::TaskHandle task_handle; + DB::BackgroundSchedulePool::TaskHolder task; std::string path; ZooKeeper & zookeeper; LeadershipHandler handler; std::string identifier; + std::string log_name; + Logger * log; EphemeralNodeHolderPtr node; std::string node_name; @@ -82,8 +85,8 @@ private: std::string node_path = node->getPath(); node_name = node_path.substr(node_path.find_last_of('/') + 1); - task_handle->activate(); - task_handle->schedule(); + task->activate(); + task->schedule(); } void releaseNode() @@ -111,25 +114,25 @@ private: return; } - if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task_handle->getWatchCallback())) - task_handle->schedule(); + if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task->getWatchCallback())) + task->schedule(); success = true; } catch (const KeeperException & e) { - DB::tryLogCurrentException("LeaderElection"); + DB::tryLogCurrentException(log); if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED) return; } catch (...) { - DB::tryLogCurrentException("LeaderElection"); + DB::tryLogCurrentException(log); } if (!success) - task_handle->scheduleAfter(10 * 1000); + task->scheduleAfter(10 * 1000); } }; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index 95a2e3e9e94..72ff7084bc6 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -14,17 +14,13 @@ namespace DB static const auto ALTER_ERROR_SLEEP_MS = 10 * 1000; -ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_) : - storage(storage_), - log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)")) - { - task_handle = storage_.context.getSchedulePool().addTask("ReplicatedMergeTreeAlterThread", [this]{run();}); - task_handle->schedule(); - } - -ReplicatedMergeTreeAlterThread::~ReplicatedMergeTreeAlterThread() +ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_) + : storage(storage_) + , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeAlterThread)") + , log(&Logger::get(log_name)) { - storage.context.getSchedulePool().removeTask(task_handle); + task = storage_.context.getSchedulePool().createTask(log_name, [this]{ run(); }); + task->schedule(); } void ReplicatedMergeTreeAlterThread::run() @@ -59,7 +55,7 @@ void ReplicatedMergeTreeAlterThread::run() auto zookeeper = storage.getZooKeeper(); zkutil::Stat stat; - const String columns_str = zookeeper->getWatch(storage.zookeeper_path + "/columns", &stat, task_handle->getWatchCallback()); + const String columns_str = zookeeper->getWatch(storage.zookeeper_path + "/columns", &stat, task->getWatchCallback()); auto columns_in_zk = ColumnsDescription::parse(columns_str); bool changed_version = (stat.version != storage.columns_version); @@ -197,14 +193,14 @@ void ReplicatedMergeTreeAlterThread::run() return; force_recheck_parts = true; - task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS); + task->scheduleAfter(ALTER_ERROR_SLEEP_MS); } catch (...) { tryLogCurrentException(log, __PRETTY_FUNCTION__); force_recheck_parts = true; - task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS); + task->scheduleAfter(ALTER_ERROR_SLEEP_MS); } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h index 37965670a4e..257ef0a7659 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h @@ -22,14 +22,14 @@ class ReplicatedMergeTreeAlterThread { public: ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_); - ~ReplicatedMergeTreeAlterThread(); private: void run(); StorageReplicatedMergeTree & storage; + String log_name; Logger * log; - BackgroundSchedulePool::TaskHandle task_handle; + BackgroundSchedulePool::TaskHolder task; }; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 3cf81eebe0c..07355529d7b 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -304,7 +304,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo if (multi_code == ZooKeeperImpl::ZooKeeper::ZOK) { transaction.commit(); - storage.merge_selecting_task_handle->schedule(); + storage.merge_selecting_task->schedule(); /// Lock nodes have been already deleted, do not delete them in destructor block_number_lock->assumeUnlocked(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 2bd6f551027..05f6bd3ec0e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -16,16 +16,12 @@ namespace ErrorCodes ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_) - : storage(storage_), - log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")) + : storage(storage_) + , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeCleanupThread)") + , log(&Logger::get(log_name)) { - task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeCleanupThread", [this]{ run(); }); - task_handle->schedule(); -} - -ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread() -{ - storage.context.getSchedulePool().removeTask(task_handle); + task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); }); + task->schedule(); } void ReplicatedMergeTreeCleanupThread::run() @@ -49,7 +45,7 @@ void ReplicatedMergeTreeCleanupThread::run() tryLogCurrentException(log, __PRETTY_FUNCTION__); } - task_handle->scheduleAfter(CLEANUP_SLEEP_MS); + task->scheduleAfter(CLEANUP_SLEEP_MS); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index e099b4ad91f..7d45a158c4c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -24,14 +24,13 @@ class ReplicatedMergeTreeCleanupThread public: ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_); - ~ReplicatedMergeTreeCleanupThread(); - - void schedule() { task_handle->schedule(); } + void schedule() { task->schedule(); } private: StorageReplicatedMergeTree & storage; + String log_name; Logger * log; - BackgroundSchedulePool::TaskHandle task_handle; + BackgroundSchedulePool::TaskHolder task; pcg64 rng; void run(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 3f55af46dc9..99c837e8ef0 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -18,25 +18,25 @@ static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000; ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_) - : storage(storage_), - log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, PartCheckThread)")) + : storage(storage_) + , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreePartCheckThread)") + , log(&Logger::get(log_name)) { - task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreePartCheckThread", [this] { run(); }); - task_handle->schedule(); + task = storage.context.getSchedulePool().createTask(log_name, [this] { run(); }); + task->schedule(); } ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread() { stop(); - storage.context.getSchedulePool().removeTask(task_handle); } void ReplicatedMergeTreePartCheckThread::start() { std::lock_guard lock(start_stop_mutex); need_stop = false; - task_handle->activate(); - task_handle->schedule(); + task->activate(); + task->schedule(); } void ReplicatedMergeTreePartCheckThread::stop() @@ -46,7 +46,7 @@ void ReplicatedMergeTreePartCheckThread::stop() std::lock_guard lock(start_stop_mutex); need_stop = true; - task_handle->deactivate(); + task->deactivate(); } void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds) @@ -58,7 +58,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds); parts_set.insert(name); - task_handle->schedule(); + task->schedule(); } @@ -340,7 +340,7 @@ void ReplicatedMergeTreePartCheckThread::run() } } - task_handle->schedule(); + task->schedule(); } catch (const zkutil::KeeperException & e) { @@ -349,12 +349,12 @@ void ReplicatedMergeTreePartCheckThread::run() if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED) return; - task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); + task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); } catch (...) { tryLogCurrentException(log, __PRETTY_FUNCTION__); - task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); + task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h index a5b6932636c..5691e98b214 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h @@ -75,6 +75,7 @@ private: void searchForMissingPart(const String & part_name); StorageReplicatedMergeTree & storage; + String log_name; Logger * log; using StringSet = std::set; @@ -92,7 +93,7 @@ private: std::mutex start_stop_mutex; std::atomic need_stop { false }; - BackgroundSchedulePool::TaskHandle task_handle; + BackgroundSchedulePool::TaskHolder task; }; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 498d2e9a62e..d31cb3e3d07 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -288,23 +288,20 @@ bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & } -void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle) +void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback) { std::lock_guard lock(pull_logs_to_queue_mutex); String index_str = zookeeper->get(replica_path + "/log_pointer"); UInt64 index; - zkutil::WatchCallback watch_callback; - if (update_task_handle) - watch_callback = update_task_handle->getWatchCallback(); Strings log_entries = zookeeper->getChildrenWatch(zookeeper_path + "/log", nullptr, watch_callback); /// We update mutations after we have loaded the list of log entries, but before we insert them /// in the queue. /// With this we ensure that if you read the log state L1 and then the state of mutations M1, /// then L1 "happened-before" M1. - updateMutations(zookeeper, nullptr); + updateMutations(zookeeper); if (index_str.empty()) { @@ -434,13 +431,10 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B } } -void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle) +void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback) { std::lock_guard lock(update_mutations_mutex); - zkutil::WatchCallback watch_callback; - if (update_task_handle) - watch_callback = update_task_handle->getWatchCallback(); Strings entries_in_zk = zookeeper->getChildrenWatch(zookeeper_path + "/mutations", nullptr, watch_callback); StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end()); @@ -506,7 +500,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, B } } - storage.merge_selecting_task_handle->schedule(); + storage.merge_selecting_task->schedule(); } } @@ -1099,7 +1093,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( } } - queue_.pullLogsToQueue(zookeeper, nullptr); + queue_.pullLogsToQueue(zookeeper); /// Load current quorum status. zookeeper->tryGet(queue.zookeeper_path + "/quorum/last_part", last_quorum_part); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 3b876ff7b8b..5262789285e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -214,15 +214,15 @@ public: bool removeFromVirtualParts(const MergeTreePartInfo & part_info); /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value. - * If update_task_handle != nullptr, will schedule this task when new entries appear in the log. + * If watch_callback is not empty, will call it when new entries appear in the log. * If there were new entries, notifies storage.queue_task_handle. * Additionally loads mutations (so that the set of mutations is always more recent than the queue). */ - void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle); + void pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback = {}); - /// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task_handle. - /// If update_task_handle != nullptr, will schedule this task when new mutations appear in ZK. - void updateMutations(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle update_task_handle); + /// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task. + /// If watch_callback is not empty, will call it when new mutations appear in ZK. + void updateMutations(zkutil::ZooKeeperPtr zookeeper, zkutil::WatchCallback watch_callback = {}); /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM). * And also wait for the completion of their execution, if they are now being executed. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 6c84645a092..88895a473ae 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -40,9 +40,10 @@ static String generateActiveNodeIdentifier() } ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_) - : storage(storage_), - log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, RestartingThread)")), - active_node_identifier(generateActiveNodeIdentifier()) + : storage(storage_) + , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeRestartingThread)") + , log(&Logger::get(log_name)) + , active_node_identifier(generateActiveNodeIdentifier()) { check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000; @@ -50,20 +51,13 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage if (check_period_ms > static_cast(storage.data.settings.check_delay_period) * 1000) check_period_ms = storage.data.settings.check_delay_period * 1000; - storage.queue_updating_task_handle = storage.context.getSchedulePool().addTask("StorageReplicatedMergeTree::queueUpdatingThread", [this]{ storage.queueUpdatingThread(); }); - storage.queue_updating_task_handle->deactivate(); - - storage.mutations_updating_task_handle = storage.context.getSchedulePool().addTask("StorageReplicatedMergeTree::mutationsUpdatingThread", [this]{ storage.mutationsUpdatingThread(); }); - - task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeRestartingThread", [this]{ run(); }); - task_handle->schedule(); + task = storage.context.getSchedulePool().createTask(log_name, [this]{ run(); }); + task->schedule(); } ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread() { - storage.context.getSchedulePool().removeTask(task_handle); completeShutdown(); - storage.context.getSchedulePool().removeTask(storage.queue_updating_task_handle); } void ReplicatedMergeTreeRestartingThread::run() @@ -105,7 +99,7 @@ void ReplicatedMergeTreeRestartingThread::run() if (first_time) storage.startup_event.set(); - task_handle->scheduleAfter(retry_period_ms); + task->scheduleAfter(retry_period_ms); return; } @@ -113,7 +107,7 @@ void ReplicatedMergeTreeRestartingThread::run() { if (first_time) storage.startup_event.set(); - task_handle->scheduleAfter(retry_period_ms); + task->scheduleAfter(retry_period_ms); return; } @@ -170,7 +164,7 @@ void ReplicatedMergeTreeRestartingThread::run() tryLogCurrentException(log, __PRETTY_FUNCTION__); } - task_handle->scheduleAfter(check_period_ms); + task->scheduleAfter(check_period_ms); } void ReplicatedMergeTreeRestartingThread::completeShutdown() @@ -214,10 +208,10 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.shutdown_called = false; storage.shutdown_event.reset(); - storage.queue_updating_task_handle->activate(); - storage.queue_updating_task_handle->schedule(); - storage.mutations_updating_task_handle->activate(); - storage.mutations_updating_task_handle->schedule(); + storage.queue_updating_task->activate(); + storage.queue_updating_task->schedule(); + storage.mutations_updating_task->activate(); + storage.mutations_updating_task->schedule(); storage.part_check_thread.start(); storage.alter_thread = std::make_unique(storage); storage.cleanup_thread = std::make_unique(storage); @@ -365,8 +359,8 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.exitLeaderElection(); - storage.queue_updating_task_handle->deactivate(); - storage.mutations_updating_task_handle->deactivate(); + storage.queue_updating_task->deactivate(); + storage.mutations_updating_task->deactivate(); storage.cleanup_thread.reset(); storage.alter_thread.reset(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h index 2b53d25a884..7747f008ddf 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h @@ -28,7 +28,7 @@ public: void wakeup() { wakeup_event.set(); - task_handle->schedule(); + task->schedule(); } Poco::Event & getWakeupEvent() @@ -44,6 +44,7 @@ public: private: StorageReplicatedMergeTree & storage; + String log_name; Logger * log; Poco::Event wakeup_event; std::atomic need_stop {false}; @@ -51,7 +52,7 @@ private: /// The random data we wrote into `/replicas/me/is_active`. String active_node_identifier; - BackgroundSchedulePool::TaskHandle task_handle; + BackgroundSchedulePool::TaskHolder task; Int64 check_period_ms; /// The frequency of checking expiration of session in ZK. bool first_time = true; /// Activate replica for the first time. time_t prev_time_of_check_delay = 0; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 6e4f6a9c8d4..bb10119e5b0 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -227,11 +227,17 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( zookeeper_path = "/" + zookeeper_path; replica_path = zookeeper_path + "/replicas/" + replica_name; + queue_updating_task = context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); }); + + mutations_updating_task = context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mutationsUpdatingTask)", [this]{ mutationsUpdatingTask(); }); + + merge_selecting_task = context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); }); + /// Will be activated if we win leader election. + merge_selecting_task->deactivate(); + if (context.hasZooKeeper()) current_zookeeper = context.getZooKeeper(); - merge_selecting_task_handle = context_.getSchedulePool().addTask("StorageReplicatedMergeTree::mergeSelectingThread", [this] { mergeSelectingThread(); }); - bool skip_sanity_checks = false; if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data")) @@ -1298,7 +1304,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. * This is not a problem, because in this case the merge will remain in the queue, and we will try again. */ - merge_selecting_task_handle->schedule(); + merge_selecting_task->schedule(); ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); write_part_log({}); @@ -1406,7 +1412,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. * This is not a problem, because in this case the entry will remain in the queue, and we will try again. */ - merge_selecting_task_handle->schedule(); + merge_selecting_task->schedule(); ProfileEvents::increment(ProfileEvents::ReplicatedPartMutations); write_part_log({}); @@ -2026,7 +2032,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const StorageReplicatedMerg } -void StorageReplicatedMergeTree::queueUpdatingThread() +void StorageReplicatedMergeTree::queueUpdatingTask() { //most probably this check is not relevant if (shutdown_called) @@ -2039,7 +2045,7 @@ void StorageReplicatedMergeTree::queueUpdatingThread() } try { - queue.pullLogsToQueue(getZooKeeper(), queue_updating_task_handle); + queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback()); last_queue_update_finish_time.store(time(nullptr)); queue_update_in_progress = false; } @@ -2050,21 +2056,21 @@ void StorageReplicatedMergeTree::queueUpdatingThread() if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED) return; - queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); + queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); } catch (...) { tryLogCurrentException(log, __PRETTY_FUNCTION__); - queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); + queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); } } -void StorageReplicatedMergeTree::mutationsUpdatingThread() +void StorageReplicatedMergeTree::mutationsUpdatingTask() { try { - queue.updateMutations(getZooKeeper(), mutations_updating_task_handle); + queue.updateMutations(getZooKeeper(), mutations_updating_task->getWatchCallback()); } catch (const zkutil::KeeperException & e) { @@ -2073,12 +2079,12 @@ void StorageReplicatedMergeTree::mutationsUpdatingThread() if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED) return; - mutations_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); + mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); } catch (...) { tryLogCurrentException(log, __PRETTY_FUNCTION__); - mutations_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); + mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); } } @@ -2158,7 +2164,7 @@ bool StorageReplicatedMergeTree::queueTask() } -void StorageReplicatedMergeTree::mergeSelectingThread() +void StorageReplicatedMergeTree::mergeSelectingTask() { if (!is_leader) return; @@ -2232,9 +2238,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread() return; if (!success) - merge_selecting_task_handle->scheduleAfter(MERGE_SELECTING_SLEEP_MS); + merge_selecting_task->scheduleAfter(MERGE_SELECTING_SLEEP_MS); else - merge_selecting_task_handle->schedule(); + merge_selecting_task->schedule(); } @@ -2378,8 +2384,8 @@ void StorageReplicatedMergeTree::enterLeaderElection() LOG_INFO(log, "Became leader"); is_leader = true; - merge_selecting_task_handle->activate(); - merge_selecting_task_handle->schedule(); + merge_selecting_task->activate(); + merge_selecting_task->schedule(); }; try @@ -2414,7 +2420,7 @@ void StorageReplicatedMergeTree::exitLeaderElection() LOG_INFO(log, "Stopped being leader"); is_leader = false; - merge_selecting_task_handle->deactivate(); + merge_selecting_task->deactivate(); } /// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one @@ -2697,7 +2703,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin if (quorum) updateQuorum(part_name); - merge_selecting_task_handle->schedule(); + merge_selecting_task->schedule(); for (const auto & replaced_part : replaced_parts) { @@ -2737,7 +2743,7 @@ void StorageReplicatedMergeTree::startup() database_name + "." + table_name + " (ReplicatedMergeTreeQueue)", data.getDataParts(), current_zookeeper); - queue.pullLogsToQueue(current_zookeeper, nullptr); + queue.pullLogsToQueue(current_zookeeper); last_queue_update_finish_time.store(time(nullptr)); /// NOTE: not updating last_queue_update_start_time because it must contain the time when /// the notification of queue change was received. In the beginning it is effectively infinite. @@ -2783,8 +2789,6 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree() { tryLogCurrentException(__PRETTY_FUNCTION__); } - - context.getSchedulePool().removeTask(merge_selecting_task_handle); } @@ -3759,7 +3763,7 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t * The conclusion that the replica does not lag may be incorrect, * because the information about `min_unprocessed_insert_time` is taken * only from that part of the log that has been moved to the queue. - * If the replica for some reason has stalled `queueUpdatingThread`, + * If the replica for some reason has stalled `queueUpdatingTask`, * then `min_unprocessed_insert_time` will be incorrect. */ @@ -4503,7 +4507,7 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds) { /// Let's fetch new log entries firstly - queue.pullLogsToQueue(getZooKeeper(), nullptr); + queue.pullLogsToQueue(getZooKeeper()); Stopwatch watch; Poco::Event event; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index ce7102fcdd2..55853dfcaff 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -198,7 +198,6 @@ private: friend struct ReplicatedMergeTreeLogEntry; friend class ScopedPartitionMergeLock; friend class ReplicatedMergeTreeQueue; - friend class ReplicatedMergeTreeMergeSelectingThread; friend class MergeTreeData; using LogEntry = ReplicatedMergeTreeLogEntry; @@ -269,15 +268,15 @@ private: /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue. bool queue_update_in_progress = false; - BackgroundSchedulePool::TaskHandle queue_updating_task_handle; + BackgroundSchedulePool::TaskHolder queue_updating_task; - BackgroundSchedulePool::TaskHandle mutations_updating_task_handle; + BackgroundSchedulePool::TaskHolder mutations_updating_task; /// A task that performs actions from the queue. BackgroundProcessingPool::TaskHandle queue_task_handle; /// A task that selects parts to merge. - BackgroundSchedulePool::TaskHandle merge_selecting_task_handle; + BackgroundSchedulePool::TaskHolder merge_selecting_task; /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. std::mutex merge_selecting_mutex; @@ -385,9 +384,9 @@ private: /** Updates the queue. */ - void queueUpdatingThread(); + void queueUpdatingTask(); - void mutationsUpdatingThread(); + void mutationsUpdatingTask(); /** Performs actions from the queue. */ @@ -405,7 +404,7 @@ private: /** Selects the parts to merge and writes to the log. */ - void mergeSelectingThread(); + void mergeSelectingTask(); /** Write the selected parts to merge into the log, * Call when merge_selecting_mutex is locked.