diff --git a/dbms/src/Common/BackgroundSchedulePool.cpp b/dbms/src/Common/BackgroundSchedulePool.cpp
new file mode 100644
index 00000000000..6e9f830ba22
--- /dev/null
+++ b/dbms/src/Common/BackgroundSchedulePool.cpp
@@ -0,0 +1,300 @@
+#include <Common/BackgroundSchedulePool.h>
+#include <Common/MemoryTracker.h>
+#include <Common/CurrentMetrics.h>
+#include <Common/Exception.h>
+#include <Common/setThreadName.h>
+#include <Common/Stopwatch.h>
+#include <common/logger_useful.h>
+#include <chrono>
+
+namespace CurrentMetrics
+{
+    extern const Metric BackgroundSchedulePoolTask;
+    extern const Metric MemoryTrackingInBackgroundSchedulePool;
+}
+
+namespace DB
+{
+
+
+// TaskNotification
+
+class TaskNotification final : public Poco::Notification
+{
+public:
+    explicit TaskNotification(const BackgroundSchedulePool::TaskHandle & task) : task(task) {}
+    void execute() { task->execute(); }
+
+private:
+    BackgroundSchedulePool::TaskHandle task;
+};
+
+
+// BackgroundSchedulePool::TaskInfo
+
+BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function):
+    name(name),
+    pool(pool),
+    function(function)
+{
+}
+
+
+bool BackgroundSchedulePool::TaskInfo::schedule()
+{
+    std::lock_guard<std::mutex> lock(schedule_mutex);
+
+    if (deactivated || scheduled)
+        return false;
+
+    scheduled = true;
+
+    if (!executing)
+    {
+        if (delayed)
+            pool.cancelDelayedTask(shared_from_this(), lock);
+
+        pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
+    }
+
+    return true;
+}
+
+
+bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
+{
+    std::lock_guard<std::mutex> lock(schedule_mutex);
+
+    if (deactivated || scheduled)
+        return false;
+
+    pool.scheduleDelayedTask(shared_from_this(), ms, lock);
+    return true;
+}
+
+
+void BackgroundSchedulePool::TaskInfo::deactivate()
+{
+    std::lock_guard<std::mutex> lock_exec(exec_mutex);
+    std::lock_guard<std::mutex> lock_schedule(schedule_mutex);
+
+    if (deactivated)
+        return;
+
+    deactivated = true;
+    scheduled = false;
+
+    if (delayed)
+        pool.cancelDelayedTask(shared_from_this(), lock_schedule);
+}
+
+
+void BackgroundSchedulePool::TaskInfo::activate()
+{
+    std::lock_guard<std::mutex> lock(schedule_mutex);
+    deactivated = false;
+}
+
+
+void BackgroundSchedulePool::TaskInfo::execute()
+{
+    std::lock_guard<std::mutex> lock_exec(exec_mutex);
+
+    {
+        std::lock_guard<std::mutex> lock_schedule(schedule_mutex);
+
+        if (deactivated)
+            return;
+
+        scheduled = false;
+        executing = true;
+    }
+
+    CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
+
+    Stopwatch watch;
+    function();
+    UInt64 milliseconds = watch.elapsedMilliseconds();
+
+    /// If the task was executed longer than the specified time, it will be logged.
+    static const int32_t slow_execution_threshold_ms = 50;
+
+    if (milliseconds >= slow_execution_threshold_ms)
+        LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms.");
+
+    {
+        std::lock_guard<std::mutex> lock_schedule(schedule_mutex);
+
+        executing = false;
+
+        /// If the task was scheduled while executing (including a scheduleAfter that expired), put it back
+        /// on the queue. We don't call the function again right here, so that all tasks
+        /// get their chance to execute.
+
+        if (scheduled)
+            pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
+    }
+}
+
+zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
+{
+    return [t = shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &)
+    {
+        t->schedule();
+    };
+}
+
+
+// BackgroundSchedulePool
+
+BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
+    : size(size)
+{
+    LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");
+
+    threads.resize(size);
+    for (auto & thread : threads)
+        thread = std::thread([this] { threadFunction(); });
+
+    delayed_thread = std::thread([this] { delayExecutionThreadFunction(); });
+}
+
+
+BackgroundSchedulePool::~BackgroundSchedulePool()
+{
+    try
+    {
+        {
+            std::unique_lock<std::mutex> lock(delayed_tasks_lock);
+            shutdown = true;
+            wakeup_cond.notify_all();
+        }
+
+        queue.wakeUpAll();
+        delayed_thread.join();
+
+        LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish.");
+        for (std::thread & thread : threads)
+            thread.join();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+}
+
+
+BackgroundSchedulePool::TaskHandle BackgroundSchedulePool::addTask(const std::string & name, const Task & task)
+{
+    return std::make_shared<TaskInfo>(*this, name, task);
+}
+
+
+void BackgroundSchedulePool::removeTask(const TaskHandle & task)
+{
+    task->deactivate();
+}
+
+
+void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
+{
+    Poco::Timestamp current_time;
+
+    {
+        std::lock_guard<std::mutex> lock(delayed_tasks_lock);
+
+        if (task->delayed)
+            delayed_tasks.erase(task->iterator);
+
+        task->iterator = delayed_tasks.emplace(current_time + (ms * 1000), task);
+        task->delayed = true;
+    }
+
+    wakeup_cond.notify_all();
+}
+
+
+void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
+{
+    {
+        std::lock_guard<std::mutex> lock(delayed_tasks_lock);
+        delayed_tasks.erase(task->iterator);
+        task->delayed = false;
+    }
+
+    wakeup_cond.notify_all();
+}
+
+
+void BackgroundSchedulePool::threadFunction()
+{
+    setThreadName("BackgrSchedPool");
+
+    MemoryTracker memory_tracker;
+    memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
+    current_memory_tracker = &memory_tracker;
+
+    while (!shutdown)
+    {
+        if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification())
+        {
+            TaskNotification & task_notification = static_cast<TaskNotification &>(*notification);
+            task_notification.execute();
+        }
+    }
+
+    current_memory_tracker = nullptr;
+}
+
+
+void BackgroundSchedulePool::delayExecutionThreadFunction()
+{
+    setThreadName("BckSchPoolDelay");
+
+    while (!shutdown)
+    {
+        TaskHandle task;
+        bool found = false;
+
+        {
+            std::unique_lock<std::mutex> lock(delayed_tasks_lock);
+
+            while (!shutdown)
+            {
+                Poco::Timestamp min_time;
+
+                if (!delayed_tasks.empty())
+                {
+                    auto t = delayed_tasks.begin();
+                    min_time = t->first;
+                    task = t->second;
+                }
+
+                if (!task)
+                {
+                    wakeup_cond.wait(lock);
+                    continue;
+                }
+
+                Poco::Timestamp current_time;
+
+                if (min_time > current_time)
+                {
+                    wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time));
+                    continue;
+                }
+                else
+                {
+                    /// We have a task ready for execution
+                    found = true;
+                    break;
+                }
+            }
+        }
+
+        if (found)
+            task->schedule();
+    }
+}
+
+}
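A note on delayExecutionThreadFunction above: it is the classic wait-until-earliest-deadline loop — sleep on a condition variable either until the front of the deadline map expires or until a new, earlier task arrives. The same pattern in isolation, as a minimal standalone sketch (the DelayQueue/Job names are invented here, not part of the patch):

#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>

using Clock = std::chrono::steady_clock;
using Job = std::function<void()>;

class DelayQueue
{
public:
    /// Insert a job; wake the worker in case this deadline is now the earliest.
    void add(Clock::time_point when, Job job)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            jobs.emplace(when, std::move(job));
        }
        cv.notify_all();
    }

    /// Worker loop: run jobs whose deadline has passed, otherwise wait
    /// exactly until the earliest deadline (or until add()/stop() wakes us).
    void run()
    {
        std::unique_lock<std::mutex> lock(mutex);
        while (!stopped)
        {
            if (jobs.empty())
            {
                cv.wait(lock);
                continue;
            }
            auto it = jobs.begin();
            if (Clock::now() < it->first)
            {
                cv.wait_until(lock, it->first);   /// re-check the map after any wakeup
                continue;
            }
            Job job = std::move(it->second);
            jobs.erase(it);
            lock.unlock();                        /// never hold the lock while running user code
            job();
            lock.lock();
        }
    }

    void stop()
    {
        std::lock_guard<std::mutex> lock(mutex);
        stopped = true;
        cv.notify_all();
    }

private:
    std::multimap<Clock::time_point, Job> jobs;   /// ordered by deadline
    std::mutex mutex;
    std::condition_variable cv;
    bool stopped = false;
};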
diff --git a/dbms/src/Common/BackgroundSchedulePool.h b/dbms/src/Common/BackgroundSchedulePool.h
new file mode 100644
index 00000000000..64da78f9189
--- /dev/null
+++ b/dbms/src/Common/BackgroundSchedulePool.h
@@ -0,0 +1,120 @@
+#pragma once
+
+#include <Poco/Notification.h>
+#include <Poco/NotificationQueue.h>
+#include <Poco/Timestamp.h>
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include <vector>
+#include <map>
+#include <functional>
+#include <memory>
+#include <boost/noncopyable.hpp>
+#include <Common/ZooKeeper/Types.h>
+
+namespace DB
+{
+
+class TaskNotification;
+
+
+/** Executes functions scheduled at a specific point in time.
+  * Basically, all tasks are added to a queue and processed by worker threads.
+  *
+  * The most important difference between this and BackgroundProcessingPool
+  * is that we have the guarantee that the same function is not executed by several workers at the same time.
+  *
+  * The usage scenario: instead of starting a separate thread for each task,
+  * register a task in BackgroundSchedulePool and, when you need to run the task,
+  * call the schedule or scheduleAfter(duration) method.
+  */
+class BackgroundSchedulePool
+{
+public:
+    class TaskInfo;
+    using TaskHandle = std::shared_ptr<TaskInfo>;
+    using Tasks = std::multimap<Poco::Timestamp, TaskHandle>;
+    using Task = std::function<void()>;
+
+    class TaskInfo : public std::enable_shared_from_this<TaskInfo>, private boost::noncopyable
+    {
+    public:
+        TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function);
+
+        /// All these methods wait for the current execution of the task to finish.
+
+        /// Schedule for execution as soon as possible (if not already scheduled).
+        /// If the task was already scheduled with a delay, the delay will be ignored.
+        bool schedule();
+
+        /// Schedule for execution after the specified delay.
+        bool scheduleAfter(size_t ms);
+
+        /// Further attempts to schedule become no-ops.
+        void deactivate();
+        void activate();
+
+        /// Get the zkutil::WatchCallback needed for ZooKeeper callbacks.
+        zkutil::WatchCallback getWatchCallback();
+
+    private:
+        friend class TaskNotification;
+        friend class BackgroundSchedulePool;
+
+        void execute();
+
+        std::mutex schedule_mutex;
+        std::mutex exec_mutex;
+
+        std::string name;
+        bool deactivated = false;
+        bool scheduled = false;
+        bool delayed = false;
+        bool executing = false;
+        BackgroundSchedulePool & pool;
+        Task function;
+
+        /// If the task is scheduled with a delay, points to an element of delayed_tasks.
+        Tasks::iterator iterator;
+    };
+
+    BackgroundSchedulePool(size_t size);
+    ~BackgroundSchedulePool();
+
+    TaskHandle addTask(const std::string & name, const Task & task);
+    void removeTask(const TaskHandle & task);
+    size_t getNumberOfThreads() const { return size; }
+
+private:
+    using Threads = std::vector<std::thread>;
+
+    void threadFunction();
+    void delayExecutionThreadFunction();
+
+    /// Schedule a task for execution after the specified delay from now.
+    void scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> &);
+
+    /// Remove a task that was scheduled with a delay from the schedule.
+    void cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> &);
+
+    /// Number of worker threads.
+    const size_t size;
+    std::atomic<bool> shutdown {false};
+    Threads threads;
+    Poco::NotificationQueue queue;
+
+    /// Delayed notifications.
+
+    std::condition_variable wakeup_cond;
+    std::mutex delayed_tasks_lock;
+    /// Thread waiting for the next delayed task.
+    std::thread delayed_thread;
+    /// Tasks ordered by scheduled time.
+    Tasks delayed_tasks;
+};
+
+using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;
+
+}
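The header comment describes the intended usage; as a concrete illustration, a hypothetical periodic flusher registered against this API might look as follows (the `Flusher` class and `flushOnce` are invented for the example):

class Flusher
{
public:
    explicit Flusher(DB::BackgroundSchedulePool & pool)
    {
        task = pool.addTask("Flusher", [this] { run(); });
        task->schedule();                 /// first run as soon as possible
    }

    ~Flusher()
    {
        task->deactivate();               /// waits for a running execution, then blocks re-scheduling
    }

    void dataArrived()
    {
        task->schedule();                 /// overrides any pending delayed run
    }

private:
    void run()
    {
        flushOnce();
        task->scheduleAfter(5 * 1000);    /// run again in 5 seconds; tasks must re-arm themselves
    }

    void flushOnce() {}                   /// placeholder for the actual work

    DB::BackgroundSchedulePool::TaskHandle task;
};

Note the design choice this illustrates: a task runs at most once at a time and does not loop — every exit path that wants another run must call schedule() or scheduleAfter().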
diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp
index af29cb4912f..e955c5dc3a4 100644
--- a/dbms/src/Common/CurrentMetrics.cpp
+++ b/dbms/src/Common/CurrentMetrics.cpp
@@ -9,6 +9,7 @@
     M(ReplicatedSend) \
     M(ReplicatedChecks) \
     M(BackgroundPoolTask) \
+    M(BackgroundSchedulePoolTask) \
     M(DiskSpaceReservedForMerge) \
     M(DistributedSend) \
     M(QueryPreempted) \
@@ -25,6 +26,7 @@
     M(LeaderReplica) \
     M(MemoryTracking) \
     M(MemoryTrackingInBackgroundProcessingPool) \
+    M(MemoryTrackingInBackgroundSchedulePool) \
    M(MemoryTrackingForMerges) \
     M(LeaderElection) \
     M(EphemeralNode) \
diff --git a/dbms/src/Common/ZooKeeper/LeaderElection.h b/dbms/src/Common/ZooKeeper/LeaderElection.h
index e730765e1f1..891f0b0ef78 100644
--- a/dbms/src/Common/ZooKeeper/LeaderElection.h
+++ b/dbms/src/Common/ZooKeeper/LeaderElection.h
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include <Common/BackgroundSchedulePool.h>
 
 
 namespace ProfileEvents
@@ -36,9 +37,10 @@ public:
      * It means that different participants of leader election have different identifiers
      * and existence of more than one ephemeral node with same identifier indicates an error.
      */
-    LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
-        : path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
+    LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
+        : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
     {
+        task_handle = pool.addTask("LeaderElection", [this] { threadFunction(); });
         createNode();
     }
 
@@ -48,17 +50,18 @@ public:
             return;
         shutdown_called = true;
-        event->set();
-        if (thread.joinable())
-            thread.join();
+        task_handle->deactivate();
     }
 
     ~LeaderElection()
     {
         releaseNode();
+        pool.removeTask(task_handle);
     }
 
 private:
+    DB::BackgroundSchedulePool & pool;
+    DB::BackgroundSchedulePool::TaskHandle task_handle;
     std::string path;
     ZooKeeper & zookeeper;
     LeadershipHandler handler;
@@ -67,9 +70,7 @@ private:
     EphemeralNodeHolderPtr node;
     std::string node_name;
 
-    std::thread thread;
     std::atomic<bool> shutdown_called {false};
-    EventPtr event = std::make_shared<Poco::Event>();
 
     CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
 
@@ -81,7 +82,8 @@ private:
         std::string node_path = node->getPath();
         node_name = node_path.substr(node_path.find_last_of('/') + 1);
 
-        thread = std::thread(&LeaderElection::threadFunction, this);
+        task_handle->activate();
+        task_handle->schedule();
     }
 
     void releaseNode()
@@ -92,45 +94,42 @@ private:
     void threadFunction()
     {
-        while (!shutdown_called)
+        bool success = false;
+
+        try
         {
-            bool success = false;
+            Strings children = zookeeper.getChildren(path);
+            std::sort(children.begin(), children.end());
+            auto it = std::lower_bound(children.begin(), children.end(), node_name);
+            if (it == children.end() || *it != node_name)
+                throw Poco::Exception("Assertion failed in LeaderElection");
 
-            try
+            if (it == children.begin())
             {
-                Strings children = zookeeper.getChildren(path);
-                std::sort(children.begin(), children.end());
-                auto it = std::lower_bound(children.begin(), children.end(), node_name);
-                if (it == children.end() || *it != node_name)
-                    throw Poco::Exception("Assertion failed in LeaderElection");
-
-                if (it == children.begin())
-                {
-                    ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
-                    handler();
-                    return;
-                }
-
-                if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event))
-                    event->wait();
-
-                success = true;
-            }
-            catch (const KeeperException & e)
-            {
-                DB::tryLogCurrentException("LeaderElection");
-
-                if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
-                    break;
-            }
-            catch (...)
-            {
-                DB::tryLogCurrentException("LeaderElection");
+                ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
+                handler();
+                return;
             }
 
-            if (!success)
-                event->tryWait(10 * 1000);
+            if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task_handle->getWatchCallback()))
+                task_handle->schedule();
+
+            success = true;
         }
+        catch (const KeeperException & e)
+        {
+            DB::tryLogCurrentException("LeaderElection");
+
+            if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
+                return;
+        }
+        catch (...)
+        {
+            DB::tryLogCurrentException("LeaderElection");
+        }
+
+        if (!success)
+            task_handle->scheduleAfter(10 * 1000);
     }
 };
diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp
index dbcc51ccc92..20cbd3f37ba 100644
--- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp
+++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp
@@ -367,6 +367,16 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr
         throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
 }
 
+std::string ZooKeeper::getWatch(const std::string & path, Stat * stat, WatchCallback watch_callback)
+{
+    int32_t code = 0;
+    std::string res;
+    if (tryGetWatch(path, res, stat, watch_callback, &code))
+        return res;
+    else
+        throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
+}
+
 bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, const EventPtr & watch, int * return_code)
 {
     return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.h b/dbms/src/Common/ZooKeeper/ZooKeeper.h
index bbcf82fb2c2..05813770b0c 100644
--- a/dbms/src/Common/ZooKeeper/ZooKeeper.h
+++ b/dbms/src/Common/ZooKeeper/ZooKeeper.h
@@ -113,6 +113,7 @@ public:
     bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
 
     std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
+    std::string getWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
 
     /// Doesn't not throw in the following cases:
     /// * The node doesn't exist. Returns false in this case.
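These `*Watch` overloads pair naturally with `TaskInfo::getWatchCallback()`: the task re-arms a watch each time it runs, and the watch reschedules the task when the node changes. A condensed sketch of that loop (the `ConfigWatcher` task, the node path, and the surrounding `pool`/`zookeeper` objects are assumptions for the example):

DB::BackgroundSchedulePool::TaskHandle config_task;
config_task = pool.addTask("ConfigWatcher", [&]
{
    zkutil::Stat stat;
    /// Reading with a watch re-arms the subscription: when the node's data
    /// changes, the watch fires and getWatchCallback() schedules this task again.
    String value = zookeeper->getWatch("/clickhouse/config", &stat, config_task->getWatchCallback());
    /// ... act on `value` here ...
});
config_task->schedule();   /// initial read; later runs are watch-driven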
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index fffa43b5dcc..d93988226e7 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include <Common/BackgroundSchedulePool.h>
 #include
 #include
 #include
@@ -131,6 +132,7 @@ struct ContextShared
     ConfigurationPtr users_config;                          /// Config with the users, profiles and quotas sections.
     InterserverIOHandler interserver_io_handler;            /// Handler for interserver communication.
     BackgroundProcessingPoolPtr background_pool;            /// The thread pool for the background work performed by the tables.
+    BackgroundSchedulePoolPtr schedule_pool;                /// A thread pool that can run different jobs in the background (used in replicated tables).
     MultiVersion<Macros> macros;                            /// Substitutions extracted from config.
     std::unique_ptr<Compiler> compiler;                     /// Used for dynamic compilation of queries' parts if it necessary.
     std::shared_ptr<DDLWorker> ddl_worker;                  /// Process ddl commands from zk.
@@ -1328,6 +1330,14 @@ BackgroundProcessingPool & Context::getBackgroundPool()
     return *shared->background_pool;
 }
 
+BackgroundSchedulePool & Context::getSchedulePool()
+{
+    auto lock = getLock();
+    if (!shared->schedule_pool)
+        shared->schedule_pool = std::make_shared<BackgroundSchedulePool>(settings.background_schedule_pool_size);
+    return *shared->schedule_pool;
+}
+
 void Context::setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker)
 {
     auto lock = getLock();
diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h
index 4c3d4fdbf9c..141260b37d2 100644
--- a/dbms/src/Interpreters/Context.h
+++ b/dbms/src/Interpreters/Context.h
@@ -40,6 +40,7 @@ class ExternalDictionaries;
 class ExternalModels;
 class InterserverIOHandler;
 class BackgroundProcessingPool;
+class BackgroundSchedulePool;
 class MergeList;
 class Cluster;
 class Compiler;
@@ -328,6 +329,7 @@ public:
     void dropCaches() const;
 
     BackgroundProcessingPool & getBackgroundPool();
+    BackgroundSchedulePool & getSchedulePool();
 
     void setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker);
     DDLWorker & getDDLWorker() const;
diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index ff275938ad2..ae6ee6f47a0 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -49,6 +49,7 @@ struct Settings
     M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \
     M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \
     M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.") \
+    M(SettingUInt64, background_schedule_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.") \
 \
     M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS, "Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown.") \
 \
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
index 3f88b9d38f9..0d77f4add1a 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
@@ -14,200 +14,198 @@ namespace DB
 
 static const auto ALTER_ERROR_SLEEP_MS = 10 * 1000;
 
-ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_)
-    : storage(storage_),
-    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)")),
-    thread([this] { run(); }) {}
+ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_) :
+    storage(storage_),
+    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)"))
+{
+    task_handle = storage_.context.getSchedulePool().addTask("ReplicatedMergeTreeAlterThread", [this] { run(); });
+    task_handle->schedule();
+}
 
+ReplicatedMergeTreeAlterThread::~ReplicatedMergeTreeAlterThread()
+{
+    storage.context.getSchedulePool().removeTask(task_handle);
+}
 
 void ReplicatedMergeTreeAlterThread::run()
 {
-    setThreadName("ReplMTAlter");
-
-    bool force_recheck_parts = true;
-
-    while (!need_stop)
+    try
     {
-        try
+        /** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
+          * as well as a description of columns in local file with metadata (storage.data.getColumnsList()).
+          *
+          * If these descriptions are different - you need to do ALTER.
+          *
+          * If stored version of the node (columns_version) differs from the version in ZK,
+          * then the description of the columns in ZK does not necessarily differ from the local
+          * - this can happen with a loop from ALTER-s, which as a whole, does not change anything.
+          * In this case, you need to update the stored version number,
+          * and also check the structure of parts, and, if necessary, make ALTER.
+          *
+          * Recorded version number needs to be updated after updating the metadata, under lock.
+          * This version number is checked against the current one for INSERT.
+          * That is, we make sure to insert blocks with the correct structure.
+          *
+          * When the server starts, previous ALTER might not have been completed.
+          * Therefore, for the first time, regardless of the changes, we check the structure of all parts,
+          * (Example: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
+          * and do ALTER if necessary.
+          *
+          * TODO: Too complicated, rewrite everything.
+          */
+
+        auto zookeeper = storage.getZooKeeper();
+
+        zkutil::Stat stat;
+        const String columns_str = zookeeper->getWatch(storage.zookeeper_path + "/columns", &stat, task_handle->getWatchCallback());
+        auto columns_in_zk = ColumnsDescription::parse(columns_str);
+
+        bool changed_version = (stat.version != storage.columns_version);
+
         {
-            /** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
-              * as well as a description of columns in local file with metadata (storage.data.getColumnsList()).
-              *
-              * If these descriptions are different - you need to do ALTER.
-              *
-              * If stored version of the node (columns_version) differs from the version in ZK,
-              * then the description of the columns in ZK does not necessarily differ from the local
-              * - this can happen with a loop from ALTER-s, which as a whole, does not change anything.
-              * In this case, you need to update the stored version number,
-              * and also check the structure of parts, and, if necessary, make ALTER.
-              *
-              * Recorded version number needs to be updated after updating the metadata, under lock.
-              * This version number is checked against the current one for INSERT.
-              * That is, we make sure to insert blocks with the correct structure.
-              *
-              * When the server starts, previous ALTER might not have been completed.
-              * Therefore, for the first time, regardless of the changes, we check the structure of all parts,
-              * (Example: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
-              * and do ALTER if necessary.
-              *
-              * TODO: Too complicated, rewrite everything.
-              */
+            /// If you need to lock table structure, then suspend merges.
+            ActionBlocker::LockHolder merge_blocker;
 
-            auto zookeeper = storage.getZooKeeper();
+            if (changed_version || force_recheck_parts)
+                merge_blocker = storage.merger.merges_blocker.cancel();
 
-            zkutil::Stat stat;
-            const String columns_str = zookeeper->get(storage.zookeeper_path + "/columns", &stat, wakeup_event);
-            auto columns_in_zk = ColumnsDescription::parse(columns_str);
-
-            bool changed_version = (stat.version != storage.columns_version);
+            MergeTreeData::DataParts parts;
 
+            /// If columns description has changed, we will update table structure locally.
+            if (changed_version)
             {
-                /// If you need to lock table structure, then suspend merges.
-                ActionBlocker::LockHolder merge_blocker;
+                /// Temporarily cancel part checks to avoid locking for long time.
+                auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
 
-                if (changed_version || force_recheck_parts)
-                    merge_blocker = storage.merger.merges_blocker.cancel();
+                /// Temporarily cancel parts sending
+                ActionBlocker::LockHolder data_parts_exchange_blocker;
+                if (storage.data_parts_exchange_endpoint_holder)
+                    data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
 
-                MergeTreeData::DataParts parts;
+                /// Temporarily cancel part fetches
+                auto fetches_blocker = storage.fetcher.blocker.cancel();
 
-                /// If columns description has changed, we will update table structure locally.
-                if (changed_version)
+                LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
+
+                auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);
+
+                if (columns_in_zk != storage.getColumns())
                 {
-                    /// Temporarily cancel part checks to avoid locking for long time.
-                    auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
+                    LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
 
-                    /// Temporarily cancel parts sending
-                    ActionBlocker::LockHolder data_parts_exchange_blocker;
-                    if (storage.data_parts_exchange_endpoint_holder)
-                        data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
+                    storage.context.getDatabase(storage.database_name)->alterTable(
+                        storage.context, storage.table_name, columns_in_zk, {});
+                    storage.setColumns(std::move(columns_in_zk));
 
-                    /// Temporarily cancel part fetches
-                    auto fetches_blocker = storage.fetcher.blocker.cancel();
+                    /// Reinitialize primary key because primary key column types might have changed.
+                    storage.data.initPrimaryKey();
 
-                    LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
-
-                    auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);
-
-                    if (columns_in_zk != storage.getColumns())
-                    {
-                        LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
-
-                        storage.context.getDatabase(storage.database_name)->alterTable(
-                            storage.context, storage.table_name, columns_in_zk, {});
-                        storage.setColumns(std::move(columns_in_zk));
-
-                        /// Reinitialize primary key because primary key column types might have changed.
-                        storage.data.initPrimaryKey();
-
-                        LOG_INFO(log, "Applied changes to table.");
-                    }
-                    else
-                    {
-                        LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
-                    }
-
-                    /// You need to get a list of parts under table lock to avoid race condition with merge.
-                    parts = storage.data.getDataParts();
-
-                    storage.columns_version = stat.version;
+                    LOG_INFO(log, "Applied changes to table.");
+                }
+                else
+                {
+                    LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
                 }
 
-                /// Update parts.
-                if (changed_version || force_recheck_parts)
-                {
-                    auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
+                /// You need to get a list of parts under table lock to avoid race condition with merge.
+                parts = storage.data.getDataParts();
 
-                    if (changed_version)
-                        LOG_INFO(log, "ALTER-ing parts");
-
-                    int changed_parts = 0;
-
-                    if (!changed_version)
-                        parts = storage.data.getDataParts();
-
-                    const auto columns_for_parts = storage.getColumns().getAllPhysical();
-
-                    for (const MergeTreeData::DataPartPtr & part : parts)
-                    {
-                        /// Update the part and write result to temporary files.
-                        /// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
-                        /// node /flags/force_alter.
-                        auto transaction = storage.data.alterDataPart(
-                            part, columns_for_parts, storage.data.primary_expr_ast, false);
-
-                        if (!transaction)
-                            continue;
-
-                        ++changed_parts;
-
-                        /// Update part metadata in ZooKeeper.
-                        zkutil::Requests ops;
-                        ops.emplace_back(zkutil::makeSetRequest(
-                            storage.replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
-                        ops.emplace_back(zkutil::makeSetRequest(
-                            storage.replica_path + "/parts/" + part->name + "/checksums",
-                            storage.getChecksumsForZooKeeper(transaction->getNewChecksums()),
-                            -1));
-
-                        try
-                        {
-                            zookeeper->multi(ops);
-                        }
-                        catch (const zkutil::KeeperException & e)
-                        {
-                            /// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
-                            if (e.code == ZooKeeperImpl::ZooKeeper::ZNONODE)
-                                storage.enqueuePartForCheck(part->name);
-
-                            throw;
-                        }
-
-                        /// Apply file changes.
-                        transaction->commit();
-                    }
-
-                    /// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
-                    storage.data.recalculateColumnSizes();
-
-                    /// List of columns for a specific replica.
-                    zookeeper->set(storage.replica_path + "/columns", columns_str);
-
-                    if (changed_version)
-                    {
-                        if (changed_parts != 0)
-                            LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
-                        else
-                            LOG_INFO(log, "No parts ALTER-ed");
-                    }
-
-                    force_recheck_parts = false;
-                }
-
-                /// It's important that parts and merge_blocker are destroyed before the wait.
+                storage.columns_version = stat.version;
             }
 
-            wakeup_event->wait();
-        }
-        catch (const zkutil::KeeperException & e)
-        {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
+            /// Update parts.
+            if (changed_version || force_recheck_parts)
+            {
+                auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
 
-            if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
-                break;
+                if (changed_version)
+                    LOG_INFO(log, "ALTER-ing parts");
 
-            force_recheck_parts = true;
-            wakeup_event->tryWait(ALTER_ERROR_SLEEP_MS);
-        }
-        catch (...)
-        {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
+                int changed_parts = 0;
 
-            force_recheck_parts = true;
-            wakeup_event->tryWait(ALTER_ERROR_SLEEP_MS);
+                if (!changed_version)
+                    parts = storage.data.getDataParts();
+
+                const auto columns_for_parts = storage.getColumns().getAllPhysical();
+
+                for (const MergeTreeData::DataPartPtr & part : parts)
+                {
+                    /// Update the part and write result to temporary files.
+                    /// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
+                    /// node /flags/force_alter.
+                    auto transaction = storage.data.alterDataPart(
+                        part, columns_for_parts, storage.data.primary_expr_ast, false);
+
+                    if (!transaction)
+                        continue;
+
+                    ++changed_parts;
+
+                    /// Update part metadata in ZooKeeper.
+                    zkutil::Requests ops;
+                    ops.emplace_back(zkutil::makeSetRequest(
+                        storage.replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
+                    ops.emplace_back(zkutil::makeSetRequest(
+                        storage.replica_path + "/parts/" + part->name + "/checksums",
+                        storage.getChecksumsForZooKeeper(transaction->getNewChecksums()),
+                        -1));
+
+                    try
+                    {
+                        zookeeper->multi(ops);
+                    }
+                    catch (const zkutil::KeeperException & e)
+                    {
+                        /// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
+                        if (e.code == ZooKeeperImpl::ZooKeeper::ZNONODE)
+                            storage.enqueuePartForCheck(part->name);
+
+                        throw;
+                    }
+
+                    /// Apply file changes.
+                    transaction->commit();
+                }
+
+                /// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
+                storage.data.recalculateColumnSizes();
+
+                /// List of columns for a specific replica.
+                zookeeper->set(storage.replica_path + "/columns", columns_str);
+
+                if (changed_version)
+                {
+                    if (changed_parts != 0)
+                        LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
+                    else
+                        LOG_INFO(log, "No parts ALTER-ed");
+                }
+
+                force_recheck_parts = false;
+            }
+
+            /// It's important that parts and merge_blocker are destroyed before the wait.
         }
     }
+    catch (const zkutil::KeeperException & e)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
 
-    LOG_DEBUG(log, "Alter thread finished");
+        if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
+            return;
+
+        force_recheck_parts = true;
+        task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS);
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
+
+        force_recheck_parts = true;
+        task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS);
+    }
 }
 
 }
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h
index af177cdd101..37965670a4e 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include
+#include <Common/BackgroundSchedulePool.h>
 #include
 #include
 #include
@@ -21,25 +22,14 @@ class ReplicatedMergeTreeAlterThread
 {
 public:
     ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
-
-    ~ReplicatedMergeTreeAlterThread()
-    {
-        need_stop = true;
-        wakeup_event->set();
-        if (thread.joinable())
-            thread.join();
-    }
+    ~ReplicatedMergeTreeAlterThread();
 
 private:
     void run();
 
     StorageReplicatedMergeTree & storage;
     Logger * log;
-
-    zkutil::EventPtr wakeup_event { std::make_shared<Poco::Event>() };
-    std::atomic<bool> need_stop { false };
-
-    std::thread thread;
+    BackgroundSchedulePool::TaskHandle task_handle;
 };
 
 }
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
index 8aca9fe4f2e..cf45195ef74 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
@@ -362,7 +362,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
     if (multi_code == ZooKeeperImpl::ZooKeeper::ZOK)
     {
         transaction.commit();
-        storage.merge_selecting_event.set();
+        storage.merge_selecting_task_handle->schedule();
 
         /// Lock nodes have been already deleted, do not delete them in destructor
         block_number_lock.assumeUnlocked();
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
index 6b4fdbad390..2bd6f551027 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
@@ -17,41 +17,40 @@ namespace ErrorCodes
 
 ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_) :
     storage(storage_),
-    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")),
-    thread([this] { run(); })
+    log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)"))
 {
+    task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeCleanupThread", [this] { run(); });
+    task_handle->schedule();
 }
 
+ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
+{
+    storage.context.getSchedulePool().removeTask(task_handle);
+}
 
 void ReplicatedMergeTreeCleanupThread::run()
 {
-    setThreadName("ReplMTCleanup");
-
     const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000
         + std::uniform_int_distribution<UInt64>(0, storage.data.settings.cleanup_delay_period_random_add * 1000)(rng);
 
-    while (!storage.shutdown_called)
+    try
     {
-        try
-        {
-            iterate();
-        }
-        catch (const zkutil::KeeperException & e)
-        {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
+        iterate();
+    }
+    catch (const zkutil::KeeperException & e)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
 
-            if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
-                break;
-        }
-        catch (...)
-        {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
-        }
-
-        storage.cleanup_thread_event.tryWait(CLEANUP_SLEEP_MS);
+        if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
+            return;
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
     }
 
-    LOG_DEBUG(log, "Cleanup thread finished");
+    task_handle->scheduleAfter(CLEANUP_SLEEP_MS);
 }
 
@@ -243,11 +242,4 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
     std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
 }
 
-
-ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
-{
-    if (thread.joinable())
-        thread.join();
-}
-
 }
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
index ccbb564fa96..b2812fffad4 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <Common/BackgroundSchedulePool.h>
 #include
 #include
 
@@ -25,10 +26,12 @@ public:
 
     ~ReplicatedMergeTreeCleanupThread();
 
+    void schedule() { task_handle->schedule(); }
+
 private:
     StorageReplicatedMergeTree & storage;
     Logger * log;
-    std::thread thread;
+    BackgroundSchedulePool::TaskHandle task_handle;
     pcg64 rng;
 
     void run();
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
index e366ab972b0..0ac0ba72ce8 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
@@ -21,34 +21,34 @@ ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageRe
     : storage(storage_),
     log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, PartCheckThread)"))
 {
+    task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreePartCheckThread", [this] { run(); });
+    task_handle->schedule();
 }
 
+ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread()
+{
+    stop();
+    storage.context.getSchedulePool().removeTask(task_handle);
+}
 
 void ReplicatedMergeTreePartCheckThread::start()
 {
     std::lock_guard<std::mutex> lock(start_stop_mutex);
-
-    if (need_stop)
-        need_stop = false;
-    else
-        thread = std::thread([this] { run(); });
+    need_stop = false;
+    task_handle->activate();
+    task_handle->schedule();
 }
 
-
 void ReplicatedMergeTreePartCheckThread::stop()
 {
+    /// Based on the discussion at https://github.com/yandex/ClickHouse/pull/1489#issuecomment-344756259:
+    /// with the schedule pool, there is no problem if stop() is called twice in a row and start() multiple times.
+
     std::lock_guard<std::mutex> lock(start_stop_mutex);
-
     need_stop = true;
-    if (thread.joinable())
-    {
-        wakeup_event.set();
-        thread.join();
-        need_stop = false;
-    }
+    task_handle->deactivate();
 }
 
-
 void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds)
 {
     std::lock_guard<std::mutex> lock(parts_mutex);
@@ -58,7 +58,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
 
     parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds);
     parts_set.insert(name);
-    wakeup_event.set();
+    task_handle->schedule();
 }
 
 
@@ -309,95 +309,83 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
 
 void ReplicatedMergeTreePartCheckThread::run()
 {
-    setThreadName("ReplMTPartCheck");
+    if (need_stop)
+        return;
 
-    while (!need_stop)
+    try
     {
-        try
+        time_t current_time = time(nullptr);
+
+        /// Take part from the queue for verification.
+        PartsToCheckQueue::iterator selected = parts_queue.end();    /// end() of std::list does not get invalidated
+        time_t min_check_time = std::numeric_limits<time_t>::max();
+
         {
-            time_t current_time = time(nullptr);
-
-            /// Take part from the queue for verification.
-            PartsToCheckQueue::iterator selected = parts_queue.end();    /// end from std::list is not get invalidated
-            time_t min_check_time = std::numeric_limits<time_t>::max();
+            std::lock_guard<std::mutex> lock(parts_mutex);
 
+            if (parts_queue.empty())
             {
-                std::lock_guard<std::mutex> lock(parts_mutex);
-
-                if (parts_queue.empty())
+                if (!parts_set.empty())
                 {
-                    if (!parts_set.empty())
+                    LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
+                    parts_set.clear();
+                }
+            }
+            else
+            {
+                for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
+                {
+                    if (it->second <= current_time)
                     {
-                        LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
-                        parts_set.clear();
+                        selected = it;
+                        break;
                     }
-                }
-                else
-                {
-                    for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
-                    {
-                        if (it->second <= current_time)
-                        {
-                            selected = it;
-                            break;
-                        }
 
-                        if (it->second < min_check_time)
-                            min_check_time = it->second;
-                    }
-                }
-            }
-
-            if (selected == parts_queue.end())
-            {
-                /// Poco::Event is triggered immediately if `signal` was before the `wait` call.
-                /// We can wait a little more than we need due to the use of the old `current_time`.
-
-                if (min_check_time != std::numeric_limits<time_t>::max() && min_check_time > current_time)
-                    wakeup_event.tryWait(1000 * (min_check_time - current_time));
-                else
-                    wakeup_event.wait();
-
-                continue;
-            }
-
-            checkPart(selected->first);
-
-            if (need_stop)
-                break;
-
-            /// Remove the part from check queue.
-            {
-                std::lock_guard<std::mutex> lock(parts_mutex);
-
-                if (parts_queue.empty())
-                {
-                    LOG_ERROR(log, "Someone erased cheking part from parts_queue. This is a bug.");
-                }
-                else
-                {
-                    parts_set.erase(selected->first);
-                    parts_queue.erase(selected);
+                    if (it->second < min_check_time)
+                        min_check_time = it->second;
                 }
             }
         }
-        catch (const zkutil::KeeperException & e)
+
+        if (selected == parts_queue.end())
+            return;
+
+        checkPart(selected->first);
+
+        if (need_stop)
+            return;
+
+        /// Remove the part from check queue.
         {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
+            std::lock_guard<std::mutex> lock(parts_mutex);
 
-            if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
-                break;
+            if (parts_queue.empty())
+            {
+                LOG_ERROR(log, "Someone erased checking part from parts_queue. This is a bug.");
+            }
+            else
+            {
+                parts_set.erase(selected->first);
+                parts_queue.erase(selected);
+            }
+        }
 
-            wakeup_event.tryWait(PART_CHECK_ERROR_SLEEP_MS);
-        }
-        catch (...)
-        {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
-            wakeup_event.tryWait(PART_CHECK_ERROR_SLEEP_MS);
-        }
+        task_handle->schedule();
     }
+    catch (const zkutil::KeeperException & e)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
 
-    LOG_DEBUG(log, "Part check thread finished");
+        if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
+            return;
+
+        task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
+        task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
+    }
 }
 
 }
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h
index 0e980fdd689..a5b6932636c 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h
@@ -10,7 +10,7 @@
 #include
 #include
 #include
-
+#include <Common/BackgroundSchedulePool.h>
 
 namespace DB
 {
@@ -29,6 +29,7 @@ class ReplicatedMergeTreePartCheckThread
 {
 public:
     ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_);
+    ~ReplicatedMergeTreePartCheckThread();
 
     /// Processing of the queue to be checked is done in the background thread, which you must first start.
     void start();
@@ -65,10 +66,7 @@ public:
     /// Get the number of parts in the queue for check.
     size_t size() const;
 
-    ~ReplicatedMergeTreePartCheckThread()
-    {
-        stop();
-    }
+
 private:
     void run();
 
@@ -91,11 +89,10 @@ private:
     mutable std::mutex parts_mutex;
     StringSet parts_set;
     PartsToCheckQueue parts_queue;
-    Poco::Event wakeup_event;
 
     std::mutex start_stop_mutex;
     std::atomic<bool> need_stop { false };
-    std::thread thread;
+    BackgroundSchedulePool::TaskHandle task_handle;
 };
 
 }
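The part-check queue above pairs a std::list (ordering plus stable iterators) with a string-set mirror for duplicate suppression; enqueuePart relies on this to stay idempotent. A stripped-down sketch of the same pattern (class and method names invented for the example):

#include <ctime>
#include <list>
#include <mutex>
#include <set>
#include <string>
#include <utility>

/// Deduplicating delay queue: the set suppresses duplicates, the list keeps
/// insertion order and iterators that survive unrelated erases.
class PartCheckQueue
{
public:
    void enqueue(const std::string & name, time_t delay_seconds)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (!names.insert(name).second)
            return;                               /// already queued, nothing to do
        queue.emplace_back(name, time(nullptr) + delay_seconds);
    }

    /// Pop one entry whose deadline has passed; returns false if none is due.
    bool popDue(std::string & out)
    {
        std::lock_guard<std::mutex> lock(mutex);
        time_t now = time(nullptr);
        for (auto it = queue.begin(); it != queue.end(); ++it)
        {
            if (it->second <= now)
            {
                out = it->first;
                names.erase(it->first);
                queue.erase(it);                  /// other iterators stay valid
                return true;
            }
        }
        return false;
    }

private:
    std::mutex mutex;
    std::set<std::string> names;
    std::list<std::pair<std::string, time_t>> queue;
};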
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
index 4bc406158fc..ffa2c2b5622 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -259,7 +259,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
 }
 
 
-bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event)
+bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle next_update_task_handle)
 {
     std::lock_guard<std::mutex> lock(pull_logs_to_queue_mutex);
 
@@ -388,10 +388,10 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
         }
     }
 
-    if (next_update_event)
+    if (next_update_task_handle)
     {
-        if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
-            next_update_event->set();
+        if (zookeeper->existsWatch(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_task_handle->getWatchCallback()))
+            next_update_task_handle->schedule();
     }
 
     return !log_entries.empty();
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
index 59f4efa017a..a24c0e5d8b5 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
@@ -7,6 +7,7 @@
 
 #include
 #include
+#include <Common/BackgroundSchedulePool.h>
 
 
 namespace DB
@@ -156,10 +157,10 @@ public:
     bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
 
     /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
-      * If next_update_event != nullptr, will call this event when new entries appear in the log.
+      * If next_update_task_handle != nullptr, will schedule this task when new entries appear in the log.
       * Returns true if new entries have been.
      */
-    bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
+    bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle next_update_task_handle);
 
     /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
       * And also wait for the completion of their execution, if they are now being executed.
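As the comment above says, passing a task handle into pullLogsToQueue turns log updates into an event loop without a dedicated thread: the task re-arms the watch on the next expected log entry, and the watch wakes the same task again. Condensed to its wiring (the surrounding class is elided; this mirrors how the patch hooks up queueUpdatingThread):

queue_updating_task_handle = pool.addTask("QueueUpdater", [this]
{
    /// pullLogsToQueue() re-arms a ZooKeeper watch via
    /// queue_updating_task_handle->getWatchCallback(), so new log entries
    /// schedule this task again.
    pullLogsToQueue(getZooKeeper(), queue_updating_task_handle);
});
queue_updating_task_handle->schedule();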
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
index 3fe59ea940f..8e4433167fd 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
@@ -28,6 +28,10 @@ namespace ErrorCodes
     extern const int REPLICA_IS_ALREADY_ACTIVE;
 }
 
+namespace
+{
+    constexpr auto retry_period_ms = 10 * 1000;
+}
 
 /// Used to check whether it's us who set node `is_active`, or not.
 static String generateActiveNodeIdentifier()
@@ -35,134 +39,140 @@ static String generateActiveNodeIdentifier()
     return "pid: " + toString(getpid()) + ", random: " + toString(randomSeed());
 }
 
-
 ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
     : storage(storage_),
     log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, RestartingThread)")),
-    active_node_identifier(generateActiveNodeIdentifier()),
-    thread([this] { run(); })
+    active_node_identifier(generateActiveNodeIdentifier())
 {
-}
-
-
-void ReplicatedMergeTreeRestartingThread::run()
-{
-    constexpr auto retry_period_ms = 10 * 1000;
-
-    /// The frequency of checking expiration of session in ZK.
-    Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
+    check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
 
     /// Periodicity of checking lag of replica.
     if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
         check_period_ms = storage.data.settings.check_delay_period * 1000;
 
-    setThreadName("ReplMTRestart");
+    storage.queue_updating_task_handle = storage.context.getSchedulePool().addTask("StorageReplicatedMergeTree::queueUpdatingThread", [this] { storage.queueUpdatingThread(); });
+    storage.queue_updating_task_handle->deactivate();
 
-    bool first_time = true;                 /// Activate replica for the first time.
-    time_t prev_time_of_check_delay = 0;
+    task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeRestartingThread", [this] { run(); });
+    task_handle->schedule();
+}
 
-    /// Starts the replica when the server starts/creates a table. Restart the replica when session expires with ZK.
-    while (!need_stop)
+ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
+{
+    storage.context.getSchedulePool().removeTask(task_handle);
+    completeShutdown();
+    storage.context.getSchedulePool().removeTask(storage.queue_updating_task_handle);
+}
+
+void ReplicatedMergeTreeRestartingThread::run()
+{
+    if (need_stop)
+        return;
+
+    try
     {
-        try
+        if (first_time || storage.getZooKeeper()->expired())
         {
-            if (first_time || storage.getZooKeeper()->expired())
+            startup_completed = false;
+
+            if (first_time)
             {
-                if (first_time)
+                LOG_DEBUG(log, "Activating replica.");
+            }
+            else
+            {
+                LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
+
+                bool old_val = false;
+                if (storage.is_readonly.compare_exchange_strong(old_val, true))
+                    CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
+
+                partialShutdown();
+            }
+
+            if (!startup_completed)
+            {
+                try
                 {
-                    LOG_DEBUG(log, "Activating replica.");
+                    storage.setZooKeeper(storage.context.getZooKeeper());
                 }
-                else
+                catch (const zkutil::KeeperException & e)
                 {
-                    LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
-
-                    bool old_val = false;
-                    if (storage.is_readonly.compare_exchange_strong(old_val, true))
-                        CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
-
-                    partialShutdown();
-                }
-
-                while (!need_stop)
-                {
-                    try
-                    {
-                        storage.setZooKeeper(storage.context.getZooKeeper());
-                    }
-                    catch (const zkutil::KeeperException & e)
-                    {
-                        /// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
-                        tryLogCurrentException(log, __PRETTY_FUNCTION__);
-
-                        if (first_time)
-                            storage.startup_event.set();
-                        wakeup_event.tryWait(retry_period_ms);
-                        continue;
-                    }
-
-                    if (!need_stop && !tryStartup())
-                    {
-                        if (first_time)
-                            storage.startup_event.set();
-                        wakeup_event.tryWait(retry_period_ms);
-                        continue;
-                    }
+                    /// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
+                    tryLogCurrentException(log, __PRETTY_FUNCTION__);
 
                     if (first_time)
                         storage.startup_event.set();
-                    break;
+                    task_handle->scheduleAfter(retry_period_ms);
+                    return;
                 }
 
-                if (need_stop)
-                    break;
-
-                bool old_val = true;
-                if (storage.is_readonly.compare_exchange_strong(old_val, false))
-                    CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
-
-                first_time = false;
-            }
-
-            time_t current_time = time(nullptr);
-            if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
-            {
-                /// Find out lag of replicas.
-                time_t absolute_delay = 0;
-                time_t relative_delay = 0;
-
-                storage.getReplicaDelays(absolute_delay, relative_delay);
-
-                if (absolute_delay)
-                    LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
-
-                prev_time_of_check_delay = current_time;
-
-                /// We give up leadership if the relative lag is greater than threshold.
-                if (storage.is_leader
-                    && relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
+                if (!need_stop && !tryStartup())
                 {
-                    LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
-                        << storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
-
-                    ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
-
-                    storage.exitLeaderElection();
-                    /// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
-                    /// This is bad because we can end up without a leader on any replica.
-                    /// In this case we rely on the fact that the session will expire and we will reconnect.
-                    storage.enterLeaderElection();
+                    if (first_time)
+                        storage.startup_event.set();
+                    task_handle->scheduleAfter(retry_period_ms);
+                    return;
                 }
+
+                if (first_time)
+                    storage.startup_event.set();
+
+                startup_completed = true;
+            }
+
+            if (need_stop)
+                return;
+
+            bool old_val = true;
+            if (storage.is_readonly.compare_exchange_strong(old_val, false))
+                CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
+
+            first_time = false;
+        }
+
+        time_t current_time = time(nullptr);
+        if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
+        {
+            /// Find out lag of replicas.
+            time_t absolute_delay = 0;
+            time_t relative_delay = 0;
+
+            storage.getReplicaDelays(absolute_delay, relative_delay);
+
+            if (absolute_delay)
+                LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
+
+            prev_time_of_check_delay = current_time;
+
+            /// We give up leadership if the relative lag is greater than threshold.
+            if (storage.is_leader
+                && relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
+            {
+                LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
+                    << storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
+
+                ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
+
+                storage.exitLeaderElection();
+                /// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
+                /// This is bad because we can end up without a leader on any replica.
+                /// In this case we rely on the fact that the session will expire and we will reconnect.
+                storage.enterLeaderElection();
             }
         }
-        catch (...)
-        {
-            storage.startup_event.set();
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
-        }
-
-        wakeup_event.tryWait(check_period_ms);
     }
+    catch (...)
+    {
+        storage.startup_event.set();
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
+    }
 
+    task_handle->scheduleAfter(check_period_ms);
+}
+
+void ReplicatedMergeTreeRestartingThread::completeShutdown()
+{
     try
     {
         storage.data_parts_exchange_endpoint_holder->cancelForever();
@@ -182,8 +192,6 @@ void ReplicatedMergeTreeRestartingThread::run()
     {
         tryLogCurrentException(log, __PRETTY_FUNCTION__);
     }
-
-    LOG_DEBUG(log, "Restarting thread finished");
 }
 
 
@@ -204,7 +212,8 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
         storage.shutdown_called = false;
         storage.shutdown_event.reset();
 
-        storage.queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, &storage);
+        storage.queue_updating_task_handle->activate();
+        storage.queue_updating_task_handle->schedule();
         storage.part_check_thread.start();
         storage.alter_thread = std::make_unique<ReplicatedMergeTreeAlterThread>(storage);
         storage.cleanup_thread = std::make_unique<ReplicatedMergeTreeCleanupThread>(storage);
@@ -348,18 +357,14 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
     storage.shutdown_called = true;
     storage.shutdown_event.set();
-    storage.merge_selecting_event.set();
-    storage.queue_updating_event->set();
     storage.alter_query_event->set();
-    storage.cleanup_thread_event.set();
     storage.replica_is_active_node = nullptr;
 
     LOG_TRACE(log, "Waiting for threads to finish");
 
     storage.exitLeaderElection();
 
-    if (storage.queue_updating_thread.joinable())
-        storage.queue_updating_thread.join();
+    storage.queue_updating_task_handle->deactivate();
 
     storage.cleanup_thread.reset();
     storage.alter_thread.reset();
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h
index 4feff1b0443..2b53d25a884 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include <Common/BackgroundSchedulePool.h>
 #include
 #include
 #include
@@ -22,16 +23,12 @@ class ReplicatedMergeTreeRestartingThread
 {
 public:
     ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);
-
-    ~ReplicatedMergeTreeRestartingThread()
-    {
-        if (thread.joinable())
-            thread.join();
-    }
+    ~ReplicatedMergeTreeRestartingThread();
 
     void wakeup()
     {
         wakeup_event.set();
+        task_handle->schedule();
     }
 
     Poco::Event & getWakeupEvent()
@@ -42,7 +39,7 @@ public:
     void stop()
     {
         need_stop = true;
-        wakeup();
+        wakeup_event.set();
     }
 
 private:
@@ -54,9 +51,14 @@ private:
     /// The random data we wrote into `/replicas/me/is_active`.
     String active_node_identifier;
 
-    std::thread thread;
+    BackgroundSchedulePool::TaskHandle task_handle;
+    Int64 check_period_ms;                  /// The frequency of checking expiration of session in ZK.
+    bool first_time = true;                 /// Activate replica for the first time.
+    time_t prev_time_of_check_delay = 0;
+    bool startup_completed = false;
 
     void run();
+    void completeShutdown();
 
     /// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper.
     bool tryStartup();                      /// Returns false if ZooKeeper is not available.
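The restarting thread shows the general shape of porting a `while (!stop) { work(); sleep(); }` thread onto the pool: the body runs once, and every exit path ends in schedule() or scheduleAfter(). A minimal template of that transformation (all names here are invented, not from the patch):

class SessionWatchdog
{
public:
    explicit SessionWatchdog(DB::BackgroundSchedulePool & pool)
    {
        task = pool.addTask("SessionWatchdog", [this] { run(); });
        task->schedule();
    }

private:
    void run()
    {
        try
        {
            checkSessionOnce();                         /// one iteration of the old loop body
        }
        catch (...)
        {
            task->scheduleAfter(retry_period_ms);       /// error path: back off, then retry
            return;
        }
        task->scheduleAfter(check_period_ms);           /// success path: normal period
    }

    void checkSessionOnce() {}                          /// placeholder for the real check

    DB::BackgroundSchedulePool::TaskHandle task;
    static constexpr size_t retry_period_ms = 10 * 1000;
    static constexpr size_t check_period_ms = 60 * 1000;
};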
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 921147f721e..093fa4e81d8 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -108,6 +108,26 @@ namespace ErrorCodes static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000; static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000; +template struct CachedMergingPredicate; + +class ReplicatedMergeTreeMergeSelectingThread +{ +public: + + ReplicatedMergeTreeMergeSelectingThread(StorageReplicatedMergeTree* storage_); + void clearState(); + + bool deduplicate; + std::chrono::steady_clock::time_point now; + std::function can_merge; + +private: + + StorageReplicatedMergeTree* storage; + std::function uncached_merging_predicate; + std::function(const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> merging_predicate_args_to_key; + std::unique_ptr> > cached_merging_predicate; +}; /** There are three places for each part, where it should be * 1. In the RAM, MergeTreeData::data_parts, all_data_parts. @@ -219,6 +239,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( if (context.hasZooKeeper()) current_zookeeper = context.getZooKeeper(); + merge_sel_state.reset(new ReplicatedMergeTreeMergeSelectingThread(this)); + merge_selecting_task_handle = context_.getSchedulePool().addTask("StorageReplicatedMergeTree::mergeSelectingThread", [this] { mergeSelectingThread(); }); + bool skip_sanity_checks = false; if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data")) @@ -1012,9 +1035,9 @@ String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataP } -void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event) +void StorageReplicatedMergeTree::pullLogsToQueue(BackgroundSchedulePool::TaskHandle next_update_task_handle) { - if (queue.pullLogsToQueue(getZooKeeper(), next_update_event)) + if (queue.pullLogsToQueue(getZooKeeper(), next_update_task_handle)) { if (queue_task_handle) queue_task_handle->wake(); @@ -1274,7 +1297,7 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. * This is not a problem, because in this case the merge will remain in the queue, and we will try again. */ - merge_selecting_event.set(); + merge_selecting_task_handle->schedule(); ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); write_part_log({}); @@ -1590,40 +1613,35 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry & void StorageReplicatedMergeTree::queueUpdatingThread() { - setThreadName("ReplMTQueueUpd"); + //most probably this check is not relevant + if (shutdown_called) + return; - bool update_in_progress = false; - while (!shutdown_called) + if (!queue_update_in_progress) { - if (!update_in_progress) - { - last_queue_update_start_time.store(time(nullptr)); - update_in_progress = true; - } - try - { - pullLogsToQueue(queue_updating_event); - last_queue_update_finish_time.store(time(nullptr)); - update_in_progress = false; - queue_updating_event->wait(); - } - catch (const zkutil::KeeperException & e) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - - if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED) - break; - else - queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS); - } - catch (...) 
-        {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
-            queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
-        }
+        last_queue_update_start_time.store(time(nullptr));
+        queue_update_in_progress = true;
     }

+    try
+    {
+        pullLogsToQueue(queue_updating_task_handle);
+        last_queue_update_finish_time.store(time(nullptr));
+        queue_update_in_progress = false;
+    }
+    catch (const zkutil::KeeperException & e)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);

-    LOG_DEBUG(log, "Queue updating thread finished");
+        if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
+            return;
+
+        queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
+        queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
+    }
 }

@@ -1761,7 +1779,7 @@ namespace
         return true;
     }
-
+}

 /// If any of the parts is already going to be merged into a larger one, do not agree to merge it.
 bool partsWillNotBeMergedOrDisabled(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,

@@ -1864,94 +1882,67 @@ namespace
 template <typename Keys> constexpr CachedMergingPredicate<Keys>::clock::duration CachedMergingPredicate<Keys>::Expiration::min_delay;
 template <typename Keys> constexpr CachedMergingPredicate<Keys>::clock::duration CachedMergingPredicate<Keys>::Expiration::max_delay;
 template <typename Keys> constexpr double CachedMergingPredicate<Keys>::Expiration::exponent_base;
-}

 void StorageReplicatedMergeTree::mergeSelectingThread()
 {
-    setThreadName("ReplMTMergeSel");
-    LOG_DEBUG(log, "Merge selecting thread started");
+    if (!is_leader)
+        return;

-    bool deduplicate = false; /// TODO: read deduplicate option from table config
+    bool success = false;

-    auto uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
+    try
     {
-        return canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data);
-    };
+        std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);

-    auto merging_predicate_args_to_key = [](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
-    {
-        return std::make_pair(left->name, right->name);
-    };
-
-    CachedMergingPredicate<std::pair<String, String>> cached_merging_predicate;
-
-    /// Will be updated below.
-    std::chrono::steady_clock::time_point now;
-
-    auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
-    {
-        return partsWillNotBeMergedOrDisabled(left, right, queue)
-            && cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
-    };
-
-    while (is_leader)
-    {
-        bool success = false;
-
-        try
+        /// You need to load new entries into the queue before you select parts to merge
+        /// (so we know which parts are already going to be merged).
+        /// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
+        if (merge_selecting_logs_pulling_is_required)
         {
-            std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
-
-            /// You need to load new entries into the queue before you select parts to merge
-            /// (so we know which parts are already going to be merged).
-            /// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
-            if (merge_selecting_logs_pulling_is_required)
-            {
-                pullLogsToQueue();
-                merge_selecting_logs_pulling_is_required = false;
-            }
-
-            /// If many merges is already queued, then will queue only small enough merges.
-            /// Otherwise merge queue could be filled with only large merges,
-            /// and in the same time, many small parts could be created and won't be merged.
-            size_t merges_queued = queue.countMerges();
-
-            if (merges_queued >= data.settings.max_replicated_merges_in_queue)
-            {
-                LOG_TRACE(log, "Number of queued merges (" << merges_queued
-                    << ") is greater than max_replicated_merges_in_queue ("
-                    << data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
-            }
-            else
-            {
-                MergeTreeDataMerger::FuturePart future_merged_part;
-
-                size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
-
-                now = std::chrono::steady_clock::now();
-
-                if (max_parts_size_for_merge > 0
-                    && merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, can_merge))
-                {
-                    merge_selecting_logs_pulling_is_required = true;
-                    success = createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate);
-                }
-            }
-        }
-        catch (...)
-        {
-            tryLogCurrentException(log, __PRETTY_FUNCTION__);
+            pullLogsToQueue();
+            merge_selecting_logs_pulling_is_required = false;
         }

-        if (!is_leader)
-            break;
+        /// If many merges are already queued, then we will queue only merges that are small enough.
+        /// Otherwise the merge queue could be filled with only large merges,
+        /// while at the same time many small parts could be created and would never be merged.
+        size_t merges_queued = queue.countMerges();

-        if (!success)
-            merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
+        if (merges_queued >= data.settings.max_replicated_merges_in_queue)
+        {
+            LOG_TRACE(log, "Number of queued merges (" << merges_queued
+                << ") is greater than max_replicated_merges_in_queue ("
+                << data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
+        }
+        else
+        {
+            MergeTreeDataMerger::FuturePart future_merged_part;
+
+            size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
+
+            merge_sel_state->now = std::chrono::steady_clock::now();
+
+            if (max_parts_size_for_merge > 0
+                && merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, merge_sel_state->can_merge))
+            {
+                merge_selecting_logs_pulling_is_required = true;
+                success = createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, merge_sel_state->deduplicate);
+            }
+        }
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log, __PRETTY_FUNCTION__);
     }

-    LOG_DEBUG(log, "Merge selecting thread finished");
+    if (!is_leader)
+        return;
+
+    if (!success)
+        merge_selecting_task_handle->scheduleAfter(MERGE_SELECTING_SLEEP_MS);
+    else
+        merge_selecting_task_handle->schedule();
 }

@@ -2057,12 +2048,15 @@ void StorageReplicatedMergeTree::enterLeaderElection()
         LOG_INFO(log, "Became leader");

         is_leader = true;
-        merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
+        merge_sel_state->clearState();
+        merge_selecting_task_handle->activate();
+        merge_selecting_task_handle->schedule();
     };

     try
     {
         leader_election = std::make_shared<zkutil::LeaderElection>(
+            context.getSchedulePool(),
             zookeeper_path + "/leader_election",
             *current_zookeeper,    /// current_zookeeper lives for the lifetime of leader_election,
                                    /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
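Both converted bodies above encode the same rescheduling policy: return without re-arming when the precondition is gone (session expired, leadership lost) and leave revival to activate() plus schedule(); scheduleAfter a pause on transient errors or idle passes; schedule() immediately after productive work. A minimal sketch of that policy follows; FakeTaskHandle is a hypothetical stand-in for BackgroundSchedulePool::TaskHandle, and the bodies are stubs.

#include <cstddef>
#include <iostream>
#include <stdexcept>

/// Hypothetical stand-in for BackgroundSchedulePool::TaskHandle; a real handle
/// enqueues the task on a pool thread instead of printing.
struct FakeTaskHandle
{
    void schedule() { std::cout << "re-run as soon as possible\n"; }
    void scheduleAfter(size_t ms) { std::cout << "re-arm after " << ms << " ms\n"; }
};

struct SessionExpired : std::runtime_error
{
    SessionExpired() : std::runtime_error("session expired") {}
};

/// Stand-in for pullLogsToQueue / selectPartsToMerge: fails once, then succeeds.
bool doOneUnitOfWork(bool & fail_once)
{
    if (fail_once)
    {
        fail_once = false;
        throw SessionExpired();
    }
    return true;
}

/// One iteration, shaped like the converted queueUpdatingThread /
/// mergeSelectingThread bodies above.
void iteration(FakeTaskHandle & self, bool & fail_once)
{
    bool success = false;
    try
    {
        success = doOneUnitOfWork(fail_once);
    }
    catch (const SessionExpired &)
    {
        /// Do not re-arm: the restarting task recreates the session and
        /// revives this task via activate() + schedule().
        return;
    }
    catch (...)
    {
        self.scheduleAfter(1000);    /// transient error: retry after a pause
        return;
    }

    if (success)
        self.schedule();             /// productive: run again immediately
    else
        self.scheduleAfter(5000);    /// idle: back off, like MERGE_SELECTING_SLEEP_MS
}

int main()
{
    FakeTaskHandle handle;
    bool fail_once = true;
    iteration(handle, fail_once);    /// first call hits the expired session, exits silently
    iteration(handle, fail_once);    /// second call succeeds and reschedules itself
}

The "return without re-arming" branch is what makes deactivate() safe to call from partialShutdown and exitLeaderElection: a task that never re-arms after losing its precondition cannot race its own revival.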
@@ -2091,8 +2085,7 @@ void StorageReplicatedMergeTree::exitLeaderElection()
     LOG_INFO(log, "Stopped being leader");

     is_leader = false;
-    merge_selecting_event.set();
-    merge_selecting_thread.join();
+    merge_selecting_task_handle->deactivate();
 }

 /// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
@@ -2271,7 +2264,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
     {
         LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
         /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
-        cleanup_thread_event.set();
+        cleanup_thread->schedule();
         return false;
     }

@@ -2366,7 +2359,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
     if (quorum)
         updateQuorum(part_name);

-    merge_selecting_event.set();
+    merge_selecting_task_handle->schedule();

     for (const auto & replaced_part : replaced_parts)
     {
@@ -2452,6 +2445,8 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
     {
         tryLogCurrentException(__PRETTY_FUNCTION__);
     }
+
+    context.getSchedulePool().removeTask(merge_selecting_task_handle);
 }

@@ -3803,4 +3798,37 @@ ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAdd
     return res;
 }

+ReplicatedMergeTreeMergeSelectingThread::ReplicatedMergeTreeMergeSelectingThread(StorageReplicatedMergeTree * storage_) :
+    storage(storage_)
+{
+    clearState();
+}
+
+void ReplicatedMergeTreeMergeSelectingThread::clearState()
+{
+    deduplicate = false;    /// TODO: read deduplicate option from table config
+
+    uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
+    {
+        return canMergePartsAccordingToZooKeeperInfo(left, right, storage->getZooKeeper(), storage->zookeeper_path, storage->data);
+    };
+
+    merging_predicate_args_to_key = [](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
+    {
+        return std::make_pair(left->name, right->name);
+    };
+
+    cached_merging_predicate.reset(new CachedMergingPredicate<std::pair<String, String>>());
+
+    /// Will be refreshed by mergeSelectingThread() before each selection pass.
+    now = std::chrono::steady_clock::time_point();
+
+    can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
+    {
+        return partsWillNotBeMergedOrDisabled(left, right, storage->queue)
+            && cached_merging_predicate->get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
+    };
+}
+
 }
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h
index 0cb6dbb004c..c6152553bbe 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.h
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.h
@@ -22,11 +22,14 @@
 #include
 #include
 #include
+#include <Common/BackgroundSchedulePool.h>

 namespace DB
 {

+class ReplicatedMergeTreeMergeSelectingThread;
+
 /** The engine that uses the merge tree (see MergeTreeData) and replicated through ZooKeeper.
  *
  * ZooKeeper is used for the following things:
@@ -186,6 +189,7 @@ private:
     friend class ReplicatedMergeTreeRestartingThread;
     friend struct ReplicatedMergeTreeLogEntry;
     friend class ScopedPartitionMergeLock;
+    friend class ReplicatedMergeTreeMergeSelectingThread;

     using LogEntry = ReplicatedMergeTreeLogEntry;
     using LogEntryPtr = LogEntry::Ptr;
@@ -253,16 +257,19 @@ private:

     /// Threads.

-    /// A thread that keeps track of the updates in the logs of all replicas and loads them into the queue.
-    std::thread queue_updating_thread;
-    zkutil::EventPtr queue_updating_event = std::make_shared<Poco::Event>();
+    /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
+    bool queue_update_in_progress = false;
+    BackgroundSchedulePool::TaskHandle queue_updating_task_handle;

     /// A task that performs actions from the queue.
     BackgroundProcessingPool::TaskHandle queue_task_handle;

-    /// A thread that selects parts to merge.
-    std::thread merge_selecting_thread;
-    Poco::Event merge_selecting_event;
+    /// A task that selects parts to merge.
+    BackgroundSchedulePool::TaskHandle merge_selecting_task_handle;
+
+    /// State for the merge selecting task.
+    std::unique_ptr<ReplicatedMergeTreeMergeSelectingThread> merge_sel_state;
+
     /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
     std::mutex merge_selecting_mutex;

     /// If true, then new entries might be added to the queue, so we must pull logs before selecting parts for merge.
@@ -271,8 +278,6 @@ private:
     /// A thread that removes old parts, log entries, and blocks.
     std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
-    /// Is used to wakeup cleanup_thread
-    Poco::Event cleanup_thread_event;

     /// A thread that processes reconnection to ZooKeeper when the session expires.
     std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
@@ -288,8 +293,6 @@ private:

     Logger * log;

-    /// Initialization.
-
     /** Creates the minimum set of nodes in ZooKeeper. */
     void createTableIfNotExists();
@@ -342,9 +345,9 @@ private:
     /// Running jobs from the queue.

     /** Copies the new entries from the logs of all replicas to the queue of this replica.
-      * If next_update_event != nullptr, calls this event when new entries appear in the log.
+      * If next_update_task_handle != nullptr, schedules this task when new entries appear in the log.
       */
-    void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr);
+    void pullLogsToQueue(BackgroundSchedulePool::TaskHandle next_update_task_handle = nullptr);

     /** Execute the action from the queue. Throws an exception if something is wrong.
      * Returns whether or not it succeeds. If it did not work, write it to the end of the queue.
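One consequence of deleting the merge-selecting loop is visible in the header above: the locals that the loop owned (deduplicate, now, the cached predicate and its lambdas) move into the long-lived merge_sel_state object, reset via clearState() each time leadership is re-acquired. Below is a standalone sketch of that hoisting pattern under simplified assumptions: the names are illustrative, the predicate is a placeholder, and the real state object holds ClickHouse-specific types instead of strings.

#include <chrono>
#include <functional>
#include <iostream>
#include <string>

/// Sketch: loop-local state hoisted into a resettable object, so a re-entrant
/// task sees fresh state after every activation.
struct SelectingTaskState
{
    bool deduplicate = false;
    std::chrono::steady_clock::time_point now{};
    std::function<bool (const std::string &, const std::string &)> can_merge;

    SelectingTaskState() { clearState(); }

    void clearState()
    {
        deduplicate = false;
        now = {};    /// refreshed by the task body before each selection pass
        can_merge = [this](const std::string & left, const std::string & right)
        {
            /// Placeholder predicate; the real one consults the replication
            /// queue and a time-based cache keyed by the pair of part names.
            return left != right && now != std::chrono::steady_clock::time_point{};
        };
    }
};

int main()
{
    SelectingTaskState state;
    state.now = std::chrono::steady_clock::now();                /// what the task body does first
    std::cout << state.can_merge("part_a", "part_b") << '\n';    /// prints 1
    state.clearState();                                          /// on re-election: forget cached decisions
    std::cout << state.can_merge("part_a", "part_b") << '\n';    /// prints 0 until 'now' is refreshed
}

The design point is that a pool task cannot keep state on its stack between runs, so anything the old thread's loop kept alive across iterations must either become a member of the storage object or live in a dedicated state holder like this one.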