diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 786fff9b0e4..b12bd90fc1e 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1119,6 +1119,35 @@ int Server::main(const std::vector & /*args*/) if (config->has("keeper_server")) global_context->updateKeeperConfiguration(*config); + /// Reload the number of threads for global pools. + /// Note: If you specified it in the top level config (not it config of default profile) + /// then ClickHouse will use it exactly. + /// This is done for backward compatibility. + if (global_context->areBackgroundExecutorsInitialized() && (config->has("background_pool_size") || config->has("background_merges_mutations_concurrency_ratio"))) + { + auto new_pool_size = config->getInt("background_pool_size", 16); + auto new_ratio = config->getInt("background_merges_mutations_concurrency_ratio", 2); + global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size * new_ratio); + } + + if (global_context->areBackgroundExecutorsInitialized() && config->has("background_move_pool_size")) + { + auto new_pool_size = config->getInt("background_move_pool_size", 8); + global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); + } + + if (global_context->areBackgroundExecutorsInitialized() && config->has("background_fetches_pool_size")) + { + auto new_pool_size = config->getInt("background_fetches_pool_size", 8); + global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); + } + + if (global_context->areBackgroundExecutorsInitialized() && config->has("background_common_pool_size")) + { + auto new_pool_size = config->getInt("background_common_pool_size", 8); + global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size); + } + if (!initial_loading) { /// We do not load ZooKeeper configuration on the first config loading diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 411e73bdf1a..be970ae868c 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -113,16 +114,30 @@ void Settings::addProgramOptionAsMultitoken(boost::program_options::options_desc name.data(), boost::program_options::value()->multitoken()->composing()->notifier(on_program_option), field.getDescription()))); } +static std::array exceptional_settings = +{ + "background_pool_size", + "background_merges_mutations_concurrency_ratio", + "background_move_pool_size", + "background_fetches_pool_size", + "background_common_pool_size", +}; + void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path) { if (config.getBool("skip_check_for_incorrect_settings", false)) return; + auto is_exceptional = [](const String & setting) + { + return std::find(exceptional_settings.begin(), exceptional_settings.end(), setting) != exceptional_settings.end(); + }; + Settings settings; for (auto setting : settings.all()) { const auto & name = setting.getName(); - if (config.has(name)) + if (config.has(name) && !is_exceptional(name)) { throw Exception(fmt::format("A setting '{}' appeared at top level in config {}." " But it is user-level setting that should be located in users.xml inside section for specific profile." diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2cc18497e0b..00e2c4dde96 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -90,11 +90,11 @@ class IColumn; M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \ M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \ M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \ - M(UInt64, background_pool_size, 16, "Number of threads to perform merges and mutations in background. Only has meaning at server startup.", 0) \ - M(Float, background_merges_mutations_concurrency_ratio, 2, "Ratio between a number of how many operations could be processed and a number threads to process them. Only has meaning at server startup.", 0) \ - M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \ - M(UInt64, background_fetches_pool_size, 8, "Number of threads performing background fetches for replicated tables. Only has meaning at server startup.", 0) \ - M(UInt64, background_common_pool_size, 8, "Number of threads for some lightweight tasks for replicated tables (like cleaning old parts etc.). Only has meaning at server startup.", 0) \ + M(UInt64, background_pool_size, 16, "Number of threads to perform merges and mutations in background. Only has meaning at server startup. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \ + M(Float, background_merges_mutations_concurrency_ratio, 2, "Ratio between a number of how many operations could be processed and a number threads to process them. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \ + M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \ + M(UInt64, background_fetches_pool_size, 8, "Deprecated. Number of threads performing background fetches for replicated tables. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \ + M(UInt64, background_common_pool_size, 8, "Number of threads for some lightweight tasks for replicated tables (like cleaning old parts etc.). This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \ M(UInt64, background_schedule_pool_size, 128, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \ M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \ M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 0ad9b72b963..0f000946c4c 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -272,7 +272,7 @@ struct ContextSharedPart bool shutdown_called = false; /// Has background executors for MergeTree tables been initialized? - bool is_background_executors_initialized = false; + bool are_background_executors_initialized = false; Stopwatch uptime_watch; @@ -3214,7 +3214,7 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptris_background_executors_initialized) + if (shared->are_background_executors_initialized) return; const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio; @@ -3264,9 +3264,14 @@ void Context::initializeBackgroundExecutorsIfNeeded() LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size); - shared->is_background_executors_initialized = true; + shared->are_background_executors_initialized = true; } +bool Context::areBackgroundExecutorsInitialized() +{ + auto lock = getLock(); + return shared->are_background_executors_initialized; +} MergeMutateBackgroundExecutorPtr Context::getMergeMutateExecutor() const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index b53e3945188..d6f8951c17f 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -926,6 +926,7 @@ public: /// Background executors related methods void initializeBackgroundExecutorsIfNeeded(); + bool areBackgroundExecutorsInitialized(); MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const; OrdinaryBackgroundExecutorPtr getMovesExecutor() const; diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp index 499a8fbbaa6..457cc323f4b 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp @@ -28,6 +28,40 @@ void MergeTreeBackgroundExecutor::wait() pool.wait(); } +template +void MergeTreeBackgroundExecutor::increaseThreadsAndMaxTasksCount(size_t new_threads_count, size_t new_max_tasks_count) +{ + std::lock_guard lock(mutex); + + /// Do not throw any exceptions from global pool. Just log a warning and silently return. + if (new_threads_count <= threads_count) + { + LOG_WARNING(log, "Loaded new threads count for {}Executor from top level config, but new value ({}) is not greater than current {}", name, new_threads_count, threads_count); + return; + } + + if (new_max_tasks_count <= max_tasks_count) + { + LOG_WARNING(log, "Loaded new max tasks count for {}Executor from top level config, but new value ({}) is not greater than current {}", name, new_max_tasks_count, max_tasks_count); + return; + } + + LOG_INFO(log, "Loaded new threads count ({}) and max tasks count ({}) for {}Executor", new_threads_count, new_max_tasks_count, name); + + pending.setCapacity(new_max_tasks_count); + active.set_capacity(new_max_tasks_count); + + pool.setMaxThreads(std::max(1UL, new_threads_count)); + pool.setMaxFreeThreads(std::max(1UL, new_threads_count)); + pool.setQueueSize(std::max(1UL, new_threads_count)); + + for (size_t number = threads_count; number < new_threads_count; ++number) + pool.scheduleOrThrowOnError([this] { threadFunction(); }); + + max_tasks_count = new_max_tasks_count; + threads_count = new_threads_count; +} + template bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task) diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h index 5cfa7b6ed7c..21b0afac59b 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h @@ -188,6 +188,11 @@ public: wait(); } + /// Handler for hot-reloading + /// Supports only increasing the number of threads and tasks, because + /// implemeting tasks eviction will definitely be too error-prone and buggy. + void increaseThreadsAndMaxTasksCount(size_t new_threads_count, size_t new_max_tasks_count); + bool trySchedule(ExecutableTaskPtr task); void removeTasksCorrespondingToStorage(StorageID id); void wait();