diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp index 994f9b7ac4d..466a0c194f7 100644 --- a/programs/benchmark/Benchmark.cpp +++ b/programs/benchmark/Benchmark.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include @@ -43,6 +44,12 @@ namespace fs = std::filesystem; * The tool emulates a case with fixed amount of simultaneously executing queries. */ +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + namespace DB { @@ -103,7 +110,7 @@ public: settings(settings_), shared_context(Context::createShared()), global_context(Context::createGlobal(shared_context.get())), - pool(concurrency) + pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, concurrency) { const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable; size_t connections_cnt = std::max(ports_.size(), hosts_.size()); diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index 3a6261974d1..079c70596a6 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -19,6 +20,12 @@ #include #include +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + namespace DB { @@ -192,7 +199,7 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts, { /// Fetch partitions list from a shard { - ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores()); + ThreadPool thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores()); for (const TaskShardPtr & task_shard : task_table.all_shards) thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index b6cf74efe22..5ac51a81eca 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -20,10 +20,19 @@ #include #include #include +#include #include #include +namespace CurrentMetrics +{ + extern const Metric BackupsThreads; + extern const Metric BackupsThreadsActive; + extern const Metric RestoreThreads; + extern const Metric RestoreThreadsActive; +} + namespace DB { @@ -152,8 +161,8 @@ namespace BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_) - : backups_thread_pool(num_backup_threads, /* max_free_threads = */ 0, num_backup_threads) - , restores_thread_pool(num_restore_threads, /* max_free_threads = */ 0, num_restore_threads) + : backups_thread_pool(CurrentMetrics::BackupsThreads, CurrentMetrics::BackupsThreadsActive, num_backup_threads, /* max_free_threads = */ 0, num_backup_threads) + , restores_thread_pool(CurrentMetrics::RestoreThreads, CurrentMetrics::RestoreThreadsActive, num_restore_threads, /* max_free_threads = */ 0, num_restore_threads) , log(&Poco::Logger::get("BackupsWorker")) , allow_concurrent_backups(allow_concurrent_backups_) , allow_concurrent_restores(allow_concurrent_restores_) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 5b20d98aa01..4c773048597 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -72,6 +72,64 @@ M(GlobalThreadActive, "Number of threads in global thread pool running a task.") \ M(LocalThread, "Number of threads in local thread pools. 
The threads in local thread pools are taken from the global thread pool.") \ M(LocalThreadActive, "Number of threads in local thread pools running a task.") \ + M(MergeTreeDataSelectExecutorThreads, "Number of threads in the MergeTreeDataSelectExecutor thread pool.") \ + M(MergeTreeDataSelectExecutorThreadsActive, "Number of threads in the MergeTreeDataSelectExecutor thread pool running a task.") \ + M(BackupsThreads, "Number of threads in the thread pool for BACKUP.") \ + M(BackupsThreadsActive, "Number of threads in thread pool for BACKUP running a task.") \ + M(RestoreThreads, "Number of threads in the thread pool for RESTORE.") \ + M(RestoreThreadsActive, "Number of threads in the thread pool for RESTORE running a task.") \ + M(IOThreads, "Number of threads in the IO thread pool.") \ + M(IOThreadsActive, "Number of threads in the IO thread pool running a task.") \ + M(ThreadPoolRemoteFSReaderThreads, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool.") \ + M(ThreadPoolRemoteFSReaderThreadsActive, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool running a task.") \ + M(ThreadPoolFSReaderThreads, "Number of threads in the thread pool for local_filesystem_read_method=threadpool.") \ + M(ThreadPoolFSReaderThreadsActive, "Number of threads in the thread pool for local_filesystem_read_method=threadpool running a task.") \ + M(BackupsIOThreads, "Number of threads in the BackupsIO thread pool.") \ + M(BackupsIOThreadsActive, "Number of threads in the BackupsIO thread pool running a task.") \ + M(DiskObjectStorageAsyncThreads, "Number of threads in the async thread pool for DiskObjectStorage.") \ + M(DiskObjectStorageAsyncThreadsActive, "Number of threads in the async thread pool for DiskObjectStorage running a task.") \ + M(StorageHiveThreads, "Number of threads in the StorageHive thread pool.") \ + M(StorageHiveThreadsActive, "Number of threads in the StorageHive thread pool running a task.") \ + M(TablesLoaderThreads, "Number of threads in the tables loader thread pool.") \ + M(TablesLoaderThreadsActive, "Number of threads in the tables loader thread pool running a task.") \ + M(DatabaseOrdinaryThreads, "Number of threads in the Ordinary database thread pool.") \ + M(DatabaseOrdinaryThreadsActive, "Number of threads in the Ordinary database thread pool running a task.") \ + M(DatabaseOnDiskThreads, "Number of threads in the DatabaseOnDisk thread pool.") \ + M(DatabaseOnDiskThreadsActive, "Number of threads in the DatabaseOnDisk thread pool running a task.") \ + M(DatabaseCatalogThreads, "Number of threads in the DatabaseCatalog thread pool.") \ + M(DatabaseCatalogThreadsActive, "Number of threads in the DatabaseCatalog thread pool running a task.") \ + M(DestroyAggregatesThreads, "Number of threads in the thread pool for destroy aggregate states.") \ + M(DestroyAggregatesThreadsActive, "Number of threads in the thread pool for destroy aggregate states running a task.") \ + M(HashedDictionaryThreads, "Number of threads in the HashedDictionary thread pool.") \ + M(HashedDictionaryThreadsActive, "Number of threads in the HashedDictionary thread pool running a task.") \ + M(CacheDictionaryThreads, "Number of threads in the CacheDictionary thread pool.") \ + M(CacheDictionaryThreadsActive, "Number of threads in the CacheDictionary thread pool running a task.") \ + M(ParallelFormattingOutputFormatThreads, "Number of threads in the ParallelFormattingOutputFormatThreads thread pool.") \ + 
M(ParallelFormattingOutputFormatThreadsActive, "Number of threads in the ParallelFormattingOutputFormatThreads thread pool running a task.") \ + M(ParallelParsingInputFormatThreads, "Number of threads in the ParallelParsingInputFormat thread pool.") \ + M(ParallelParsingInputFormatThreadsActive, "Number of threads in the ParallelParsingInputFormat thread pool running a task.") \ + M(MergeTreeBackgroundExecutorThreads, "Number of threads in the MergeTreeBackgroundExecutor thread pool.") \ + M(MergeTreeBackgroundExecutorThreadsActive, "Number of threads in the MergeTreeBackgroundExecutor thread pool running a task.") \ + M(AsynchronousInsertThreads, "Number of threads in the AsynchronousInsert thread pool.") \ + M(AsynchronousInsertThreadsActive, "Number of threads in the AsynchronousInsert thread pool running a task.") \ + M(StartupSystemTablesThreads, "Number of threads in the StartupSystemTables thread pool.") \ + M(StartupSystemTablesThreadsActive, "Number of threads in the StartupSystemTables thread pool running a task.") \ + M(AggregatorThreads, "Number of threads in the Aggregator thread pool.") \ + M(AggregatorThreadsActive, "Number of threads in the Aggregator thread pool running a task.") \ + M(DDLWorkerThreads, "Number of threads in the DDLWorker thread pool for ON CLUSTER queries.") \ + M(DDLWorkerThreadsActive, "Number of threads in the DDLWORKER thread pool for ON CLUSTER queries running a task.") \ + M(StorageDistributedThreads, "Number of threads in the StorageDistributed thread pool.") \ + M(StorageDistributedThreadsActive, "Number of threads in the StorageDistributed thread pool running a task.") \ + M(StorageS3Threads, "Number of threads in the StorageS3 thread pool.") \ + M(StorageS3ThreadsActive, "Number of threads in the StorageS3 thread pool running a task.") \ + M(MergeTreePartsLoaderThreads, "Number of threads in the MergeTree parts loader thread pool.") \ + M(MergeTreePartsLoaderThreadsActive, "Number of threads in the MergeTree parts loader thread pool running a task.") \ + M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \ + M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \ + M(SystemReplicasThreads, "Number of threads in the system.replicas thread pool.") \ + M(SystemReplicasThreadsActive, "Number of threads in the system.replicas thread pool running a task.") \ + M(RestartReplicaThreads, "Number of threads in the RESTART REPLICA thread pool.") \ + M(RestartReplicaThreadsActive, "Number of threads in the RESTART REPLICA thread pool running a task.") \ M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \ M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \ M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \ diff --git a/src/Common/CurrentMetrics.h b/src/Common/CurrentMetrics.h index c184ee1e7f2..0ae16e2d08d 100644 --- a/src/Common/CurrentMetrics.h +++ b/src/Common/CurrentMetrics.h @@ -4,6 +4,7 @@ #include #include #include +#include #include /** Allows to count number of simultaneously happening processes or current value of some metric. 
@@ -73,7 +74,10 @@ namespace CurrentMetrics public: explicit Increment(Metric metric, Value amount_ = 1) - : Increment(&values[metric], amount_) {} + : Increment(&values[metric], amount_) + { + assert(metric < CurrentMetrics::end()); + } ~Increment() { diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index caa32b61c65..11b115c9234 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -25,27 +25,36 @@ namespace CurrentMetrics { extern const Metric GlobalThread; extern const Metric GlobalThreadActive; - extern const Metric LocalThread; - extern const Metric LocalThreadActive; } template -ThreadPoolImpl::ThreadPoolImpl() - : ThreadPoolImpl(getNumberOfPhysicalCPUCores()) +ThreadPoolImpl::ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_) + : ThreadPoolImpl(metric_threads_, metric_active_threads_, getNumberOfPhysicalCPUCores()) { } template -ThreadPoolImpl::ThreadPoolImpl(size_t max_threads_) - : ThreadPoolImpl(max_threads_, max_threads_, max_threads_) +ThreadPoolImpl::ThreadPoolImpl( + Metric metric_threads_, + Metric metric_active_threads_, + size_t max_threads_) + : ThreadPoolImpl(metric_threads_, metric_active_threads_, max_threads_, max_threads_, max_threads_) { } template -ThreadPoolImpl::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_) - : max_threads(max_threads_) +ThreadPoolImpl::ThreadPoolImpl( + Metric metric_threads_, + Metric metric_active_threads_, + size_t max_threads_, + size_t max_free_threads_, + size_t queue_size_, + bool shutdown_on_exception_) + : metric_threads(metric_threads_) + , metric_active_threads(metric_active_threads_) + , max_threads(max_threads_) , max_free_threads(std::min(max_free_threads_, max_threads)) , queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */) , shutdown_on_exception(shutdown_on_exception_) @@ -322,8 +331,7 @@ template void ThreadPoolImpl::worker(typename std::list::iterator thread_it) { DENY_ALLOCATIONS_IN_SCOPE; - CurrentMetrics::Increment metric_all_threads( - std::is_same_v ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread); + CurrentMetrics::Increment metric_pool_threads(metric_threads); /// Remove this thread from `threads` and detach it, that must be done before exiting from this worker. /// We can't wrap the following lambda function into `SCOPE_EXIT` because it requires `mutex` to be locked. @@ -381,8 +389,7 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ try { - CurrentMetrics::Increment metric_active_threads( - std::is_same_v ? 
CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive); + CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads); job(); @@ -449,6 +456,22 @@ template class ThreadFromGlobalPoolImpl; std::unique_ptr GlobalThreadPool::the_instance; + +GlobalThreadPool::GlobalThreadPool( + size_t max_threads_, + size_t max_free_threads_, + size_t queue_size_, + const bool shutdown_on_exception_) + : FreeThreadPool( + CurrentMetrics::GlobalThread, + CurrentMetrics::GlobalThreadActive, + max_threads_, + max_free_threads_, + queue_size_, + shutdown_on_exception_) +{ +} + void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size) { if (the_instance) diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index a1ca79a1e4b..b2f77f9693c 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -16,6 +16,7 @@ #include #include #include +#include #include /** Very simple thread pool similar to boost::threadpool. @@ -33,15 +34,25 @@ class ThreadPoolImpl { public: using Job = std::function; + using Metric = CurrentMetrics::Metric; /// Maximum number of threads is based on the number of physical cores. - ThreadPoolImpl(); + ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_); /// Size is constant. Up to num_threads are created on demand and then run until shutdown. - explicit ThreadPoolImpl(size_t max_threads_); + explicit ThreadPoolImpl( + Metric metric_threads_, + Metric metric_active_threads_, + size_t max_threads_); /// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited. - ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_ = true); + ThreadPoolImpl( + Metric metric_threads_, + Metric metric_active_threads_, + size_t max_threads_, + size_t max_free_threads_, + size_t queue_size_, + bool shutdown_on_exception_ = true); /// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown. 
/// If any thread was throw an exception, first exception will be rethrown from this method, @@ -96,6 +107,9 @@ private: std::condition_variable job_finished; std::condition_variable new_job_or_shutdown; + Metric metric_threads; + Metric metric_active_threads; + size_t max_threads; size_t max_free_threads; size_t queue_size; @@ -159,12 +173,11 @@ class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable { static std::unique_ptr the_instance; - GlobalThreadPool(size_t max_threads_, size_t max_free_threads_, - size_t queue_size_, const bool shutdown_on_exception_) - : FreeThreadPool(max_threads_, max_free_threads_, queue_size_, - shutdown_on_exception_) - { - } + GlobalThreadPool( + size_t max_threads_, + size_t max_free_threads_, + size_t queue_size_, + bool shutdown_on_exception_); public: static void initialize(size_t max_threads = 10000, size_t max_free_threads = 1000, size_t queue_size = 10000); diff --git a/src/Common/examples/parallel_aggregation.cpp b/src/Common/examples/parallel_aggregation.cpp index bd252b330f3..cf7a3197fef 100644 --- a/src/Common/examples/parallel_aggregation.cpp +++ b/src/Common/examples/parallel_aggregation.cpp @@ -17,6 +17,7 @@ #include #include +#include using Key = UInt64; @@ -28,6 +29,12 @@ using Map = HashMap; using MapTwoLevel = TwoLevelHashMap; +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + struct SmallLock { std::atomic locked {false}; @@ -247,7 +254,7 @@ int main(int argc, char ** argv) std::cerr << std::fixed << std::setprecision(2); - ThreadPool pool(num_threads); + ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads); Source data(n); diff --git a/src/Common/examples/parallel_aggregation2.cpp b/src/Common/examples/parallel_aggregation2.cpp index 6c20f46ab0e..1b0ad760490 100644 --- a/src/Common/examples/parallel_aggregation2.cpp +++ b/src/Common/examples/parallel_aggregation2.cpp @@ -17,6 +17,7 @@ #include #include +#include using Key = UInt64; @@ -24,6 +25,12 @@ using Value = UInt64; using Source = std::vector; +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + template struct AggregateIndependent { @@ -274,7 +281,7 @@ int main(int argc, char ** argv) std::cerr << std::fixed << std::setprecision(2); - ThreadPool pool(num_threads); + ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads); Source data(n); diff --git a/src/Common/examples/thread_creation_latency.cpp b/src/Common/examples/thread_creation_latency.cpp index 351f709013a..2434759c968 100644 --- a/src/Common/examples/thread_creation_latency.cpp +++ b/src/Common/examples/thread_creation_latency.cpp @@ -6,6 +6,7 @@ #include #include #include +#include int value = 0; @@ -14,6 +15,12 @@ static void f() { ++value; } static void * g(void *) { f(); return {}; } +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + namespace DB { namespace ErrorCodes @@ -65,7 +72,7 @@ int main(int argc, char ** argv) test(n, "Create and destroy ThreadPool each iteration", [] { - ThreadPool tp(1); + ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 1); tp.scheduleOrThrowOnError(f); tp.wait(); }); @@ -86,7 +93,7 @@ int main(int argc, char ** argv) }); { - ThreadPool tp(1); + ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 1); test(n, "Schedule job for Threadpool each iteration", [&tp] { @@ -96,7 
+103,7 @@ int main(int argc, char ** argv) } { - ThreadPool tp(128); + ThreadPool tp(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 128); test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp] { diff --git a/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp b/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp index f5f14739e39..f93017129dd 100644 --- a/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp +++ b/src/Common/tests/gtest_thread_pool_concurrent_wait.cpp @@ -1,4 +1,5 @@ #include +#include #include @@ -7,6 +8,12 @@ */ +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + TEST(ThreadPool, ConcurrentWait) { auto worker = [] @@ -18,14 +25,14 @@ TEST(ThreadPool, ConcurrentWait) constexpr size_t num_threads = 4; constexpr size_t num_jobs = 4; - ThreadPool pool(num_threads); + ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_threads); for (size_t i = 0; i < num_jobs; ++i) pool.scheduleOrThrowOnError(worker); constexpr size_t num_waiting_threads = 4; - ThreadPool waiting_pool(num_waiting_threads); + ThreadPool waiting_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_waiting_threads); for (size_t i = 0; i < num_waiting_threads; ++i) waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); }); diff --git a/src/Common/tests/gtest_thread_pool_global_full.cpp b/src/Common/tests/gtest_thread_pool_global_full.cpp index 583d43be1bb..1b2ded9c7e1 100644 --- a/src/Common/tests/gtest_thread_pool_global_full.cpp +++ b/src/Common/tests/gtest_thread_pool_global_full.cpp @@ -2,10 +2,17 @@ #include #include +#include #include +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + /// Test what happens if local ThreadPool cannot create a ThreadFromGlobalPool. /// There was a bug: if local ThreadPool cannot allocate even a single thread, /// the job will be scheduled but never get executed. 
@@ -27,7 +34,7 @@ TEST(ThreadPool, GlobalFull1) auto func = [&] { ++counter; while (counter != num_jobs) {} }; - ThreadPool pool(num_jobs); + ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, num_jobs); for (size_t i = 0; i < capacity; ++i) pool.scheduleOrThrowOnError(func); @@ -65,11 +72,11 @@ TEST(ThreadPool, GlobalFull2) std::atomic counter = 0; auto func = [&] { ++counter; while (counter != capacity + 1) {} }; - ThreadPool pool(capacity, 0, capacity); + ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, capacity, 0, capacity); for (size_t i = 0; i < capacity; ++i) pool.scheduleOrThrowOnError(func); - ThreadPool another_pool(1); + ThreadPool another_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 1); EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception); ++counter; diff --git a/src/Common/tests/gtest_thread_pool_loop.cpp b/src/Common/tests/gtest_thread_pool_loop.cpp index 15915044652..556c39df949 100644 --- a/src/Common/tests/gtest_thread_pool_loop.cpp +++ b/src/Common/tests/gtest_thread_pool_loop.cpp @@ -1,10 +1,17 @@ #include #include #include +#include #include +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + TEST(ThreadPool, Loop) { std::atomic res{0}; @@ -12,7 +19,7 @@ TEST(ThreadPool, Loop) for (size_t i = 0; i < 1000; ++i) { size_t threads = 16; - ThreadPool pool(threads); + ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, threads); for (size_t j = 0; j < threads; ++j) pool.scheduleOrThrowOnError([&] { ++res; }); pool.wait(); diff --git a/src/Common/tests/gtest_thread_pool_schedule_exception.cpp b/src/Common/tests/gtest_thread_pool_schedule_exception.cpp index 69362c34cd2..176e469d5ef 100644 --- a/src/Common/tests/gtest_thread_pool_schedule_exception.cpp +++ b/src/Common/tests/gtest_thread_pool_schedule_exception.cpp @@ -1,13 +1,20 @@ #include #include #include +#include #include +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + static bool check() { - ThreadPool pool(10); + ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 10); /// The throwing thread. 
pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); }); diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp index 4d9e22bd15d..01afbdcaa57 100644 --- a/src/Databases/DatabaseOnDisk.cpp +++ b/src/Databases/DatabaseOnDisk.cpp @@ -17,14 +17,21 @@ #include #include #include +#include +#include +#include #include #include -#include #include -#include namespace fs = std::filesystem; +namespace CurrentMetrics +{ + extern const Metric DatabaseOnDiskThreads; + extern const Metric DatabaseOnDiskThreadsActive; +} + namespace DB { @@ -620,7 +627,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat } /// Read and parse metadata in parallel - ThreadPool pool; + ThreadPool pool(CurrentMetrics::DatabaseOnDiskThreads, CurrentMetrics::DatabaseOnDiskThreadsActive); for (const auto & file : metadata_files) { pool.scheduleOrThrowOnError([&]() diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index 49250602132..0db16f80656 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -25,9 +25,16 @@ #include #include #include +#include namespace fs = std::filesystem; +namespace CurrentMetrics +{ + extern const Metric DatabaseOrdinaryThreads; + extern const Metric DatabaseOrdinaryThreadsActive; +} + namespace DB { @@ -99,7 +106,7 @@ void DatabaseOrdinary::loadStoredObjects( std::atomic dictionaries_processed{0}; std::atomic tables_processed{0}; - ThreadPool pool; + ThreadPool pool(CurrentMetrics::DatabaseOrdinaryThreads, CurrentMetrics::DatabaseOrdinaryThreadsActive); /// We must attach dictionaries before attaching tables /// because while we're attaching tables we may need to have some dictionaries attached diff --git a/src/Databases/TablesLoader.cpp b/src/Databases/TablesLoader.cpp index 5d66f49554d..177b32daa2e 100644 --- a/src/Databases/TablesLoader.cpp +++ b/src/Databases/TablesLoader.cpp @@ -8,8 +8,15 @@ #include #include #include +#include #include +namespace CurrentMetrics +{ + extern const Metric TablesLoaderThreads; + extern const Metric TablesLoaderThreadsActive; +} + namespace DB { @@ -31,12 +38,13 @@ void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, Atomic } TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, LoadingStrictnessLevel strictness_mode_) -: global_context(global_context_) -, databases(std::move(databases_)) -, strictness_mode(strictness_mode_) -, referential_dependencies("ReferentialDeps") -, loading_dependencies("LoadingDeps") -, all_loading_dependencies("LoadingDeps") + : global_context(global_context_) + , databases(std::move(databases_)) + , strictness_mode(strictness_mode_) + , referential_dependencies("ReferentialDeps") + , loading_dependencies("LoadingDeps") + , all_loading_dependencies("LoadingDeps") + , pool(CurrentMetrics::TablesLoaderThreads, CurrentMetrics::TablesLoaderThreadsActive) { metadata.default_database = global_context->getCurrentDatabase(); log = &Poco::Logger::get("TablesLoader"); diff --git a/src/Dictionaries/CacheDictionaryUpdateQueue.cpp b/src/Dictionaries/CacheDictionaryUpdateQueue.cpp index 1fdaf10c57c..09d5bed18b8 100644 --- a/src/Dictionaries/CacheDictionaryUpdateQueue.cpp +++ b/src/Dictionaries/CacheDictionaryUpdateQueue.cpp @@ -2,8 +2,15 @@ #include +#include #include +namespace CurrentMetrics +{ + extern const Metric CacheDictionaryThreads; + extern const Metric CacheDictionaryThreadsActive; +} + namespace DB { @@ -26,7 +33,7 @@ 
CacheDictionaryUpdateQueue::CacheDictionaryUpdateQueue( , configuration(configuration_) , update_func(std::move(update_func_)) , update_queue(configuration.max_update_queue_size) - , update_pool(configuration.max_threads_for_updates) + , update_pool(CurrentMetrics::CacheDictionaryThreads, CurrentMetrics::CacheDictionaryThreadsActive, configuration.max_threads_for_updates) { for (size_t i = 0; i < configuration.max_threads_for_updates; ++i) update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); }); diff --git a/src/Dictionaries/HashedDictionary.cpp b/src/Dictionaries/HashedDictionary.cpp index d6c9ac50dbe..0e5d18363e9 100644 --- a/src/Dictionaries/HashedDictionary.cpp +++ b/src/Dictionaries/HashedDictionary.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -21,6 +22,11 @@ #include #include +namespace CurrentMetrics +{ + extern const Metric HashedDictionaryThreads; + extern const Metric HashedDictionaryThreadsActive; +} namespace { @@ -60,7 +66,7 @@ public: explicit ParallelDictionaryLoader(HashedDictionary & dictionary_) : dictionary(dictionary_) , shards(dictionary.configuration.shards) - , pool(shards) + , pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, shards) , shards_queues(shards) { UInt64 backlog = dictionary.configuration.shard_load_queue_backlog; @@ -213,7 +219,7 @@ HashedDictionary::~HashedDictionary() return; size_t shards = std::max(configuration.shards, 1); - ThreadPool pool(shards); + ThreadPool pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, shards); size_t hash_tables_count = 0; auto schedule_destroy = [&hash_tables_count, &pool](auto & container) diff --git a/src/Disks/IO/ThreadPoolReader.cpp b/src/Disks/IO/ThreadPoolReader.cpp index 18b283b0ff3..3a071d13122 100644 --- a/src/Disks/IO/ThreadPoolReader.cpp +++ b/src/Disks/IO/ThreadPoolReader.cpp @@ -61,6 +61,8 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric Read; + extern const Metric ThreadPoolFSReaderThreads; + extern const Metric ThreadPoolFSReaderThreadsActive; } @@ -85,7 +87,7 @@ static bool hasBugInPreadV2() #endif ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_) - : pool(pool_size, pool_size, queue_size_) + : pool(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, pool_size, pool_size, queue_size_) { } diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index c2d3ee8b53d..1980f57c876 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -26,6 +26,8 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric RemoteRead; + extern const Metric ThreadPoolRemoteFSReaderThreads; + extern const Metric ThreadPoolRemoteFSReaderThreadsActive; } namespace DB @@ -60,7 +62,7 @@ IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_) - : pool(pool_size, pool_size, queue_size_) + : pool(CurrentMetrics::ThreadPoolRemoteFSReaderThreads, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive, pool_size, pool_size, queue_size_) { } diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index 44cb80558af..6143f0620b8 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -10,6 +10,7 @@ #include 
#include #include +#include #include #include #include @@ -17,6 +18,13 @@ #include #include +namespace CurrentMetrics +{ + extern const Metric DiskObjectStorageAsyncThreads; + extern const Metric DiskObjectStorageAsyncThreadsActive; +} + + namespace DB { @@ -38,7 +46,8 @@ class AsyncThreadPoolExecutor : public Executor public: AsyncThreadPoolExecutor(const String & name_, int thread_pool_size) : name(name_) - , pool(ThreadPool(thread_pool_size)) {} + , pool(CurrentMetrics::DiskObjectStorageAsyncThreads, CurrentMetrics::DiskObjectStorageAsyncThreadsActive, thread_pool_size) + {} std::future execute(std::function task) override { diff --git a/src/IO/BackupIOThreadPool.cpp b/src/IO/BackupsIOThreadPool.cpp similarity index 59% rename from src/IO/BackupIOThreadPool.cpp rename to src/IO/BackupsIOThreadPool.cpp index 067fc54b1ae..0829553945a 100644 --- a/src/IO/BackupIOThreadPool.cpp +++ b/src/IO/BackupsIOThreadPool.cpp @@ -1,5 +1,12 @@ #include -#include "Core/Field.h" +#include +#include + +namespace CurrentMetrics +{ + extern const Metric BackupsIOThreads; + extern const Metric BackupsIOThreadsActive; +} namespace DB { @@ -18,7 +25,13 @@ void BackupsIOThreadPool::initialize(size_t max_threads, size_t max_free_threads throw Exception(ErrorCodes::LOGICAL_ERROR, "The BackupsIO thread pool is initialized twice"); } - instance = std::make_unique(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/); + instance = std::make_unique( + CurrentMetrics::BackupsIOThreads, + CurrentMetrics::BackupsIOThreadsActive, + max_threads, + max_free_threads, + queue_size, + /* shutdown_on_exception= */ false); } ThreadPool & BackupsIOThreadPool::get() diff --git a/src/IO/IOThreadPool.cpp b/src/IO/IOThreadPool.cpp index 4014d00d8b8..98bb6ffe6a7 100644 --- a/src/IO/IOThreadPool.cpp +++ b/src/IO/IOThreadPool.cpp @@ -1,5 +1,12 @@ #include -#include "Core/Field.h" +#include +#include + +namespace CurrentMetrics +{ + extern const Metric IOThreads; + extern const Metric IOThreadsActive; +} namespace DB { @@ -18,7 +25,13 @@ void IOThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_ throw Exception(ErrorCodes::LOGICAL_ERROR, "The IO thread pool is initialized twice"); } - instance = std::make_unique(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/); + instance = std::make_unique( + CurrentMetrics::IOThreads, + CurrentMetrics::IOThreadsActive, + max_threads, + max_free_threads, + queue_size, + /* shutdown_on_exception= */ false); } ThreadPool & IOThreadPool::get() diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index b0bcea23449..d6fbf072d05 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -59,6 +60,8 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric TemporaryFilesForAggregation; + extern const Metric AggregatorThreads; + extern const Metric AggregatorThreadsActive; } namespace DB @@ -2397,7 +2400,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b std::unique_ptr thread_pool; if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000 /// TODO Make a custom threshold. && data_variants.isTwoLevel()) /// TODO Use the shared thread pool with the `merge` function. 
- thread_pool = std::make_unique(max_threads); + thread_pool = std::make_unique(CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, max_threads); if (data_variants.without_key) blocks.emplace_back(prepareBlockAndFillWithoutKey( @@ -2592,7 +2595,7 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl( void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( ManyAggregatedDataVariants & non_empty_data) const { - ThreadPool thread_pool{params.max_threads}; + ThreadPool thread_pool{CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, params.max_threads}; AggregatedDataVariantsPtr & res = non_empty_data[0]; @@ -3065,7 +3068,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari std::unique_ptr thread_pool; if (max_threads > 1 && total_input_rows > 100000) /// TODO Make a custom threshold. - thread_pool = std::make_unique(max_threads); + thread_pool = std::make_unique(CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, max_threads); for (const auto & bucket_blocks : bucket_to_blocks) { diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 590cbc9ba83..2dd2409442e 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -32,6 +32,8 @@ namespace CurrentMetrics { extern const Metric PendingAsyncInsert; + extern const Metric AsynchronousInsertThreads; + extern const Metric AsynchronousInsertThreadsActive; } namespace ProfileEvents @@ -130,7 +132,7 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo : WithContext(context_) , pool_size(pool_size_) , queue_shards(pool_size) - , pool(pool_size) + , pool(CurrentMetrics::AsynchronousInsertThreads, CurrentMetrics::AsynchronousInsertThreadsActive, pool_size) { if (!pool_size) throw Exception(ErrorCodes::BAD_ARGUMENTS, "pool_size cannot be zero"); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index f70c0010585..22bece0ef04 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -40,6 +40,12 @@ namespace fs = std::filesystem; +namespace CurrentMetrics +{ + extern const Metric DDLWorkerThreads; + extern const Metric DDLWorkerThreadsActive; +} + namespace DB { @@ -85,7 +91,7 @@ DDLWorker::DDLWorker( { LOG_WARNING(log, "DDLWorker is configured to use multiple threads. " "It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear."); - worker_pool = std::make_unique(pool_size); + worker_pool = std::make_unique(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, pool_size); } queue_dir = zk_root_dir; @@ -1084,7 +1090,7 @@ void DDLWorker::runMainThread() /// It will wait for all threads in pool to finish and will not rethrow exceptions (if any). /// We create new thread pool to forget previous exceptions. if (1 < pool_size) - worker_pool = std::make_unique(pool_size); + worker_pool = std::make_unique(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, pool_size); /// Clear other in-memory state, like server just started. 
current_tasks.clear(); last_skipped_entry_name.reset(); @@ -1123,7 +1129,7 @@ void DDLWorker::runMainThread() initialized = false; /// Wait for pending async tasks if (1 < pool_size) - worker_pool = std::make_unique(pool_size); + worker_pool = std::make_unique(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, pool_size); LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true)); } else diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index 65ef4b440a1..6cf034edae8 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index b11a973c7b7..6af09972f1c 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -36,6 +36,8 @@ namespace CurrentMetrics { extern const Metric TablesToDropQueueSize; + extern const Metric DatabaseCatalogThreads; + extern const Metric DatabaseCatalogThreadsActive; } namespace DB @@ -850,7 +852,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables() LOG_INFO(log, "Found {} partially dropped tables. Will load them and retry removal.", dropped_metadata.size()); - ThreadPool pool; + ThreadPool pool(CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive); for (const auto & elem : dropped_metadata) { pool.scheduleOrThrowOnError([&]() diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index fb6b1635f28..cd3e5579e12 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -64,6 +65,12 @@ #include "config.h" +namespace CurrentMetrics +{ + extern const Metric RestartReplicaThreads; + extern const Metric RestartReplicaThreadsActive; +} + namespace DB { @@ -685,7 +692,8 @@ void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context) for (auto & guard : guards) guard.second = catalog.getDDLGuard(guard.first.database_name, guard.first.table_name); - ThreadPool pool(std::min(static_cast(getNumberOfPhysicalCPUCores()), replica_names.size())); + size_t threads = std::min(static_cast(getNumberOfPhysicalCPUCores()), replica_names.size()); + ThreadPool pool(CurrentMetrics::RestartReplicaThreads, CurrentMetrics::RestartReplicaThreadsActive, threads); for (auto & replica : replica_names) { diff --git a/src/Interpreters/loadMetadata.cpp b/src/Interpreters/loadMetadata.cpp index 47ccff57419..83af2684322 100644 --- a/src/Interpreters/loadMetadata.cpp +++ b/src/Interpreters/loadMetadata.cpp @@ -16,16 +16,24 @@ #include #include -#include +#include #include -#include #include +#include + +#include #define ORDINARY_TO_ATOMIC_PREFIX ".tmp_convert." namespace fs = std::filesystem; +namespace CurrentMetrics +{ + extern const Metric StartupSystemTablesThreads; + extern const Metric StartupSystemTablesThreadsActive; +} + namespace DB { @@ -366,7 +374,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons if (!tables_started) { /// It's not quite correct to run DDL queries while database is not started up. 
- ThreadPool pool; + ThreadPool pool(CurrentMetrics::StartupSystemTablesThreads, CurrentMetrics::StartupSystemTablesThreadsActive); DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, LoadingStrictnessLevel::FORCE_RESTORE); } @@ -461,7 +469,7 @@ void convertDatabasesEnginesIfNeed(ContextMutablePtr context) void startupSystemTables() { - ThreadPool pool; + ThreadPool pool(CurrentMetrics::StartupSystemTablesThreads, CurrentMetrics::StartupSystemTablesThreadsActive); DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, LoadingStrictnessLevel::FORCE_RESTORE); } diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index 7ea19f01e01..790d05e83dd 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -7,7 +7,8 @@ #include #include #include -#include "IO/WriteBufferFromString.h" +#include +#include #include #include #include @@ -17,6 +18,12 @@ #include #include +namespace CurrentMetrics +{ + extern const Metric ParallelFormattingOutputFormatThreads; + extern const Metric ParallelFormattingOutputFormatThreadsActive; +} + namespace DB { @@ -74,7 +81,7 @@ public: explicit ParallelFormattingOutputFormat(Params params) : IOutputFormat(params.header, params.out) , internal_formatter_creator(params.internal_formatter_creator) - , pool(params.max_threads_for_parallel_formatting) + , pool(CurrentMetrics::ParallelFormattingOutputFormatThreads, CurrentMetrics::ParallelFormattingOutputFormatThreadsActive, params.max_threads_for_parallel_formatting) { LOG_TEST(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used"); diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h index 03fb2d650dc..97df9308dbf 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h @@ -5,14 +5,21 @@ #include #include #include +#include +#include #include #include #include #include -#include #include +namespace CurrentMetrics +{ + extern const Metric ParallelParsingInputFormatThreads; + extern const Metric ParallelParsingInputFormatThreadsActive; +} + namespace DB { @@ -94,7 +101,7 @@ public: , min_chunk_bytes(params.min_chunk_bytes) , max_block_size(params.max_block_size) , is_server(params.is_server) - , pool(params.max_threads) + , pool(CurrentMetrics::ParallelParsingInputFormatThreads, CurrentMetrics::ParallelParsingInputFormatThreadsActive, params.max_threads) { // One unit for each thread, including segmentator and reader, plus a // couple more units so that the segmentation thread doesn't spuriously diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index 3abd2ac3346..048b69adae6 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -6,6 +6,13 @@ #include #include #include +#include + +namespace CurrentMetrics +{ + extern const Metric DestroyAggregatesThreads; + extern const Metric DestroyAggregatesThreadsActive; +} namespace DB { @@ -84,7 +91,10 @@ struct ManyAggregatedData // Aggregation states destruction may be very time-consuming. // In the case of a query with LIMIT, most states won't be destroyed during conversion to blocks. 
// Without the following code, they would be destroyed in the destructor of AggregatedDataVariants in the current thread (i.e. sequentially). - const auto pool = std::make_unique(variants.size()); + const auto pool = std::make_unique( + CurrentMetrics::DestroyAggregatesThreads, + CurrentMetrics::DestroyAggregatesThreadsActive, + variants.size()); for (auto && variant : variants) { diff --git a/src/Storages/Hive/StorageHive.cpp b/src/Storages/Hive/StorageHive.cpp index 85e6341eb5a..71830e8bc86 100644 --- a/src/Storages/Hive/StorageHive.cpp +++ b/src/Storages/Hive/StorageHive.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,11 @@ #include #include +namespace CurrentMetrics +{ + extern const Metric StorageHiveThreads; + extern const Metric StorageHiveThreadsActive; +} namespace DB { @@ -844,7 +850,7 @@ HiveFiles StorageHive::collectHiveFiles( Int64 hive_max_query_partitions = context_->getSettings().max_partitions_to_read; /// Mutext to protect hive_files, which maybe appended in multiple threads std::mutex hive_files_mutex; - ThreadPool pool{max_threads}; + ThreadPool pool{CurrentMetrics::StorageHiveThreads, CurrentMetrics::StorageHiveThreadsActive, max_threads}; if (!partitions.empty()) { for (const auto & partition : partitions) diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h index 5c47d20865b..a27fb18c0fe 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h @@ -21,6 +21,12 @@ #include +namespace CurrentMetrics +{ + extern const Metric MergeTreeBackgroundExecutorThreads; + extern const Metric MergeTreeBackgroundExecutorThreadsActive; +} + namespace DB { namespace ErrorCodes @@ -255,6 +261,7 @@ public: , max_tasks_count(max_tasks_count_) , metric(metric_) , max_tasks_metric(max_tasks_metric_, 2 * max_tasks_count) // active + pending + , pool(CurrentMetrics::MergeTreeBackgroundExecutorThreads, CurrentMetrics::MergeTreeBackgroundExecutorThreadsActive) { if (max_tasks_count == 0) throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero"); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index fef6ccbdaeb..f9848b572f9 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -117,6 +118,10 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric DelayedInserts; + extern const Metric MergeTreePartsLoaderThreads; + extern const Metric MergeTreePartsLoaderThreadsActive; + extern const Metric MergeTreePartsCleanerThreads; + extern const Metric MergeTreePartsCleanerThreadsActive; } @@ -1567,7 +1572,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) } } - ThreadPool pool(disks.size()); + ThreadPool pool(CurrentMetrics::MergeTreePartsLoaderThreads, CurrentMetrics::MergeTreePartsLoaderThreadsActive, disks.size()); std::vector parts_to_load_by_disk(disks.size()); for (size_t i = 0; i < disks.size(); ++i) @@ -2299,7 +2304,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t /// Parallel parts removal. 
size_t num_threads = std::min(settings->max_part_removal_threads, parts_to_remove.size()); std::mutex part_names_mutex; - ThreadPool pool(num_threads); + ThreadPool pool(CurrentMetrics::MergeTreePartsCleanerThreads, CurrentMetrics::MergeTreePartsCleanerThreadsActive, num_threads); bool has_zero_copy_parts = false; if (settings->allow_remote_fs_zero_copy_replication && dynamic_cast(this) != nullptr) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index ff8862f0f36..2039912106c 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -35,6 +35,7 @@ #include #include +#include #include #include #include @@ -46,6 +47,12 @@ #include +namespace CurrentMetrics +{ + extern const Metric MergeTreeDataSelectExecutorThreads; + extern const Metric MergeTreeDataSelectExecutorThreadsActive; +} + namespace DB { @@ -1077,7 +1084,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd else { /// Parallel loading of data parts. - ThreadPool pool(num_threads); + ThreadPool pool( + CurrentMetrics::MergeTreeDataSelectExecutorThreads, + CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive, + num_threads); for (size_t part_index = 0; part_index < parts.size(); ++part_index) pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()] diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 0be4ae3a79f..4dddd22fae5 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -126,6 +127,12 @@ namespace ProfileEvents extern const Event DistributedDelayedInsertsMilliseconds; } +namespace CurrentMetrics +{ + extern const Metric StorageDistributedThreads; + extern const Metric StorageDistributedThreadsActive; +} + namespace DB { @@ -1436,7 +1443,7 @@ void StorageDistributed::initializeFromDisk() const auto & disks = data_volume->getDisks(); /// Make initialization for large number of disks parallel. 
- ThreadPool pool(disks.size()); + ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, disks.size()); for (const DiskPtr & disk : disks) { diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index e17860af288..c7d2915f32f 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include @@ -53,8 +52,10 @@ #include +#include #include #include +#include #include #include @@ -65,6 +66,12 @@ namespace fs = std::filesystem; +namespace CurrentMetrics +{ + extern const Metric StorageS3Threads; + extern const Metric StorageS3ThreadsActive; +} + namespace ProfileEvents { extern const Event S3DeleteObjects; @@ -150,7 +157,7 @@ public: , object_infos(object_infos_) , read_keys(read_keys_) , request_settings(request_settings_) - , list_objects_pool(1) + , list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1) , list_objects_scheduler(threadPoolCallbackRunner(list_objects_pool, "ListObjects")) { if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos) @@ -574,7 +581,7 @@ StorageS3Source::StorageS3Source( , requested_virtual_columns(requested_virtual_columns_) , file_iterator(file_iterator_) , download_thread_num(download_thread_num_) - , create_reader_pool(1) + , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1) , create_reader_scheduler(threadPoolCallbackRunner(create_reader_pool, "CreateS3Reader")) { reader = createReader(); diff --git a/src/Storages/System/StorageSystemReplicas.cpp b/src/Storages/System/StorageSystemReplicas.cpp index 65878d356f4..240d452fe29 100644 --- a/src/Storages/System/StorageSystemReplicas.cpp +++ b/src/Storages/System/StorageSystemReplicas.cpp @@ -7,12 +7,19 @@ #include #include #include -#include #include #include +#include +#include #include +namespace CurrentMetrics +{ + extern const Metric SystemReplicasThreads; + extern const Metric SystemReplicasThreadsActive; +} + namespace DB { @@ -160,7 +167,7 @@ Pipe StorageSystemReplicas::read( if (settings.max_threads != 0) thread_pool_size = std::min(thread_pool_size, static_cast(settings.max_threads)); - ThreadPool thread_pool(thread_pool_size); + ThreadPool thread_pool(CurrentMetrics::SystemReplicasThreads, CurrentMetrics::SystemReplicasThreadsActive, thread_pool_size); for (size_t i = 0; i < tables_size; ++i) { diff --git a/utils/keeper-bench/Runner.h b/utils/keeper-bench/Runner.h index 3976ac720eb..a00b7b43eff 100644 --- a/utils/keeper-bench/Runner.h +++ b/utils/keeper-bench/Runner.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "Stats.h" @@ -15,6 +16,12 @@ using Ports = std::vector; using Strings = std::vector; +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + class Runner { public: @@ -27,7 +34,7 @@ public: bool continue_on_error_, size_t max_iterations_) : concurrency(concurrency_) - , pool(concurrency) + , pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, concurrency) , hosts_strings(hosts_strings_) , generator(getGenerator(generator_name)) , max_time(max_time_)