From 5d11118cc9edcd98a4cff73acc6901c521f5827e Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 14 Apr 2020 21:12:08 +0300 Subject: [PATCH] Use thread pool (background_distributed_schedule_pool_size) for distributed sends After #8756 the problem with 1 thread for each (distributed table, disk) for distributed sends became even worse (since there can be multiple disks), so use predefined thread pool for this tasks, that can be controlled with background_distributed_schedule_pool_size knob. --- src/Core/Settings.h | 1 + src/Interpreters/Context.cpp | 10 ++++ src/Interpreters/Context.h | 1 + src/Storages/Distributed/DirectoryMonitor.cpp | 47 +++++++++++++------ src/Storages/Distributed/DirectoryMonitor.h | 12 +++-- .../DistributedBlockOutputStream.cpp | 7 ++- src/Storages/StorageDistributed.cpp | 11 +++-- src/Storages/StorageDistributed.h | 2 +- 8 files changed, 67 insertions(+), 24 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9cd33962f6b..cffdd4a66e4 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -82,6 +82,7 @@ struct Settings : public SettingsCollection M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \ M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \ M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \ + M(SettingUInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \ \ M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \ M(SettingMilliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 5d8a0e53276..647c3fb8020 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -321,6 +321,7 @@ struct ContextShared std::optional background_pool; /// The thread pool for the background work performed by the tables. std::optional background_move_pool; /// The thread pool for the background moves performed by the tables. std::optional schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) + std::optional distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker; /// Process ddl commands from zk. /// Rules for selecting the compression settings, depending on the size of the part. @@ -418,6 +419,7 @@ struct ContextShared background_pool.reset(); background_move_pool.reset(); schedule_pool.reset(); + distributed_schedule_pool.reset(); ddl_worker.reset(); /// Stop trace collector if any @@ -1348,6 +1350,14 @@ BackgroundSchedulePool & Context::getSchedulePool() return *shared->schedule_pool; } +BackgroundSchedulePool & Context::getDistributedSchedulePool() +{ + auto lock = getLock(); + if (!shared->distributed_schedule_pool) + shared->distributed_schedule_pool.emplace(settings.background_distributed_schedule_pool_size); + return *shared->distributed_schedule_pool; +} + void Context::setDDLWorker(std::unique_ptr ddl_worker) { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 11937d4fc89..1f81cdbc58b 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -475,6 +475,7 @@ public: BackgroundProcessingPool & getBackgroundPool(); BackgroundProcessingPool & getBackgroundMovePool(); BackgroundSchedulePool & getSchedulePool(); + BackgroundSchedulePool & getDistributedSchedulePool(); void setDDLWorker(std::unique_ptr ddl_worker); DDLWorker & getDDLWorker() const; diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index d21c2eb0d4c..e937d5e8a90 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include @@ -78,7 +77,7 @@ namespace StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( - StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_) + StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_) /// It's important to initialize members before `thread` to avoid race. : storage(storage_) , pool(std::move(pool_)) @@ -92,7 +91,10 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( , max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()} , log{&Logger::get(getLoggerName())} , monitor_blocker(monitor_blocker_) + , bg_pool(bg_pool_) { + task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); }); + task_handle->activateAndSchedule(); } @@ -102,7 +104,7 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor() { quit = true; cond.notify_one(); - thread.join(); + task_handle->deactivate(); } } @@ -121,7 +123,7 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData() { quit = true; cond.notify_one(); - thread.join(); + task_handle->deactivate(); } Poco::File(path).remove(true); @@ -130,16 +132,11 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData() void StorageDistributedDirectoryMonitor::run() { - setThreadName("DistrDirMonitor"); - std::unique_lock lock{mutex}; - const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); }; - - while (!quit_requested()) + while (!quit) { - auto do_sleep = true; - + bool do_sleep = true; if (!monitor_blocker.isCancelled()) { try @@ -161,15 +158,25 @@ void StorageDistributedDirectoryMonitor::run() LOG_DEBUG(log, "Skipping send data over distributed table."); } - if (do_sleep) - cond.wait_for(lock, sleep_time, quit_requested); - const auto now = std::chrono::system_clock::now(); if (now - last_decrease_time > decrease_error_count_period) { error_count /= 2; last_decrease_time = now; } + + if (do_sleep) + break; + } + + if (!quit) + { + /// If there is no error, then it will be scheduled by the DistributedBlockOutputStream, + /// so this is just in case, hence it is distributed_directory_monitor_max_sleep_time_ms + if (error_count) + task_handle->scheduleAfter(sleep_time.count()); + else + task_handle->scheduleAfter(max_sleep_time.count()); } } @@ -580,6 +587,13 @@ BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(con return std::make_shared(file_name); } +bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms) +{ + if (quit) + return false; + return task_handle->scheduleAfter(ms); +} + void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map & files) { std::unordered_set file_indices_to_skip; @@ -708,8 +722,13 @@ std::string StorageDistributedDirectoryMonitor::getLoggerName() const void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path) { std::lock_guard lock{mutex}; + + task_handle->deactivate(); + path = new_path; current_batch_file_path = path + "current_batch.txt"; + + task_handle->activateAndSchedule(); } } diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 475a3bc7bc6..61d51e5acfd 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -1,10 +1,9 @@ #pragma once #include -#include +#include #include -#include #include #include #include @@ -20,7 +19,7 @@ class StorageDistributedDirectoryMonitor { public: StorageDistributedDirectoryMonitor( - StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_); + StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_); ~StorageDistributedDirectoryMonitor(); @@ -33,6 +32,9 @@ public: void shutdownAndDropAllData(); static BlockInputStreamPtr createStreamFromFile(const String & file_name); + + /// For scheduling via DistributedBlockOutputStream + bool scheduleAfter(size_t ms); private: void run(); bool processFiles(); @@ -67,7 +69,9 @@ private: std::condition_variable cond; Logger * log; ActionBlocker & monitor_blocker; - ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; + + BackgroundSchedulePool & bg_pool; + BackgroundSchedulePoolTaskHolder task_handle; /// Read insert query and insert settings for backward compatible. static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log); diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index b0695ccad1b..3a341f9f43c 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -589,8 +589,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: const std::string path(disk + data_path + dir_name + '/'); /// ensure shard subdirectory creation and notify storage - if (Poco::File(path).createDirectory()) - storage.requireDirectoryMonitor(disk, dir_name); + Poco::File(path).createDirectory(); + auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name); const auto & file_name = toString(storage.file_names_increment.get()) + ".bin"; const auto & block_file_path = path + file_name; @@ -632,6 +632,9 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: stream.writePrefix(); stream.write(block); stream.writeSuffix(); + + auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms; + directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds()); } if (link(first_file_tmp_path.data(), block_file_path.data())) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index b453b73c4cb..14e7eea5c96 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -577,15 +577,20 @@ void StorageDistributed::createDirectoryMonitors(const std::string & disk) } -void StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name) +StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name) { const std::string path(disk + relative_data_path + name); const std::string key(disk + name); std::lock_guard lock(cluster_nodes_mutex); auto & node_data = cluster_nodes_data[key]; - node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); - node_data.directory_monitor = std::make_unique(*this, path, node_data.conneciton_pool, monitors_blocker); + if (!node_data.directory_monitor) + { + node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); + node_data.directory_monitor = std::make_unique( + *this, path, node_data.conneciton_pool, monitors_blocker, global_context.getDistributedSchedulePool()); + } + return *node_data.directory_monitor; } size_t StorageDistributed::getShardCount() const diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 81c6b54a63e..2c5e321fc5f 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -109,7 +109,7 @@ public: /// create directory monitors for each existing subdirectory void createDirectoryMonitors(const std::string & disk); /// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name - void requireDirectoryMonitor(const std::string & disk, const std::string & name); + StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const std::string & disk, const std::string & name); void flushClusterNodesAllData();