Merge pull request #47880 from azat/threadpool-introspection

ThreadPool metrics introspection
This commit is contained in:
Alexey Milovidov 2023-03-30 01:27:31 +03:00 committed by GitHub
commit e982fb9f1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 441 additions and 87 deletions

View File

@ -34,6 +34,7 @@
#include <Common/Config/configReadClient.h>
#include <Common/TerminalSize.h>
#include <Common/StudentTTest.h>
#include <Common/CurrentMetrics.h>
#include <filesystem>
@ -43,6 +44,12 @@ namespace fs = std::filesystem;
* The tool emulates a case with fixed amount of simultaneously executing queries.
*/
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
namespace DB
{
@ -103,7 +110,7 @@ public:
settings(settings_),
shared_context(Context::createShared()),
global_context(Context::createGlobal(shared_context.get())),
pool(concurrency)
pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, concurrency)
{
const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
size_t connections_cnt = std::max(ports_.size(), hosts_.size());

View File

@ -6,6 +6,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Parsers/ASTFunction.h>
@ -19,6 +20,12 @@
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
namespace DB
{
@ -192,7 +199,7 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts,
{
/// Fetch partitions list from a shard
{
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
ThreadPool thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]()

View File

@ -20,10 +20,19 @@
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
namespace CurrentMetrics
{
extern const Metric BackupsThreads;
extern const Metric BackupsThreadsActive;
extern const Metric RestoreThreads;
extern const Metric RestoreThreadsActive;
}
namespace DB
{
@ -153,8 +162,8 @@ namespace
BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_)
: backups_thread_pool(num_backup_threads, /* max_free_threads = */ 0, num_backup_threads)
, restores_thread_pool(num_restore_threads, /* max_free_threads = */ 0, num_restore_threads)
: backups_thread_pool(CurrentMetrics::BackupsThreads, CurrentMetrics::BackupsThreadsActive, num_backup_threads, /* max_free_threads = */ 0, num_backup_threads)
, restores_thread_pool(CurrentMetrics::RestoreThreads, CurrentMetrics::RestoreThreadsActive, num_restore_threads, /* max_free_threads = */ 0, num_restore_threads)
, log(&Poco::Logger::get("BackupsWorker"))
, allow_concurrent_backups(allow_concurrent_backups_)
, allow_concurrent_restores(allow_concurrent_restores_)

View File

@ -72,6 +72,64 @@
M(GlobalThreadActive, "Number of threads in global thread pool running a task.") \
M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
M(MergeTreeDataSelectExecutorThreads, "Number of threads in the MergeTreeDataSelectExecutor thread pool.") \
M(MergeTreeDataSelectExecutorThreadsActive, "Number of threads in the MergeTreeDataSelectExecutor thread pool running a task.") \
M(BackupsThreads, "Number of threads in the thread pool for BACKUP.") \
M(BackupsThreadsActive, "Number of threads in thread pool for BACKUP running a task.") \
M(RestoreThreads, "Number of threads in the thread pool for RESTORE.") \
M(RestoreThreadsActive, "Number of threads in the thread pool for RESTORE running a task.") \
M(IOThreads, "Number of threads in the IO thread pool.") \
M(IOThreadsActive, "Number of threads in the IO thread pool running a task.") \
M(ThreadPoolRemoteFSReaderThreads, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool.") \
M(ThreadPoolRemoteFSReaderThreadsActive, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool running a task.") \
M(ThreadPoolFSReaderThreads, "Number of threads in the thread pool for local_filesystem_read_method=threadpool.") \
M(ThreadPoolFSReaderThreadsActive, "Number of threads in the thread pool for local_filesystem_read_method=threadpool running a task.") \
M(BackupsIOThreads, "Number of threads in the BackupsIO thread pool.") \
M(BackupsIOThreadsActive, "Number of threads in the BackupsIO thread pool running a task.") \
M(DiskObjectStorageAsyncThreads, "Number of threads in the async thread pool for DiskObjectStorage.") \
M(DiskObjectStorageAsyncThreadsActive, "Number of threads in the async thread pool for DiskObjectStorage running a task.") \
M(StorageHiveThreads, "Number of threads in the StorageHive thread pool.") \
M(StorageHiveThreadsActive, "Number of threads in the StorageHive thread pool running a task.") \
M(TablesLoaderThreads, "Number of threads in the tables loader thread pool.") \
M(TablesLoaderThreadsActive, "Number of threads in the tables loader thread pool running a task.") \
M(DatabaseOrdinaryThreads, "Number of threads in the Ordinary database thread pool.") \
M(DatabaseOrdinaryThreadsActive, "Number of threads in the Ordinary database thread pool running a task.") \
M(DatabaseOnDiskThreads, "Number of threads in the DatabaseOnDisk thread pool.") \
M(DatabaseOnDiskThreadsActive, "Number of threads in the DatabaseOnDisk thread pool running a task.") \
M(DatabaseCatalogThreads, "Number of threads in the DatabaseCatalog thread pool.") \
M(DatabaseCatalogThreadsActive, "Number of threads in the DatabaseCatalog thread pool running a task.") \
M(DestroyAggregatesThreads, "Number of threads in the thread pool for destroy aggregate states.") \
M(DestroyAggregatesThreadsActive, "Number of threads in the thread pool for destroy aggregate states running a task.") \
M(HashedDictionaryThreads, "Number of threads in the HashedDictionary thread pool.") \
M(HashedDictionaryThreadsActive, "Number of threads in the HashedDictionary thread pool running a task.") \
M(CacheDictionaryThreads, "Number of threads in the CacheDictionary thread pool.") \
M(CacheDictionaryThreadsActive, "Number of threads in the CacheDictionary thread pool running a task.") \
M(ParallelFormattingOutputFormatThreads, "Number of threads in the ParallelFormattingOutputFormatThreads thread pool.") \
M(ParallelFormattingOutputFormatThreadsActive, "Number of threads in the ParallelFormattingOutputFormatThreads thread pool running a task.") \
M(ParallelParsingInputFormatThreads, "Number of threads in the ParallelParsingInputFormat thread pool.") \
M(ParallelParsingInputFormatThreadsActive, "Number of threads in the ParallelParsingInputFormat thread pool running a task.") \
M(MergeTreeBackgroundExecutorThreads, "Number of threads in the MergeTreeBackgroundExecutor thread pool.") \
M(MergeTreeBackgroundExecutorThreadsActive, "Number of threads in the MergeTreeBackgroundExecutor thread pool running a task.") \
M(AsynchronousInsertThreads, "Number of threads in the AsynchronousInsert thread pool.") \
M(AsynchronousInsertThreadsActive, "Number of threads in the AsynchronousInsert thread pool running a task.") \
M(StartupSystemTablesThreads, "Number of threads in the StartupSystemTables thread pool.") \
M(StartupSystemTablesThreadsActive, "Number of threads in the StartupSystemTables thread pool running a task.") \
M(AggregatorThreads, "Number of threads in the Aggregator thread pool.") \
M(AggregatorThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \
M(DDLWorkerThreads, "Number of threads in the DDLWorker thread pool for ON CLUSTER queries.") \
M(DDLWorkerThreadsActive, "Number of threads in the DDLWORKER thread pool for ON CLUSTER queries running a task.") \
M(StorageDistributedThreads, "Number of threads in the StorageDistributed thread pool.") \
M(StorageDistributedThreadsActive, "Number of threads in the StorageDistributed thread pool running a task.") \
M(StorageS3Threads, "Number of threads in the StorageS3 thread pool.") \
M(StorageS3ThreadsActive, "Number of threads in the StorageS3 thread pool running a task.") \
M(MergeTreePartsLoaderThreads, "Number of threads in the MergeTree parts loader thread pool.") \
M(MergeTreePartsLoaderThreadsActive, "Number of threads in the MergeTree parts loader thread pool running a task.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(SystemReplicasThreads, "Number of threads in the system.replicas thread pool.") \
M(SystemReplicasThreadsActive, "Number of threads in the system.replicas thread pool running a task.") \
M(RestartReplicaThreads, "Number of threads in the RESTART REPLICA thread pool.") \
M(RestartReplicaThreadsActive, "Number of threads in the RESTART REPLICA thread pool running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \

View File

@ -4,6 +4,7 @@
#include <cstdint>
#include <utility>
#include <atomic>
#include <cassert>
#include <base/types.h>
/** Allows to count number of simultaneously happening processes or current value of some metric.
@ -73,7 +74,10 @@ namespace CurrentMetrics
public:
explicit Increment(Metric metric, Value amount_ = 1)
: Increment(&values[metric], amount_) {}
: Increment(&values[metric], amount_)
{
assert(metric < CurrentMetrics::end());
}
~Increment()
{

View File

@ -26,28 +26,37 @@ namespace CurrentMetrics
{
extern const Metric GlobalThread;
extern const Metric GlobalThreadActive;
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl()
: ThreadPoolImpl(getNumberOfPhysicalCPUCores())
ThreadPoolImpl<Thread>::ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_)
: ThreadPoolImpl(metric_threads_, metric_active_threads_, getNumberOfPhysicalCPUCores())
{
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_)
: ThreadPoolImpl(max_threads_, max_threads_, max_threads_)
ThreadPoolImpl<Thread>::ThreadPoolImpl(
Metric metric_threads_,
Metric metric_active_threads_,
size_t max_threads_)
: ThreadPoolImpl(metric_threads_, metric_active_threads_, max_threads_, max_threads_, max_threads_)
{
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_)
: max_threads(max_threads_)
ThreadPoolImpl<Thread>::ThreadPoolImpl(
Metric metric_threads_,
Metric metric_active_threads_,
size_t max_threads_,
size_t max_free_threads_,
size_t queue_size_,
bool shutdown_on_exception_)
: metric_threads(metric_threads_)
, metric_active_threads(metric_active_threads_)
, max_threads(max_threads_)
, max_free_threads(std::min(max_free_threads_, max_threads))
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
@ -324,8 +333,7 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
DENY_ALLOCATIONS_IN_SCOPE;
CurrentMetrics::Increment metric_all_threads(
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread);
CurrentMetrics::Increment metric_pool_threads(metric_threads);
/// Remove this thread from `threads` and detach it, that must be done before exiting from this worker.
/// We can't wrap the following lambda function into `SCOPE_EXIT` because it requires `mutex` to be locked.
@ -383,8 +391,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
try
{
CurrentMetrics::Increment metric_active_threads(
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive);
CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads);
job();
@ -458,6 +465,22 @@ template class ThreadFromGlobalPoolImpl<true>;
std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
GlobalThreadPool::GlobalThreadPool(
size_t max_threads_,
size_t max_free_threads_,
size_t queue_size_,
const bool shutdown_on_exception_)
: FreeThreadPool(
CurrentMetrics::GlobalThread,
CurrentMetrics::GlobalThreadActive,
max_threads_,
max_free_threads_,
queue_size_,
shutdown_on_exception_)
{
}
void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size)
{
if (the_instance)

View File

@ -16,6 +16,7 @@
#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/CurrentMetrics.h>
#include <base/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool.
@ -33,15 +34,25 @@ class ThreadPoolImpl
{
public:
using Job = std::function<void()>;
using Metric = CurrentMetrics::Metric;
/// Maximum number of threads is based on the number of physical cores.
ThreadPoolImpl();
ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_);
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
explicit ThreadPoolImpl(size_t max_threads_);
explicit ThreadPoolImpl(
Metric metric_threads_,
Metric metric_active_threads_,
size_t max_threads_);
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_ = true);
ThreadPoolImpl(
Metric metric_threads_,
Metric metric_active_threads_,
size_t max_threads_,
size_t max_free_threads_,
size_t queue_size_,
bool shutdown_on_exception_ = true);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If any thread was throw an exception, first exception will be rethrown from this method,
@ -96,6 +107,9 @@ private:
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
Metric metric_threads;
Metric metric_active_threads;
size_t max_threads;
size_t max_free_threads;
size_t queue_size;
@ -159,12 +173,11 @@ class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
{
static std::unique_ptr<GlobalThreadPool> the_instance;
GlobalThreadPool(size_t max_threads_, size_t max_free_threads_,
size_t queue_size_, const bool shutdown_on_exception_)
: FreeThreadPool(max_threads_, max_free_threads_, queue_size_,
shutdown_on_exception_)
{
}
GlobalThreadPool(
size_t max_threads_,
size_t max_free_threads_,
size_t queue_size_,
bool shutdown_on_exception_);
public:
static void initialize(size_t max_threads = 10000, size_t max_free_threads = 1000, size_t queue_size = 10000);

View File

@ -17,6 +17,7 @@
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
using Key = UInt64;
@ -28,6 +29,12 @@ using Map = HashMap<Key, Value>;
using MapTwoLevel = TwoLevelHashMap<Key, Value>;
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
struct SmallLock
{
std::atomic<int> locked {false};
@ -247,7 +254,7 @@ int main(int argc, char ** argv)
std::cerr << std::fixed << std::setprecision(2);
ThreadPool pool(num_threads);
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads);
Source data(n);

View File

@ -17,6 +17,7 @@
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
using Key = UInt64;
@ -24,6 +25,12 @@ using Value = UInt64;
using Source = std::vector<Key>;
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
template <typename Map>
struct AggregateIndependent
{
@ -274,7 +281,7 @@ int main(int argc, char ** argv)
std::cerr << std::fixed << std::setprecision(2);
ThreadPool pool(num_threads);
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads);
Source data(n);

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
int value = 0;
@ -14,6 +15,12 @@ static void f() { ++value; }
static void * g(void *) { f(); return {}; }
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
namespace DB
{
namespace ErrorCodes
@ -65,7 +72,7 @@ int main(int argc, char ** argv)
test(n, "Create and destroy ThreadPool each iteration", []
{
ThreadPool tp(1);
ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 1);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
@ -86,7 +93,7 @@ int main(int argc, char ** argv)
});
{
ThreadPool tp(1);
ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 1);
test(n, "Schedule job for Threadpool each iteration", [&tp]
{
@ -96,7 +103,7 @@ int main(int argc, char ** argv)
}
{
ThreadPool tp(128);
ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 128);
test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
{

View File

@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <gtest/gtest.h>
@ -7,6 +8,12 @@
*/
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
TEST(ThreadPool, ConcurrentWait)
{
auto worker = []
@ -18,14 +25,14 @@ TEST(ThreadPool, ConcurrentWait)
constexpr size_t num_threads = 4;
constexpr size_t num_jobs = 4;
ThreadPool pool(num_threads);
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads);
for (size_t i = 0; i < num_jobs; ++i)
pool.scheduleOrThrowOnError(worker);
constexpr size_t num_waiting_threads = 4;
ThreadPool waiting_pool(num_waiting_threads);
ThreadPool waiting_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_waiting_threads);
for (size_t i = 0; i < num_waiting_threads; ++i)
waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); });

View File

@ -2,10 +2,17 @@
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <gtest/gtest.h>
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
/// Test what happens if local ThreadPool cannot create a ThreadFromGlobalPool.
/// There was a bug: if local ThreadPool cannot allocate even a single thread,
/// the job will be scheduled but never get executed.
@ -27,7 +34,7 @@ TEST(ThreadPool, GlobalFull1)
auto func = [&] { ++counter; while (counter != num_jobs) {} };
ThreadPool pool(num_jobs);
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_jobs);
for (size_t i = 0; i < capacity; ++i)
pool.scheduleOrThrowOnError(func);
@ -65,11 +72,11 @@ TEST(ThreadPool, GlobalFull2)
std::atomic<size_t> counter = 0;
auto func = [&] { ++counter; while (counter != capacity + 1) {} };
ThreadPool pool(capacity, 0, capacity);
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.scheduleOrThrowOnError(func);
ThreadPool another_pool(1);
ThreadPool another_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 1);
EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;

View File

@ -1,10 +1,17 @@
#include <atomic>
#include <iostream>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <gtest/gtest.h>
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
TEST(ThreadPool, Loop)
{
std::atomic<int> res{0};
@ -12,7 +19,7 @@ TEST(ThreadPool, Loop)
for (size_t i = 0; i < 1000; ++i)
{
size_t threads = 16;
ThreadPool pool(threads);
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, threads);
for (size_t j = 0; j < threads; ++j)
pool.scheduleOrThrowOnError([&] { ++res; });
pool.wait();

View File

@ -1,13 +1,20 @@
#include <iostream>
#include <stdexcept>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <gtest/gtest.h>
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
static bool check()
{
ThreadPool pool(10);
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 10);
/// The throwing thread.
pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); });

View File

@ -17,14 +17,21 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Common/assert_cast.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h>
#include <Common/assert_cast.h>
#include <filesystem>
#include <Common/filesystemHelpers.h>
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric DatabaseOnDiskThreads;
extern const Metric DatabaseOnDiskThreadsActive;
}
namespace DB
{
@ -620,7 +627,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
}
/// Read and parse metadata in parallel
ThreadPool pool;
ThreadPool pool(CurrentMetrics::DatabaseOnDiskThreads, CurrentMetrics::DatabaseOnDiskThreadsActive);
for (const auto & file : metadata_files)
{
pool.scheduleOrThrowOnError([&]()

View File

@ -25,9 +25,16 @@
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric DatabaseOrdinaryThreads;
extern const Metric DatabaseOrdinaryThreadsActive;
}
namespace DB
{
@ -99,7 +106,7 @@ void DatabaseOrdinary::loadStoredObjects(
std::atomic<size_t> dictionaries_processed{0};
std::atomic<size_t> tables_processed{0};
ThreadPool pool;
ThreadPool pool(CurrentMetrics::DatabaseOrdinaryThreads, CurrentMetrics::DatabaseOrdinaryThreadsActive);
/// We must attach dictionaries before attaching tables
/// because while we're attaching tables we may need to have some dictionaries attached

View File

@ -8,8 +8,15 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <numeric>
namespace CurrentMetrics
{
extern const Metric TablesLoaderThreads;
extern const Metric TablesLoaderThreadsActive;
}
namespace DB
{
@ -31,12 +38,13 @@ void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, Atomic
}
TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, LoadingStrictnessLevel strictness_mode_)
: global_context(global_context_)
, databases(std::move(databases_))
, strictness_mode(strictness_mode_)
, referential_dependencies("ReferentialDeps")
, loading_dependencies("LoadingDeps")
, all_loading_dependencies("LoadingDeps")
: global_context(global_context_)
, databases(std::move(databases_))
, strictness_mode(strictness_mode_)
, referential_dependencies("ReferentialDeps")
, loading_dependencies("LoadingDeps")
, all_loading_dependencies("LoadingDeps")
, pool(CurrentMetrics::TablesLoaderThreads, CurrentMetrics::TablesLoaderThreadsActive)
{
metadata.default_database = global_context->getCurrentDatabase();
log = &Poco::Logger::get("TablesLoader");

View File

@ -2,8 +2,15 @@
#include <Dictionaries/CacheDictionaryUpdateQueue.h>
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
namespace CurrentMetrics
{
extern const Metric CacheDictionaryThreads;
extern const Metric CacheDictionaryThreadsActive;
}
namespace DB
{
@ -26,7 +33,7 @@ CacheDictionaryUpdateQueue<dictionary_key_type>::CacheDictionaryUpdateQueue(
, configuration(configuration_)
, update_func(std::move(update_func_))
, update_queue(configuration.max_update_queue_size)
, update_pool(configuration.max_threads_for_updates)
, update_pool(CurrentMetrics::CacheDictionaryThreads, CurrentMetrics::CacheDictionaryThreadsActive, configuration.max_threads_for_updates)
{
for (size_t i = 0; i < configuration.max_threads_for_updates; ++i)
update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });

View File

@ -8,6 +8,7 @@
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
#include <Core/Defines.h>
@ -21,6 +22,11 @@
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
namespace CurrentMetrics
{
extern const Metric HashedDictionaryThreads;
extern const Metric HashedDictionaryThreadsActive;
}
namespace
{
@ -60,7 +66,7 @@ public:
explicit ParallelDictionaryLoader(HashedDictionary & dictionary_)
: dictionary(dictionary_)
, shards(dictionary.configuration.shards)
, pool(shards)
, pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, shards)
, shards_queues(shards)
{
UInt64 backlog = dictionary.configuration.shard_load_queue_backlog;
@ -213,7 +219,7 @@ HashedDictionary<dictionary_key_type, sparse, sharded>::~HashedDictionary()
return;
size_t shards = std::max<size_t>(configuration.shards, 1);
ThreadPool pool(shards);
ThreadPool pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, shards);
size_t hash_tables_count = 0;
auto schedule_destroy = [&hash_tables_count, &pool](auto & container)

View File

@ -61,6 +61,8 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric Read;
extern const Metric ThreadPoolFSReaderThreads;
extern const Metric ThreadPoolFSReaderThreadsActive;
}
@ -85,7 +87,7 @@ static bool hasBugInPreadV2()
#endif
ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_)
: pool(pool_size, pool_size, queue_size_)
: pool(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, pool_size, pool_size, queue_size_)
{
}

View File

@ -26,6 +26,8 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric RemoteRead;
extern const Metric ThreadPoolRemoteFSReaderThreads;
extern const Metric ThreadPoolRemoteFSReaderThreadsActive;
}
namespace DB
@ -60,7 +62,7 @@ IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t
ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_)
: pool(pool_size, pool_size, queue_size_)
: pool(CurrentMetrics::ThreadPoolRemoteFSReaderThreads, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive, pool_size, pool_size, queue_size_)
{
}

View File

@ -10,6 +10,7 @@
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
@ -17,6 +18,13 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace CurrentMetrics
{
extern const Metric DiskObjectStorageAsyncThreads;
extern const Metric DiskObjectStorageAsyncThreadsActive;
}
namespace DB
{
@ -38,7 +46,8 @@ class AsyncThreadPoolExecutor : public Executor
public:
AsyncThreadPoolExecutor(const String & name_, int thread_pool_size)
: name(name_)
, pool(ThreadPool(thread_pool_size)) {}
, pool(CurrentMetrics::DiskObjectStorageAsyncThreads, CurrentMetrics::DiskObjectStorageAsyncThreadsActive, thread_pool_size)
{}
std::future<void> execute(std::function<void()> task) override
{

View File

@ -1,5 +1,12 @@
#include <IO/BackupsIOThreadPool.h>
#include "Core/Field.h"
#include <Common/CurrentMetrics.h>
#include <Core/Field.h>
namespace CurrentMetrics
{
extern const Metric BackupsIOThreads;
extern const Metric BackupsIOThreadsActive;
}
namespace DB
{
@ -18,7 +25,13 @@ void BackupsIOThreadPool::initialize(size_t max_threads, size_t max_free_threads
throw Exception(ErrorCodes::LOGICAL_ERROR, "The BackupsIO thread pool is initialized twice");
}
instance = std::make_unique<ThreadPool>(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/);
instance = std::make_unique<ThreadPool>(
CurrentMetrics::BackupsIOThreads,
CurrentMetrics::BackupsIOThreadsActive,
max_threads,
max_free_threads,
queue_size,
/* shutdown_on_exception= */ false);
}
ThreadPool & BackupsIOThreadPool::get()

View File

@ -1,5 +1,12 @@
#include <IO/IOThreadPool.h>
#include "Core/Field.h"
#include <Common/CurrentMetrics.h>
#include <Core/Field.h>
namespace CurrentMetrics
{
extern const Metric IOThreads;
extern const Metric IOThreadsActive;
}
namespace DB
{
@ -18,7 +25,13 @@ void IOThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_
throw Exception(ErrorCodes::LOGICAL_ERROR, "The IO thread pool is initialized twice");
}
instance = std::make_unique<ThreadPool>(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/);
instance = std::make_unique<ThreadPool>(
CurrentMetrics::IOThreads,
CurrentMetrics::IOThreadsActive,
max_threads,
max_free_threads,
queue_size,
/* shutdown_on_exception= */ false);
}
ThreadPool & IOThreadPool::get()

View File

@ -24,6 +24,7 @@
#include <Common/CacheBase.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/JSONBuilder.h>
@ -59,6 +60,8 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForAggregation;
extern const Metric AggregatorThreads;
extern const Metric AggregatorThreadsActive;
}
namespace DB
@ -2397,7 +2400,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
std::unique_ptr<ThreadPool> thread_pool;
if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000 /// TODO Make a custom threshold.
&& data_variants.isTwoLevel()) /// TODO Use the shared thread pool with the `merge` function.
thread_pool = std::make_unique<ThreadPool>(max_threads);
thread_pool = std::make_unique<ThreadPool>(CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, max_threads);
if (data_variants.without_key)
blocks.emplace_back(prepareBlockAndFillWithoutKey(
@ -2592,7 +2595,7 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
{
ThreadPool thread_pool{params.max_threads};
ThreadPool thread_pool{CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, params.max_threads};
AggregatedDataVariantsPtr & res = non_empty_data[0];
@ -3065,7 +3068,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
std::unique_ptr<ThreadPool> thread_pool;
if (max_threads > 1 && total_input_rows > 100000) /// TODO Make a custom threshold.
thread_pool = std::make_unique<ThreadPool>(max_threads);
thread_pool = std::make_unique<ThreadPool>(CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, max_threads);
for (const auto & bucket_blocks : bucket_to_blocks)
{

View File

@ -32,6 +32,8 @@
namespace CurrentMetrics
{
extern const Metric PendingAsyncInsert;
extern const Metric AsynchronousInsertThreads;
extern const Metric AsynchronousInsertThreadsActive;
}
namespace ProfileEvents
@ -130,7 +132,7 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo
: WithContext(context_)
, pool_size(pool_size_)
, queue_shards(pool_size)
, pool(pool_size)
, pool(CurrentMetrics::AsynchronousInsertThreads, CurrentMetrics::AsynchronousInsertThreadsActive, pool_size)
{
if (!pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "pool_size cannot be zero");

View File

@ -40,6 +40,12 @@
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric DDLWorkerThreads;
extern const Metric DDLWorkerThreadsActive;
}
namespace DB
{
@ -85,7 +91,7 @@ DDLWorker::DDLWorker(
{
LOG_WARNING(log, "DDLWorker is configured to use multiple threads. "
"It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.");
worker_pool = std::make_unique<ThreadPool>(pool_size);
worker_pool = std::make_unique<ThreadPool>(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, pool_size);
}
queue_dir = zk_root_dir;
@ -1084,7 +1090,7 @@ void DDLWorker::runMainThread()
/// It will wait for all threads in pool to finish and will not rethrow exceptions (if any).
/// We create new thread pool to forget previous exceptions.
if (1 < pool_size)
worker_pool = std::make_unique<ThreadPool>(pool_size);
worker_pool = std::make_unique<ThreadPool>(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, pool_size);
/// Clear other in-memory state, like server just started.
current_tasks.clear();
last_skipped_entry_name.reset();
@ -1123,7 +1129,7 @@ void DDLWorker::runMainThread()
initialized = false;
/// Wait for pending async tasks
if (1 < pool_size)
worker_pool = std::make_unique<ThreadPool>(pool_size);
worker_pool = std::make_unique<ThreadPool>(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, pool_size);
LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true));
}
else

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/IKeeper.h>

View File

@ -38,6 +38,8 @@
namespace CurrentMetrics
{
extern const Metric TablesToDropQueueSize;
extern const Metric DatabaseCatalogThreads;
extern const Metric DatabaseCatalogThreadsActive;
}
namespace DB
@ -852,7 +854,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
LOG_INFO(log, "Found {} partially dropped tables. Will load them and retry removal.", dropped_metadata.size());
ThreadPool pool;
ThreadPool pool(CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive);
for (const auto & elem : dropped_metadata)
{
pool.scheduleOrThrowOnError([&]()

View File

@ -7,6 +7,7 @@
#include <Common/ThreadPool.h>
#include <Common/escapeForFileName.h>
#include <Common/ShellCommand.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Context.h>
@ -64,6 +65,12 @@
#include "config.h"
namespace CurrentMetrics
{
extern const Metric RestartReplicaThreads;
extern const Metric RestartReplicaThreadsActive;
}
namespace DB
{
@ -685,7 +692,8 @@ void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context)
for (auto & guard : guards)
guard.second = catalog.getDDLGuard(guard.first.database_name, guard.first.table_name);
ThreadPool pool(std::min(static_cast<size_t>(getNumberOfPhysicalCPUCores()), replica_names.size()));
size_t threads = std::min(static_cast<size_t>(getNumberOfPhysicalCPUCores()), replica_names.size());
ThreadPool pool(CurrentMetrics::RestartReplicaThreads, CurrentMetrics::RestartReplicaThreadsActive, threads);
for (auto & replica : replica_names)
{

View File

@ -16,16 +16,24 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <Common/escapeForFileName.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <filesystem>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <filesystem>
#define ORDINARY_TO_ATOMIC_PREFIX ".tmp_convert."
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric StartupSystemTablesThreads;
extern const Metric StartupSystemTablesThreadsActive;
}
namespace DB
{
@ -366,7 +374,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
if (!tables_started)
{
/// It's not quite correct to run DDL queries while database is not started up.
ThreadPool pool;
ThreadPool pool(CurrentMetrics::StartupSystemTablesThreads, CurrentMetrics::StartupSystemTablesThreadsActive);
DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, LoadingStrictnessLevel::FORCE_RESTORE);
}
@ -461,7 +469,7 @@ void convertDatabasesEnginesIfNeed(ContextMutablePtr context)
void startupSystemTables()
{
ThreadPool pool;
ThreadPool pool(CurrentMetrics::StartupSystemTablesThreads, CurrentMetrics::StartupSystemTablesThreadsActive);
DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, LoadingStrictnessLevel::FORCE_RESTORE);
}

View File

@ -7,7 +7,8 @@
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include "IO/WriteBufferFromString.h"
#include <Common/CurrentMetrics.h>
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatFactory.h>
#include <Poco/Event.h>
#include <IO/BufferWithOwnMemory.h>
@ -17,6 +18,12 @@
#include <deque>
#include <atomic>
namespace CurrentMetrics
{
extern const Metric ParallelFormattingOutputFormatThreads;
extern const Metric ParallelFormattingOutputFormatThreadsActive;
}
namespace DB
{
@ -74,7 +81,7 @@ public:
explicit ParallelFormattingOutputFormat(Params params)
: IOutputFormat(params.header, params.out)
, internal_formatter_creator(params.internal_formatter_creator)
, pool(params.max_threads_for_parallel_formatting)
, pool(CurrentMetrics::ParallelFormattingOutputFormatThreads, CurrentMetrics::ParallelFormattingOutputFormatThreadsActive, params.max_threads_for_parallel_formatting)
{
LOG_TEST(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used");

View File

@ -5,14 +5,21 @@
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
#include <Poco/Event.h>
namespace CurrentMetrics
{
extern const Metric ParallelParsingInputFormatThreads;
extern const Metric ParallelParsingInputFormatThreadsActive;
}
namespace DB
{
@ -94,7 +101,7 @@ public:
, min_chunk_bytes(params.min_chunk_bytes)
, max_block_size(params.max_block_size)
, is_server(params.is_server)
, pool(params.max_threads)
, pool(CurrentMetrics::ParallelParsingInputFormatThreads, CurrentMetrics::ParallelParsingInputFormatThreadsActive, params.max_threads)
{
// One unit for each thread, including segmentator and reader, plus a
// couple more units so that the segmentation thread doesn't spuriously

View File

@ -6,6 +6,13 @@
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric DestroyAggregatesThreads;
extern const Metric DestroyAggregatesThreadsActive;
}
namespace DB
{
@ -84,7 +91,10 @@ struct ManyAggregatedData
// Aggregation states destruction may be very time-consuming.
// In the case of a query with LIMIT, most states won't be destroyed during conversion to blocks.
// Without the following code, they would be destroyed in the destructor of AggregatedDataVariants in the current thread (i.e. sequentially).
const auto pool = std::make_unique<ThreadPool>(variants.size());
const auto pool = std::make_unique<ThreadPool>(
CurrentMetrics::DestroyAggregatesThreads,
CurrentMetrics::DestroyAggregatesThreadsActive,
variants.size());
for (auto && variant : variants)
{

View File

@ -7,6 +7,7 @@
#include <fmt/core.h>
#include <Poco/URI.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
@ -40,6 +41,11 @@
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
namespace CurrentMetrics
{
extern const Metric StorageHiveThreads;
extern const Metric StorageHiveThreadsActive;
}
namespace DB
{
@ -844,7 +850,7 @@ HiveFiles StorageHive::collectHiveFiles(
Int64 hive_max_query_partitions = context_->getSettings().max_partitions_to_read;
/// Mutext to protect hive_files, which maybe appended in multiple threads
std::mutex hive_files_mutex;
ThreadPool pool{max_threads};
ThreadPool pool{CurrentMetrics::StorageHiveThreads, CurrentMetrics::StorageHiveThreadsActive, max_threads};
if (!partitions.empty())
{
for (const auto & partition : partitions)

View File

@ -21,6 +21,12 @@
#include <Storages/MergeTree/IExecutableTask.h>
namespace CurrentMetrics
{
extern const Metric MergeTreeBackgroundExecutorThreads;
extern const Metric MergeTreeBackgroundExecutorThreadsActive;
}
namespace DB
{
namespace ErrorCodes
@ -255,6 +261,7 @@ public:
, max_tasks_count(max_tasks_count_)
, metric(metric_)
, max_tasks_metric(max_tasks_metric_, 2 * max_tasks_count) // active + pending
, pool(CurrentMetrics::MergeTreeBackgroundExecutorThreads, CurrentMetrics::MergeTreeBackgroundExecutorThreadsActive)
{
if (max_tasks_count == 0)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");

View File

@ -17,6 +17,7 @@
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
@ -117,6 +118,10 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric DelayedInserts;
extern const Metric MergeTreePartsLoaderThreads;
extern const Metric MergeTreePartsLoaderThreadsActive;
extern const Metric MergeTreePartsCleanerThreads;
extern const Metric MergeTreePartsCleanerThreadsActive;
}
@ -1567,7 +1572,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
}
ThreadPool pool(disks.size());
ThreadPool pool(CurrentMetrics::MergeTreePartsLoaderThreads, CurrentMetrics::MergeTreePartsLoaderThreadsActive, disks.size());
std::vector<PartLoadingTree::PartLoadingInfos> parts_to_load_by_disk(disks.size());
for (size_t i = 0; i < disks.size(); ++i)
@ -2299,7 +2304,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
/// Parallel parts removal.
size_t num_threads = std::min<size_t>(settings->max_part_removal_threads, parts_to_remove.size());
std::mutex part_names_mutex;
ThreadPool pool(num_threads);
ThreadPool pool(CurrentMetrics::MergeTreePartsCleanerThreads, CurrentMetrics::MergeTreePartsCleanerThreadsActive, num_threads);
bool has_zero_copy_parts = false;
if (settings->allow_remote_fs_zero_copy_replication && dynamic_cast<StorageReplicatedMergeTree *>(this) != nullptr)

View File

@ -35,6 +35,7 @@
#include <Processors/Transforms/AggregatingTransform.h>
#include <Core/UUID.h>
#include <Common/CurrentMetrics.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
@ -46,6 +47,12 @@
#include <Storages/MergeTree/CommonANNIndexes.h>
namespace CurrentMetrics
{
extern const Metric MergeTreeDataSelectExecutorThreads;
extern const Metric MergeTreeDataSelectExecutorThreadsActive;
}
namespace DB
{
@ -1077,7 +1084,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
else
{
/// Parallel loading of data parts.
ThreadPool pool(num_threads);
ThreadPool pool(
CurrentMetrics::MergeTreeDataSelectExecutorThreads,
CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive,
num_threads);
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]

View File

@ -29,6 +29,7 @@
#include <Common/quoteString.h>
#include <Common/randomSeed.h>
#include <Common/formatReadable.h>
#include <Common/CurrentMetrics.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -126,6 +127,12 @@ namespace ProfileEvents
extern const Event DistributedDelayedInsertsMilliseconds;
}
namespace CurrentMetrics
{
extern const Metric StorageDistributedThreads;
extern const Metric StorageDistributedThreadsActive;
}
namespace DB
{
@ -1436,7 +1443,7 @@ void StorageDistributed::initializeFromDisk()
const auto & disks = data_volume->getDisks();
/// Make initialization for large number of disks parallel.
ThreadPool pool(disks.size());
ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, disks.size());
for (const DiskPtr & disk : disks)
{

View File

@ -29,7 +29,6 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Storages/ReadFromStorageProgress.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
@ -53,8 +52,10 @@
#include <aws/core/auth/AWSCredentials.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/parseGlobs.h>
#include <Common/quoteString.h>
#include <Common/CurrentMetrics.h>
#include <re2/re2.h>
#include <Processors/ISource.h>
@ -65,6 +66,12 @@
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric StorageS3Threads;
extern const Metric StorageS3ThreadsActive;
}
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
@ -150,7 +157,7 @@ public:
, object_infos(object_infos_)
, read_keys(read_keys_)
, request_settings(request_settings_)
, list_objects_pool(1)
, list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
, list_objects_scheduler(threadPoolCallbackRunner<ListObjectsOutcome>(list_objects_pool, "ListObjects"))
{
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
@ -574,7 +581,7 @@ StorageS3Source::StorageS3Source(
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, download_thread_num(download_thread_num_)
, create_reader_pool(1)
, create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
, create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader"))
{
reader = createReader();

View File

@ -7,12 +7,19 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Access/ContextAccess.h>
#include <Common/typeid_cast.h>
#include <Databases/IDatabase.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
namespace CurrentMetrics
{
extern const Metric SystemReplicasThreads;
extern const Metric SystemReplicasThreadsActive;
}
namespace DB
{
@ -160,7 +167,7 @@ Pipe StorageSystemReplicas::read(
if (settings.max_threads != 0)
thread_pool_size = std::min(thread_pool_size, static_cast<size_t>(settings.max_threads));
ThreadPool thread_pool(thread_pool_size);
ThreadPool thread_pool(CurrentMetrics::SystemReplicasThreads, CurrentMetrics::SystemReplicasThreadsActive, thread_pool_size);
for (size_t i = 0; i < tables_size; ++i)
{

View File

@ -8,6 +8,7 @@
#include <pcg-random/pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Common/InterruptListener.h>
#include <Common/CurrentMetrics.h>
#include <Core/Types.h>
#include "Stats.h"
@ -15,6 +16,12 @@
using Ports = std::vector<UInt16>;
using Strings = std::vector<std::string>;
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
class Runner
{
public:
@ -27,7 +34,7 @@ public:
bool continue_on_error_,
size_t max_iterations_)
: concurrency(concurrency_)
, pool(concurrency)
, pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, concurrency)
, hosts_strings(hosts_strings_)
, generator(getGenerator(generator_name))
, max_time(max_time_)