diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9cd33962f6b..cffdd4a66e4 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -82,6 +82,7 @@ struct Settings : public SettingsCollection M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \ M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \ M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \ + M(SettingUInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \ \ M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \ M(SettingMilliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 5d8a0e53276..647c3fb8020 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -321,6 +321,7 @@ struct ContextShared std::optional background_pool; /// The thread pool for the background work performed by the tables. std::optional background_move_pool; /// The thread pool for the background moves performed by the tables. std::optional schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) + std::optional distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker; /// Process ddl commands from zk. /// Rules for selecting the compression settings, depending on the size of the part. @@ -418,6 +419,7 @@ struct ContextShared background_pool.reset(); background_move_pool.reset(); schedule_pool.reset(); + distributed_schedule_pool.reset(); ddl_worker.reset(); /// Stop trace collector if any @@ -1348,6 +1350,14 @@ BackgroundSchedulePool & Context::getSchedulePool() return *shared->schedule_pool; } +BackgroundSchedulePool & Context::getDistributedSchedulePool() +{ + auto lock = getLock(); + if (!shared->distributed_schedule_pool) + shared->distributed_schedule_pool.emplace(settings.background_distributed_schedule_pool_size); + return *shared->distributed_schedule_pool; +} + void Context::setDDLWorker(std::unique_ptr ddl_worker) { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 11937d4fc89..1f81cdbc58b 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -475,6 +475,7 @@ public: BackgroundProcessingPool & getBackgroundPool(); BackgroundProcessingPool & getBackgroundMovePool(); BackgroundSchedulePool & getSchedulePool(); + BackgroundSchedulePool & getDistributedSchedulePool(); void setDDLWorker(std::unique_ptr ddl_worker); DDLWorker & getDDLWorker() const; diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index d21c2eb0d4c..e937d5e8a90 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include @@ -78,7 +77,7 @@ namespace StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( - StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_) + StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_) /// It's important to initialize members before `thread` to avoid race. : storage(storage_) , pool(std::move(pool_)) @@ -92,7 +91,10 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( , max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()} , log{&Logger::get(getLoggerName())} , monitor_blocker(monitor_blocker_) + , bg_pool(bg_pool_) { + task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); }); + task_handle->activateAndSchedule(); } @@ -102,7 +104,7 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor() { quit = true; cond.notify_one(); - thread.join(); + task_handle->deactivate(); } } @@ -121,7 +123,7 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData() { quit = true; cond.notify_one(); - thread.join(); + task_handle->deactivate(); } Poco::File(path).remove(true); @@ -130,16 +132,11 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData() void StorageDistributedDirectoryMonitor::run() { - setThreadName("DistrDirMonitor"); - std::unique_lock lock{mutex}; - const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); }; - - while (!quit_requested()) + while (!quit) { - auto do_sleep = true; - + bool do_sleep = true; if (!monitor_blocker.isCancelled()) { try @@ -161,15 +158,25 @@ void StorageDistributedDirectoryMonitor::run() LOG_DEBUG(log, "Skipping send data over distributed table."); } - if (do_sleep) - cond.wait_for(lock, sleep_time, quit_requested); - const auto now = std::chrono::system_clock::now(); if (now - last_decrease_time > decrease_error_count_period) { error_count /= 2; last_decrease_time = now; } + + if (do_sleep) + break; + } + + if (!quit) + { + /// If there is no error, then it will be scheduled by the DistributedBlockOutputStream, + /// so this is just in case, hence it is distributed_directory_monitor_max_sleep_time_ms + if (error_count) + task_handle->scheduleAfter(sleep_time.count()); + else + task_handle->scheduleAfter(max_sleep_time.count()); } } @@ -580,6 +587,13 @@ BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(con return std::make_shared(file_name); } +bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms) +{ + if (quit) + return false; + return task_handle->scheduleAfter(ms); +} + void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map & files) { std::unordered_set file_indices_to_skip; @@ -708,8 +722,13 @@ std::string StorageDistributedDirectoryMonitor::getLoggerName() const void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path) { std::lock_guard lock{mutex}; + + task_handle->deactivate(); + path = new_path; current_batch_file_path = path + "current_batch.txt"; + + task_handle->activateAndSchedule(); } } diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 475a3bc7bc6..61d51e5acfd 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -1,10 +1,9 @@ #pragma once #include -#include +#include #include -#include #include #include #include @@ -20,7 +19,7 @@ class StorageDistributedDirectoryMonitor { public: StorageDistributedDirectoryMonitor( - StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_); + StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_); ~StorageDistributedDirectoryMonitor(); @@ -33,6 +32,9 @@ public: void shutdownAndDropAllData(); static BlockInputStreamPtr createStreamFromFile(const String & file_name); + + /// For scheduling via DistributedBlockOutputStream + bool scheduleAfter(size_t ms); private: void run(); bool processFiles(); @@ -67,7 +69,9 @@ private: std::condition_variable cond; Logger * log; ActionBlocker & monitor_blocker; - ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; + + BackgroundSchedulePool & bg_pool; + BackgroundSchedulePoolTaskHolder task_handle; /// Read insert query and insert settings for backward compatible. static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log); diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index b0695ccad1b..3a341f9f43c 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -589,8 +589,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: const std::string path(disk + data_path + dir_name + '/'); /// ensure shard subdirectory creation and notify storage - if (Poco::File(path).createDirectory()) - storage.requireDirectoryMonitor(disk, dir_name); + Poco::File(path).createDirectory(); + auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name); const auto & file_name = toString(storage.file_names_increment.get()) + ".bin"; const auto & block_file_path = path + file_name; @@ -632,6 +632,9 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: stream.writePrefix(); stream.write(block); stream.writeSuffix(); + + auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms; + directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds()); } if (link(first_file_tmp_path.data(), block_file_path.data())) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index b453b73c4cb..14e7eea5c96 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -577,15 +577,20 @@ void StorageDistributed::createDirectoryMonitors(const std::string & disk) } -void StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name) +StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name) { const std::string path(disk + relative_data_path + name); const std::string key(disk + name); std::lock_guard lock(cluster_nodes_mutex); auto & node_data = cluster_nodes_data[key]; - node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); - node_data.directory_monitor = std::make_unique(*this, path, node_data.conneciton_pool, monitors_blocker); + if (!node_data.directory_monitor) + { + node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); + node_data.directory_monitor = std::make_unique( + *this, path, node_data.conneciton_pool, monitors_blocker, global_context.getDistributedSchedulePool()); + } + return *node_data.directory_monitor; } size_t StorageDistributed::getShardCount() const diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 81c6b54a63e..2c5e321fc5f 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -109,7 +109,7 @@ public: /// create directory monitors for each existing subdirectory void createDirectoryMonitors(const std::string & disk); /// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name - void requireDirectoryMonitor(const std::string & disk, const std::string & name); + StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const std::string & disk, const std::string & name); void flushClusterNodesAllData();