Deleted settings

This commit is contained in:
Nikita Mikhaylov 2022-04-20 13:35:13 +00:00
parent 4818b63979
commit 36bdee0499
7 changed files with 141 additions and 64 deletions

View File

@ -1132,22 +1132,46 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_move_pool_size"))
{
auto new_pool_size = config->getInt("background_move_pool_size", 8);
auto new_pool_size = config->getInt("background_move_pool_size");
global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_fetches_pool_size"))
{
auto new_pool_size = config->getInt("background_fetches_pool_size", 8);
auto new_pool_size = config->getInt("background_fetches_pool_size");
global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_common_pool_size"))
{
auto new_pool_size = config->getInt("background_common_pool_size", 8);
auto new_pool_size = config->getInt("background_common_pool_size");
global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (config->has("background_buffer_flush_schedule_pool_size"))
{
auto new_pool_size = config->getUInt64("background_buffer_flush_schedule_pool_size");
global_context->getBufferFlushSchedulePool().increaseThreadsCount(new_pool_size);
}
if (config->has("background_schedule_pool_size"))
{
auto new_pool_size = config->getUInt64("background_schedule_pool_size");
global_context->getSchedulePool().increaseThreadsCount(new_pool_size);
}
if (config->has("background_message_broker_schedule_pool_size"))
{
auto new_pool_size = config->getUInt64("background_message_broker_schedule_pool_size");
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(new_pool_size);
}
if (config->has("background_distributed_schedule_pool_size"))
{
auto new_pool_size = config->getUInt64("background_distributed_schedule_pool_size");
global_context->getDistributedSchedulePool().increaseThreadsCount(new_pool_size);
}
if (!initial_loading)
{
/// We do not load ZooKeeper configuration on the first config loading

View File

@ -155,7 +155,7 @@ public:
/** Looks like std::thread but allocates threads in GlobalThreadPool.
* Also holds ThreadStatus for ClickHouse.
*/
class ThreadFromGlobalPool
class ThreadFromGlobalPool : boost::noncopyable
{
public:
ThreadFromGlobalPool() = default;

View File

@ -149,13 +149,12 @@ Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_)
: size(size_)
, tasks_metric(tasks_metric_)
: tasks_metric(tasks_metric_)
, thread_name(thread_name_)
{
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size);
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size_);
threads.resize(size);
threads.resize(size_);
for (auto & thread : threads)
thread = ThreadFromGlobalPool([this] { threadFunction(); });
@ -163,6 +162,23 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
}
void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
{
const size_t old_threads_count = threads.size();
if (new_threads_count < old_threads_count)
{
LOG_WARNING(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name),
"Tried to increase the number of threads but the new threads count ({}) is not greater than old one ({})", new_threads_count, old_threads_count);
return;
}
threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPool([this] { threadFunction(); });
}
BackgroundSchedulePool::~BackgroundSchedulePool()
{
try

View File

@ -48,7 +48,9 @@ public:
TaskHolder createTask(const std::string & log_name, const TaskFunc & function);
size_t getNumberOfThreads() const { return size; }
/// As for MergeTreeBackgroundExecutor we refuse to implement tasks eviction, because it will
/// be error prone. We support only increasing number of threads at runtime.
void increaseThreadsCount(size_t new_threads_count);
/// thread_name_ cannot be longer then 13 bytes (2 bytes is reserved for "/D" suffix for delayExecutionThreadFunction())
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_);
@ -66,8 +68,6 @@ private:
/// Remove task, that was scheduled with delay, from schedule.
void cancelDelayedTask(const TaskInfoPtr & task_info, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
/// Number for worker threads.
const size_t size;
std::atomic<bool> shutdown {false};
Threads threads;
Poco::NotificationQueue queue;

View File

@ -114,30 +114,16 @@ void Settings::addProgramOptionAsMultitoken(boost::program_options::options_desc
name.data(), boost::program_options::value<Strings>()->multitoken()->composing()->notifier(on_program_option), field.getDescription())));
}
static std::array<std::string, 5> exceptional_settings =
{
"background_pool_size",
"background_merges_mutations_concurrency_ratio",
"background_move_pool_size",
"background_fetches_pool_size",
"background_common_pool_size",
};
void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path)
{
if (config.getBool("skip_check_for_incorrect_settings", false))
return;
auto is_exceptional = [](const String & setting)
{
return std::find(exceptional_settings.begin(), exceptional_settings.end(), setting) != exceptional_settings.end();
};
Settings settings;
for (auto setting : settings.all())
{
const auto & name = setting.getName();
if (config.has(name) && !is_exceptional(name))
if (config.has(name))
{
throw Exception(fmt::format("A setting '{}' appeared at top level in config {}."
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."

View File

@ -89,15 +89,6 @@ class IColumn;
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
M(UInt64, background_pool_size, 16, "Number of threads to perform merges and mutations in background. Only has meaning at server startup. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(Float, background_merges_mutations_concurrency_ratio, 2, "Ratio between a number of how many operations could be processed and a number threads to process them. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_fetches_pool_size, 8, "Deprecated. Number of threads performing background fetches for replicated tables. This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_common_pool_size, 8, "Number of threads for some lightweight tasks for replicated tables (like cleaning old parts etc.). This is not a user level setting and has to be defined at top level config. Left here only for backward compatibility.", 0) \
M(UInt64, background_schedule_pool_size, 128, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ and FileLog engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
@ -609,6 +600,15 @@ class IColumn;
MAKE_OBSOLETE(M, UInt64, partial_merge_join_optimizations, 0) \
MAKE_OBSOLETE(M, MaxThreads, max_alter_threads, 0) \
MAKE_OBSOLETE(M, Bool, allow_experimental_projection_optimization, true) \
MAKE_OBSOLETE(M, UInt64, background_buffer_flush_schedule_pool_size, 16) \
MAKE_OBSOLETE(M, UInt64, background_pool_size, 16) \
MAKE_OBSOLETE(M, Float, background_merges_mutations_concurrency_ratio, 2) \
MAKE_OBSOLETE(M, UInt64, background_move_pool_size, 8) \
MAKE_OBSOLETE(M, UInt64, background_fetches_pool_size, 8) \
MAKE_OBSOLETE(M, UInt64, background_common_pool_size, 8) \
MAKE_OBSOLETE(M, UInt64, background_schedule_pool_size, 128) \
MAKE_OBSOLETE(M, UInt64, background_message_broker_schedule_pool_size, 16) \
MAKE_OBSOLETE(M, UInt64, background_distributed_schedule_pool_size, 16) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@ -1770,10 +1770,19 @@ BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
auto lock = getLock();
if (!shared->buffer_flush_schedule_pool)
{
size_t background_buffer_flush_schedule_pool_size = 16;
if (getConfigRef().has("background_buffer_flush_schedule_pool_size"))
background_buffer_flush_schedule_pool_size = getConfigRef().getUInt64("background_buffer_flush_schedule_pool_size");
else if (getConfigRef().has("profiles.default.background_buffer_flush_schedule_pool_size"))
background_buffer_flush_schedule_pool_size = getConfigRef().getUInt64("profiles.default.background_buffer_flush_schedule_pool_size");
shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_buffer_flush_schedule_pool_size,
background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
"BgBufSchPool");
}
return *shared->buffer_flush_schedule_pool;
}
@ -1812,10 +1821,19 @@ BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLock();
if (!shared->schedule_pool)
{
size_t background_schedule_pool_size = 128;
if (getConfigRef().has("background_schedule_pool_size"))
background_schedule_pool_size = getConfigRef().getUInt64("background_schedule_pool_size");
else if (getConfigRef().has("profiles.default.background_schedule_pool_size"))
background_schedule_pool_size = getConfigRef().getUInt64("profiles.default.background_schedule_pool_size");
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_schedule_pool_size,
background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
"BgSchPool");
}
return *shared->schedule_pool;
}
@ -1823,10 +1841,19 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
{
auto lock = getLock();
if (!shared->distributed_schedule_pool)
{
size_t background_distributed_schedule_pool_size = 16;
if (getConfigRef().has("background_distributed_schedule_pool_size"))
background_distributed_schedule_pool_size = getConfigRef().getUInt64("background_distributed_schedule_pool_size");
else if (getConfigRef().has("profiles.default.background_distributed_schedule_pool_size"))
background_distributed_schedule_pool_size = getConfigRef().getUInt64("profiles.default.background_distributed_schedule_pool_size");
shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_distributed_schedule_pool_size,
background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
"BgDistSchPool");
}
return *shared->distributed_schedule_pool;
}
@ -1834,10 +1861,19 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
{
auto lock = getLock();
if (!shared->message_broker_schedule_pool)
{
size_t background_message_broker_schedule_pool_size = 16;
if (getConfigRef().has("background_message_broker_schedule_pool_size"))
background_message_broker_schedule_pool_size = getConfigRef().getUInt64("background_message_broker_schedule_pool_size");
else if (getConfigRef().has("profiles.default.background_message_broker_schedule_pool_size"))
background_message_broker_schedule_pool_size = getConfigRef().getUInt64("profiles.default.background_message_broker_schedule_pool_size");
shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_message_broker_schedule_pool_size,
background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
"BgMBSchPool");
}
return *shared->message_broker_schedule_pool;
}
@ -3217,60 +3253,75 @@ void Context::initializeBackgroundExecutorsIfNeeded()
if (shared->are_background_executors_initialized)
return;
const auto & config = getConfigRef();
/// This is done for backward compatibility. All these setting could be specified in the top level config or in the default profile's config.
/// TODO: Remove these settings from default profile's config in the future.
size_t background_pool_size_top_level = getConfigRef().has("background_pool_size") ? getConfigRef().getUInt("background_pool_size", 16) : 0;
size_t concurrency_ratio_top_level = getConfigRef().has("background_merges_mutations_concurrency_ratio") ? getConfigRef().getUInt("background_merges_mutations_concurrency_ratio", 2) : 0;
size_t background_move_pool_size_top_level = getConfigRef().has("background_move_pool_size") ? getConfigRef().getUInt("background_move_pool_size", 8) : 0;
size_t background_fetches_pool_size_top_level = getConfigRef().has("background_fetches_pool_size") ? getConfigRef().getUInt("background_fetches_pool_size", 8) : 0;
size_t background_common_pool_size_top_level = getConfigRef().has("background_common_pool_size") ? getConfigRef().getUInt("background_common_pool_size", 8) : 0;
size_t background_pool_size = 16;
if (config.has("background_pool_size"))
background_pool_size = config.getUInt64("background_pool_size");
else if (config.has("profiles.default.background_pool_size"))
background_pool_size = config.getUInt64("profiles.default.background_pool_size");
size_t background_merges_mutations_concurrency_ratio = 2;
if (config.has("background_merges_mutations_concurrency_ratio"))
background_merges_mutations_concurrency_ratio = config.getUInt64("background_merges_mutations_concurrency_ratio");
else if (config.has("profiles.default.background_pool_size"))
background_merges_mutations_concurrency_ratio = config.getUInt64("profiles.default.background_merges_mutations_concurrency_ratio");
const size_t merges_and_mutations_threads_count = std::max<size_t>(background_pool_size_top_level, getSettingsRef().background_pool_size);
const size_t max_merges_and_mutations = merges_and_mutations_threads_count *
std::max<size_t>(concurrency_ratio_top_level, getSettingsRef().background_merges_mutations_concurrency_ratio);
size_t background_move_pool_size = 8;
if (config.has("background_move_pool_size"))
background_move_pool_size = config.getUInt64("background_move_pool_size");
else if (config.has("profiles.default.background_move_pool_size"))
background_move_pool_size = config.getUInt64("profiles.default.background_move_pool_size");
size_t background_fetches_pool_size = 8;
if (config.has("background_fetches_pool_size"))
background_fetches_pool_size = config.getUInt64("background_fetches_pool_size");
else if (config.has("profiles.default.background_fetches_pool_size"))
background_fetches_pool_size = config.getUInt64("profiles.default.background_fetches_pool_size");
size_t background_common_pool_size = 8;
if (config.has("background_common_pool_size"))
background_common_pool_size = config.getUInt64("background_common_pool_size");
else if (config.has("profiles.default.background_common_pool_size"))
background_common_pool_size = config.getUInt64("profiles.default.background_common_pool_size");
/// With this executor we can execute more tasks than threads we have
shared->merge_mutate_executor = MergeMutateBackgroundExecutor::create
(
"MergeMutate",
/*max_threads_count*/merges_and_mutations_threads_count,
/*max_tasks_count*/max_merges_and_mutations,
/*max_threads_count*/background_pool_size,
/*max_tasks_count*/background_pool_size * background_merges_mutations_concurrency_ratio,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}",
merges_and_mutations_threads_count, max_merges_and_mutations);
background_pool_size, background_pool_size * background_merges_mutations_concurrency_ratio);
const size_t moves_threads_count = std::max<size_t>(background_move_pool_size_top_level, getSettingsRef().background_move_pool_size);
shared->moves_executor = OrdinaryBackgroundExecutor::create
(
"Move",
moves_threads_count,
moves_threads_count,
background_move_pool_size,
background_move_pool_size,
CurrentMetrics::BackgroundMovePoolTask
);
LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}", moves_threads_count, moves_threads_count);
LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}", background_move_pool_size, background_move_pool_size);
const size_t fetches_threads_count = std::max<size_t>(background_fetches_pool_size_top_level, getSettingsRef().background_fetches_pool_size);
shared->fetch_executor = OrdinaryBackgroundExecutor::create
(
"Fetch",
fetches_threads_count,
fetches_threads_count,
background_fetches_pool_size,
background_fetches_pool_size,
CurrentMetrics::BackgroundFetchesPoolTask
);
LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}", fetches_threads_count, fetches_threads_count);
LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}", background_fetches_pool_size, background_fetches_pool_size);
const size_t common_threads_count = std::max<size_t>(background_common_pool_size_top_level, getSettingsRef().background_common_pool_size);
shared->common_executor = OrdinaryBackgroundExecutor::create
(
"Common",
common_threads_count,
common_threads_count,
background_common_pool_size,
background_common_pool_size,
CurrentMetrics::BackgroundCommonPoolTask
);
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", common_threads_count, common_threads_count);
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", background_common_pool_size, background_common_pool_size);
shared->are_background_executors_initialized = true;
}