This commit is contained in:
Nikita Mikhaylov 2022-04-19 15:01:41 +00:00
parent 224f4dc620
commit 31ccb9c1c3
7 changed files with 98 additions and 9 deletions

View File

@ -1119,6 +1119,35 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config->has("keeper_server")) if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config); global_context->updateKeeperConfiguration(*config);
/// Reload the number of threads for global pools.
/// Note: If you specified it in the top level config (not it config of default profile)
/// then ClickHouse will use it exactly.
/// This is done for backward compatibility.
if (global_context->areBackgroundExecutorsInitialized() && (config->has("background_pool_size") || config->has("background_merges_mutations_concurrency_ratio")))
{
auto new_pool_size = config->getInt("background_pool_size", 16);
auto new_ratio = config->getInt("background_merges_mutations_concurrency_ratio", 2);
global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size * new_ratio);
}
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_move_pool_size"))
{
auto new_pool_size = config->getInt("background_move_pool_size", 8);
global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_fetches_pool_size"))
{
auto new_pool_size = config->getInt("background_fetches_pool_size", 8);
global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_common_pool_size"))
{
auto new_pool_size = config->getInt("background_common_pool_size", 8);
global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (!initial_loading) if (!initial_loading)
{ {
/// We do not load ZooKeeper configuration on the first config loading /// We do not load ZooKeeper configuration on the first config loading

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnMap.h> #include <Columns/ColumnMap.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <array>
#include <string.h> #include <string.h>
#include <boost/program_options/options_description.hpp> #include <boost/program_options/options_description.hpp>
@ -113,16 +114,30 @@ void Settings::addProgramOptionAsMultitoken(boost::program_options::options_desc
name.data(), boost::program_options::value<Strings>()->multitoken()->composing()->notifier(on_program_option), field.getDescription()))); name.data(), boost::program_options::value<Strings>()->multitoken()->composing()->notifier(on_program_option), field.getDescription())));
} }
static std::array<std::string, 5> exceptional_settings =
{
"background_pool_size",
"background_merges_mutations_concurrency_ratio",
"background_move_pool_size",
"background_fetches_pool_size",
"background_common_pool_size",
};
void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path) void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path)
{ {
if (config.getBool("skip_check_for_incorrect_settings", false)) if (config.getBool("skip_check_for_incorrect_settings", false))
return; return;
auto is_exceptional = [](const String & setting)
{
return std::find(exceptional_settings.begin(), exceptional_settings.end(), setting) != exceptional_settings.end();
};
Settings settings; Settings settings;
for (auto setting : settings.all()) for (auto setting : settings.all())
{ {
const auto & name = setting.getName(); const auto & name = setting.getName();
if (config.has(name)) if (config.has(name) && !is_exceptional(name))
{ {
throw Exception(fmt::format("A setting '{}' appeared at top level in config {}." throw Exception(fmt::format("A setting '{}' appeared at top level in config {}."
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile." " But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."

View File

@ -90,11 +90,11 @@ class IColumn;
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \ M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \ M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \ M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
M(UInt64, background_pool_size, 16, "Number of threads to perform merges and mutations in background. Only has meaning at server startup.", 0) \ M(UInt64, background_pool_size, 16, "Number of threads to perform merges and mutations in background. Only has meaning at server startup. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(Float, background_merges_mutations_concurrency_ratio, 2, "Ratio between a number of how many operations could be processed and a number threads to process them. Only has meaning at server startup.", 0) \ M(Float, background_merges_mutations_concurrency_ratio, 2, "Ratio between a number of how many operations could be processed and a number threads to process them. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \ M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_fetches_pool_size, 8, "Number of threads performing background fetches for replicated tables. Only has meaning at server startup.", 0) \ M(UInt64, background_fetches_pool_size, 8, "Deprecated. Number of threads performing background fetches for replicated tables. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_common_pool_size, 8, "Number of threads for some lightweight tasks for replicated tables (like cleaning old parts etc.). Only has meaning at server startup.", 0) \ M(UInt64, background_common_pool_size, 8, "Number of threads for some lightweight tasks for replicated tables (like cleaning old parts etc.). This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_schedule_pool_size, 128, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \ M(UInt64, background_schedule_pool_size, 128, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \ M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \ M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \

View File

@ -272,7 +272,7 @@ struct ContextSharedPart
bool shutdown_called = false; bool shutdown_called = false;
/// Has background executors for MergeTree tables been initialized? /// Has background executors for MergeTree tables been initialized?
bool is_background_executors_initialized = false; bool are_background_executors_initialized = false;
Stopwatch uptime_watch; Stopwatch uptime_watch;
@ -3214,7 +3214,7 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
void Context::initializeBackgroundExecutorsIfNeeded() void Context::initializeBackgroundExecutorsIfNeeded()
{ {
auto lock = getLock(); auto lock = getLock();
if (shared->is_background_executors_initialized) if (shared->are_background_executors_initialized)
return; return;
const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio; const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio;
@ -3264,9 +3264,14 @@ void Context::initializeBackgroundExecutorsIfNeeded()
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}",
getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size); getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size);
shared->is_background_executors_initialized = true; shared->are_background_executors_initialized = true;
} }
bool Context::areBackgroundExecutorsInitialized()
{
auto lock = getLock();
return shared->are_background_executors_initialized;
}
MergeMutateBackgroundExecutorPtr Context::getMergeMutateExecutor() const MergeMutateBackgroundExecutorPtr Context::getMergeMutateExecutor() const
{ {

View File

@ -926,6 +926,7 @@ public:
/// Background executors related methods /// Background executors related methods
void initializeBackgroundExecutorsIfNeeded(); void initializeBackgroundExecutorsIfNeeded();
bool areBackgroundExecutorsInitialized();
MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const; MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const;
OrdinaryBackgroundExecutorPtr getMovesExecutor() const; OrdinaryBackgroundExecutorPtr getMovesExecutor() const;

View File

@ -28,6 +28,40 @@ void MergeTreeBackgroundExecutor<Queue>::wait()
pool.wait(); pool.wait();
} }
template <class Queue>
void MergeTreeBackgroundExecutor<Queue>::increaseThreadsAndMaxTasksCount(size_t new_threads_count, size_t new_max_tasks_count)
{
std::lock_guard lock(mutex);
/// Do not throw any exceptions from global pool. Just log a warning and silently return.
if (new_threads_count <= threads_count)
{
LOG_WARNING(log, "Loaded new threads count for {}Executor from top level config, but new value ({}) is not greater than current {}", name, new_threads_count, threads_count);
return;
}
if (new_max_tasks_count <= max_tasks_count)
{
LOG_WARNING(log, "Loaded new max tasks count for {}Executor from top level config, but new value ({}) is not greater than current {}", name, new_max_tasks_count, max_tasks_count);
return;
}
LOG_INFO(log, "Loaded new threads count ({}) and max tasks count ({}) for {}Executor", new_threads_count, new_max_tasks_count, name);
pending.setCapacity(new_max_tasks_count);
active.set_capacity(new_max_tasks_count);
pool.setMaxThreads(std::max(1UL, new_threads_count));
pool.setMaxFreeThreads(std::max(1UL, new_threads_count));
pool.setQueueSize(std::max(1UL, new_threads_count));
for (size_t number = threads_count; number < new_threads_count; ++number)
pool.scheduleOrThrowOnError([this] { threadFunction(); });
max_tasks_count = new_max_tasks_count;
threads_count = new_threads_count;
}
template <class Queue> template <class Queue>
bool MergeTreeBackgroundExecutor<Queue>::trySchedule(ExecutableTaskPtr task) bool MergeTreeBackgroundExecutor<Queue>::trySchedule(ExecutableTaskPtr task)

View File

@ -188,6 +188,11 @@ public:
wait(); wait();
} }
/// Handler for hot-reloading
/// Supports only increasing the number of threads and tasks, because
/// implemeting tasks eviction will definitely be too error-prone and buggy.
void increaseThreadsAndMaxTasksCount(size_t new_threads_count, size_t new_max_tasks_count);
bool trySchedule(ExecutableTaskPtr task); bool trySchedule(ExecutableTaskPtr task);
void removeTasksCorrespondingToStorage(StorageID id); void removeTasksCorrespondingToStorage(StorageID id);
void wait(); void wait();