Merge pull request #47656 from ClickHouse/background_pool_limit_metric

Add background pools size metrics
This commit is contained in:
Sergei Trifonov 2023-03-17 11:49:53 +01:00 committed by GitHub
commit 1a84203c7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 50 additions and 11 deletions

View File

@ -11,13 +11,21 @@
M(ReplicatedSend, "Number of data parts being sent to replicas") \
M(ReplicatedChecks, "Number of data parts checking for consistency") \
M(BackgroundMergesAndMutationsPoolTask, "Number of active merges and mutations in an associated background pool") \
M(BackgroundMergesAndMutationsPoolSize, "Limit on number of active merges and mutations in an associated background pool") \
M(BackgroundFetchesPoolTask, "Number of active fetches in an associated background pool") \
M(BackgroundFetchesPoolSize, "Limit on number of simultaneous fetches in an associated background pool") \
M(BackgroundCommonPoolTask, "Number of active tasks in an associated background pool") \
M(BackgroundCommonPoolSize, "Limit on number of tasks in an associated background pool") \
M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \
M(BackgroundMovePoolSize, "Limit on number of tasks in BackgroundProcessingPool for moves") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundSchedulePoolSize, "Limit on number of tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
M(BackgroundBufferFlushSchedulePoolSize, "Limit on number of tasks in BackgroundBufferFlushSchedulePool") \
M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \
M(BackgroundDistributedSchedulePoolSize, "Limit on number of tasks in BackgroundDistributedSchedulePool") \
M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \
M(BackgroundMessageBrokerSchedulePoolSize, "Limit on number of tasks in BackgroundProcessingPool for message streaming") \
M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \

View File

@ -149,8 +149,9 @@ Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
}
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_)
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric size_metric_, const char *thread_name_)
: tasks_metric(tasks_metric_)
, size_metric(size_metric_, size_)
, thread_name(thread_name_)
{
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size_);
@ -177,6 +178,8 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
size_metric.changeTo(new_threads_count);
}

View File

@ -54,7 +54,7 @@ public:
void increaseThreadsCount(size_t new_threads_count);
/// thread_name_ cannot be longer then 13 bytes (2 bytes is reserved for "/D" suffix for delayExecutionThreadFunction())
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_);
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric size_metric_, const char *thread_name_);
~BackgroundSchedulePool();
private:
@ -91,6 +91,7 @@ private:
DelayedTasks delayed_tasks;
CurrentMetrics::Metric tasks_metric;
CurrentMetrics::Increment size_metric;
std::string thread_name;
};

View File

@ -129,13 +129,21 @@ namespace CurrentMetrics
{
extern const Metric ContextLockWait;
extern const Metric BackgroundMovePoolTask;
extern const Metric BackgroundMovePoolSize;
extern const Metric BackgroundSchedulePoolTask;
extern const Metric BackgroundSchedulePoolSize;
extern const Metric BackgroundBufferFlushSchedulePoolTask;
extern const Metric BackgroundBufferFlushSchedulePoolSize;
extern const Metric BackgroundDistributedSchedulePoolTask;
extern const Metric BackgroundDistributedSchedulePoolSize;
extern const Metric BackgroundMessageBrokerSchedulePoolTask;
extern const Metric BackgroundMessageBrokerSchedulePoolSize;
extern const Metric BackgroundMergesAndMutationsPoolTask;
extern const Metric BackgroundMergesAndMutationsPoolSize;
extern const Metric BackgroundFetchesPoolTask;
extern const Metric BackgroundFetchesPoolSize;
extern const Metric BackgroundCommonPoolTask;
extern const Metric BackgroundCommonPoolSize;
}
namespace DB
@ -2175,6 +2183,7 @@ BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
CurrentMetrics::BackgroundBufferFlushSchedulePoolSize,
"BgBufSchPool");
}
@ -2226,6 +2235,7 @@ BackgroundSchedulePool & Context::getSchedulePool() const
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
CurrentMetrics::BackgroundSchedulePoolSize,
"BgSchPool");
}
@ -2246,6 +2256,7 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
CurrentMetrics::BackgroundDistributedSchedulePoolSize,
"BgDistSchPool");
}
@ -2266,6 +2277,7 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolSize,
"BgMBSchPool");
}
@ -3826,6 +3838,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
/*max_threads_count*/background_pool_size,
/*max_tasks_count*/background_pool_size * background_merges_mutations_concurrency_ratio,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize,
background_merges_mutations_scheduling_policy
);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}, scheduling_policy={}",
@ -3836,7 +3849,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Move",
background_move_pool_size,
background_move_pool_size,
CurrentMetrics::BackgroundMovePoolTask
CurrentMetrics::BackgroundMovePoolTask,
CurrentMetrics::BackgroundMovePoolSize
);
LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}", background_move_pool_size, background_move_pool_size);
@ -3845,7 +3859,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Fetch",
background_fetches_pool_size,
background_fetches_pool_size,
CurrentMetrics::BackgroundFetchesPoolTask
CurrentMetrics::BackgroundFetchesPoolTask,
CurrentMetrics::BackgroundFetchesPoolSize
);
LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}", background_fetches_pool_size, background_fetches_pool_size);
@ -3854,7 +3869,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Common",
background_common_pool_size,
background_common_pool_size,
CurrentMetrics::BackgroundCommonPoolTask
CurrentMetrics::BackgroundCommonPoolTask,
CurrentMetrics::BackgroundCommonPoolSize
);
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", background_common_pool_size, background_common_pool_size);

View File

@ -59,6 +59,7 @@ void MergeTreeBackgroundExecutor<Queue>::increaseThreadsAndMaxTasksCount(size_t
for (size_t number = threads_count; number < new_threads_count; ++number)
pool.scheduleOrThrowOnError([this] { threadFunction(); });
max_tasks_metric.changeTo(2 * new_max_tasks_count); // pending + active
max_tasks_count.store(new_max_tasks_count, std::memory_order_relaxed);
threads_count = new_threads_count;
}

View File

@ -13,6 +13,7 @@
#include <boost/circular_buffer.hpp>
#include <boost/noncopyable.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
@ -247,11 +248,13 @@ public:
String name_,
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_)
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_)
: name(name_)
, threads_count(threads_count_)
, max_tasks_count(max_tasks_count_)
, metric(metric_)
, max_tasks_metric(max_tasks_metric_, 2 * max_tasks_count) // active + pending
{
if (max_tasks_count == 0)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");
@ -272,9 +275,10 @@ public:
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_,
std::string_view policy)
requires requires(Queue queue) { queue.updatePolicy(policy); } // Because we use explicit template instantiation
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_)
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_, max_tasks_metric_)
{
pending.updatePolicy(policy);
}
@ -311,6 +315,7 @@ private:
size_t threads_count TSA_GUARDED_BY(mutex) = 0;
std::atomic<size_t> max_tasks_count = 0;
CurrentMetrics::Metric metric;
CurrentMetrics::Increment max_tasks_metric;
void routine(TaskRuntimeDataPtr item);

View File

@ -15,6 +15,7 @@ using namespace DB;
namespace CurrentMetrics
{
extern const Metric BackgroundMergesAndMutationsPoolTask;
extern const Metric BackgroundMergesAndMutationsPoolSize;
}
std::random_device device;
@ -102,7 +103,8 @@ TEST(Executor, Simple)
"GTest",
1, // threads
100, // max_tasks
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
String schedule; // mutex is not required because we have a single worker
@ -144,7 +146,8 @@ TEST(Executor, RemoveTasks)
"GTest",
tasks_kinds,
tasks_kinds * batch,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
for (size_t i = 0; i < batch; ++i)
@ -184,7 +187,8 @@ TEST(Executor, RemoveTasksStress)
"GTest",
tasks_kinds,
tasks_kinds * batch * (schedulers_count + removers_count),
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
std::barrier barrier(schedulers_count + removers_count);
@ -234,7 +238,8 @@ TEST(Executor, UpdatePolicy)
"GTest",
1, // threads
100, // max_tasks
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
);
String schedule; // mutex is not required because we have a single worker