From fb32a99578b57cf185f6e868879aaf2ff218419d Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 13 Jun 2024 19:13:13 +0200 Subject: [PATCH] Initialize global trace collector for Poco::ThreadPool --- base/poco/Foundation/CMakeLists.txt | 1 + .../poco/Foundation/include/Poco/ThreadPool.h | 20 ++++- base/poco/Foundation/src/ThreadPool.cpp | 75 ++++++++++++------- programs/server/Server.cpp | 18 +++-- src/Server/HTTPHandler.cpp | 1 - src/Server/InterserverIOHTTPHandler.cpp | 2 - src/Server/KeeperTCPHandler.cpp | 1 - src/Server/MySQLHandler.cpp | 2 - src/Server/PostgreSQLHandler.cpp | 2 - src/Server/TCPHandler.cpp | 1 - src/Server/TCPHandler.h | 1 - 11 files changed, 81 insertions(+), 43 deletions(-) diff --git a/base/poco/Foundation/CMakeLists.txt b/base/poco/Foundation/CMakeLists.txt index dfb41a33fb1..324a0170bdd 100644 --- a/base/poco/Foundation/CMakeLists.txt +++ b/base/poco/Foundation/CMakeLists.txt @@ -213,6 +213,7 @@ target_compile_definitions (_poco_foundation ) target_include_directories (_poco_foundation SYSTEM PUBLIC "include") +target_link_libraries (_poco_foundation PRIVATE clickhouse_common_io) target_link_libraries (_poco_foundation PRIVATE diff --git a/base/poco/Foundation/include/Poco/ThreadPool.h b/base/poco/Foundation/include/Poco/ThreadPool.h index b9506cc5b7f..e2187bfeb66 100644 --- a/base/poco/Foundation/include/Poco/ThreadPool.h +++ b/base/poco/Foundation/include/Poco/ThreadPool.h @@ -48,7 +48,13 @@ class Foundation_API ThreadPool /// from the pool. { public: - ThreadPool(int minCapacity = 2, int maxCapacity = 16, int idleTime = 60, int stackSize = POCO_THREAD_STACK_SIZE); + explicit ThreadPool( + int minCapacity = 2, + int maxCapacity = 16, + int idleTime = 60, + int stackSize = POCO_THREAD_STACK_SIZE, + size_t global_profiler_real_time_period_ns_ = 0, + size_t global_profiler_cpu_time_period_ns_ = 0); /// Creates a thread pool with minCapacity threads. /// If required, up to maxCapacity threads are created /// a NoThreadAvailableException exception is thrown. @@ -56,8 +62,14 @@ public: /// and more than minCapacity threads are running, the thread /// is killed. Threads are created with given stack size. - ThreadPool( - const std::string & name, int minCapacity = 2, int maxCapacity = 16, int idleTime = 60, int stackSize = POCO_THREAD_STACK_SIZE); + explicit ThreadPool( + const std::string & name, + int minCapacity = 2, + int maxCapacity = 16, + int idleTime = 60, + int stackSize = POCO_THREAD_STACK_SIZE, + size_t global_profiler_real_time_period_ns_ = 0, + size_t global_profiler_cpu_time_period_ns_ = 0); /// Creates a thread pool with the given name and minCapacity threads. /// If required, up to maxCapacity threads are created /// a NoThreadAvailableException exception is thrown. @@ -171,6 +183,8 @@ private: int _serial; int _age; int _stackSize; + size_t _globalProfilerRealTimePeriodNs; + size_t _globalProfilerCPUTimePeriodNs; ThreadVec _threads; mutable FastMutex _mutex; }; diff --git a/base/poco/Foundation/src/ThreadPool.cpp b/base/poco/Foundation/src/ThreadPool.cpp index 6335ee82b47..f57c81e4128 100644 --- a/base/poco/Foundation/src/ThreadPool.cpp +++ b/base/poco/Foundation/src/ThreadPool.cpp @@ -20,6 +20,7 @@ #include "Poco/ErrorHandler.h" #include #include +#include namespace Poco { @@ -28,7 +29,11 @@ namespace Poco { class PooledThread: public Runnable { public: - PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); + explicit PooledThread( + const std::string& name, + int stackSize = POCO_THREAD_STACK_SIZE, + size_t globalProfilerRealTimePeriodNs_ = 0, + size_t globalProfilerCPUTimePeriodNs_ = 0); ~PooledThread(); void start(); @@ -51,16 +56,24 @@ private: Event _targetCompleted; Event _started; FastMutex _mutex; + size_t _globalProfilerRealTimePeriodNs; + size_t _globalProfilerCPUTimePeriodNs; }; -PooledThread::PooledThread(const std::string& name, int stackSize): - _idle(true), - _idleTime(0), - _pTarget(0), - _name(name), +PooledThread::PooledThread( + const std::string& name, + int stackSize, + size_t globalProfilerRealTimePeriodNs_, + size_t globalProfilerCPUTimePeriodNs_) : + _idle(true), + _idleTime(0), + _pTarget(0), + _name(name), _thread(name), - _targetCompleted(false) + _targetCompleted(false), + _globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_), + _globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_) { poco_assert_dbg (stackSize >= 0); _thread.setStackSize(stackSize); @@ -83,7 +96,7 @@ void PooledThread::start() void PooledThread::start(Thread::Priority priority, Runnable& target) { FastMutex::ScopedLock lock(_mutex); - + poco_assert (_pTarget == 0); _pTarget = ⌖ @@ -109,7 +122,7 @@ void PooledThread::start(Thread::Priority priority, Runnable& target, const std: } _thread.setName(fullName); _thread.setPriority(priority); - + poco_assert (_pTarget == 0); _pTarget = ⌖ @@ -145,7 +158,7 @@ void PooledThread::join() void PooledThread::activate() { FastMutex::ScopedLock lock(_mutex); - + poco_assert (_idle); _idle = false; _targetCompleted.reset(); @@ -155,7 +168,7 @@ void PooledThread::activate() void PooledThread::release() { const long JOIN_TIMEOUT = 10000; - + _mutex.lock(); _pTarget = 0; _mutex.unlock(); @@ -174,6 +187,10 @@ void PooledThread::release() void PooledThread::run() { + DB::ThreadStatus thread_status; + if (unlikely(_globalProfilerRealTimePeriodNs != 0 || _globalProfilerCPUTimePeriodNs != 0)) + thread_status.initGlobalProfiler(_globalProfilerRealTimePeriodNs, _globalProfilerCPUTimePeriodNs); + _started.set(); for (;;) { @@ -220,13 +237,17 @@ void PooledThread::run() ThreadPool::ThreadPool(int minCapacity, int maxCapacity, int idleTime, - int stackSize): - _minCapacity(minCapacity), - _maxCapacity(maxCapacity), + int stackSize, + size_t globalProfilerRealTimePeriodNs_, + size_t globalProfilerCPUTimePeriodNs_) : + _minCapacity(minCapacity), + _maxCapacity(maxCapacity), _idleTime(idleTime), _serial(0), _age(0), - _stackSize(stackSize) + _stackSize(stackSize), + _globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_), + _globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_) { poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); @@ -243,14 +264,18 @@ ThreadPool::ThreadPool(const std::string& name, int minCapacity, int maxCapacity, int idleTime, - int stackSize): + int stackSize, + size_t globalProfilerRealTimePeriodNs_, + size_t globalProfilerCPUTimePeriodNs_) : _name(name), - _minCapacity(minCapacity), - _maxCapacity(maxCapacity), + _minCapacity(minCapacity), + _maxCapacity(maxCapacity), _idleTime(idleTime), _serial(0), _age(0), - _stackSize(stackSize) + _stackSize(stackSize), + _globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_), + _globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_) { poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); @@ -393,15 +418,15 @@ void ThreadPool::housekeep() ThreadVec activeThreads; idleThreads.reserve(_threads.size()); activeThreads.reserve(_threads.size()); - + for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) { if ((*it)->idle()) { if ((*it)->idleTime() < _idleTime) idleThreads.push_back(*it); - else - expiredThreads.push_back(*it); + else + expiredThreads.push_back(*it); } else activeThreads.push_back(*it); } @@ -463,7 +488,7 @@ PooledThread* ThreadPool::createThread() { std::ostringstream name; name << _name << "[#" << ++_serial << "]"; - return new PooledThread(name.str(), _stackSize); + return new PooledThread(name.str(), _stackSize, _globalProfilerRealTimePeriodNs, _globalProfilerCPUTimePeriodNs); } @@ -481,7 +506,7 @@ public: ThreadPool* pool() { FastMutex::ScopedLock lock(_mutex); - + if (!_pPool) { _pPool = new ThreadPool("default"); @@ -490,7 +515,7 @@ public: } return _pPool; } - + private: ThreadPool* _pPool; FastMutex _mutex; diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index c3a5be706b4..8a5f6173c49 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -721,11 +722,6 @@ try CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision()); CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger()); - Poco::ThreadPool server_pool(3, server_settings.max_connections); - std::mutex servers_lock; - std::vector servers; - std::vector servers_to_start_before_tables; - /** Context contains all that query execution is dependent: * settings, available functions, data types, aggregate functions, databases, ... */ @@ -823,6 +819,18 @@ try total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size); } + Poco::ThreadPool server_pool( + /* minCapacity */3, + /* maxCapacity */server_settings.max_connections, + /* idleTime */60, + /* stackSize */POCO_THREAD_STACK_SIZE, + server_settings.global_profiler_real_time_period_ns, + server_settings.global_profiler_cpu_time_period_ns); + + std::mutex servers_lock; + std::vector servers; + std::vector servers_to_start_before_tables; + /// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed). SCOPE_EXIT({ Stopwatch watch; diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index 02d0959ff50..d6afa571e71 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -1060,7 +1060,6 @@ void HTTPHandler::formatExceptionForClient(int exception_code, HTTPServerRequest void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) { setThreadName("HTTPHandler"); - ThreadStatus thread_status; session = std::make_unique(server.context(), ClientInfo::Interface::HTTP, request.isSecure()); SCOPE_EXIT({ session.reset(); }); diff --git a/src/Server/InterserverIOHTTPHandler.cpp b/src/Server/InterserverIOHTTPHandler.cpp index 0d79aaa227b..45c28babe3a 100644 --- a/src/Server/InterserverIOHTTPHandler.cpp +++ b/src/Server/InterserverIOHTTPHandler.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include @@ -81,7 +80,6 @@ void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPSer void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) { setThreadName("IntersrvHandler"); - ThreadStatus thread_status; /// In order to work keep-alive. if (request.getVersion() == HTTPServerRequest::HTTP_1_1) diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 6709cd298e5..4612e2e9fa8 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -309,7 +309,6 @@ Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length, bool void KeeperTCPHandler::runImpl() { setThreadName("KeeperHandler"); - ThreadStatus thread_status; socket().setReceiveTimeout(receive_timeout); socket().setSendTimeout(send_timeout); diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index 6456f6d24ff..9471509ad4b 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -199,7 +198,6 @@ MySQLHandler::~MySQLHandler() = default; void MySQLHandler::run() { setThreadName("MySQLHandler"); - ThreadStatus thread_status; session = std::make_unique(server.context(), ClientInfo::Interface::MYSQL); SCOPE_EXIT({ session.reset(); }); diff --git a/src/Server/PostgreSQLHandler.cpp b/src/Server/PostgreSQLHandler.cpp index 473d681ddb2..8ba8421e6f0 100644 --- a/src/Server/PostgreSQLHandler.cpp +++ b/src/Server/PostgreSQLHandler.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -59,7 +58,6 @@ void PostgreSQLHandler::changeIO(Poco::Net::StreamSocket & socket) void PostgreSQLHandler::run() { setThreadName("PostgresHandler"); - ThreadStatus thread_status; session = std::make_unique(server.context(), ClientInfo::Interface::POSTGRESQL); SCOPE_EXIT({ session.reset(); }); diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index e3a820340ad..b60339e9fd8 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -246,7 +246,6 @@ TCPHandler::~TCPHandler() void TCPHandler::runImpl() { setThreadName("TCPHandler"); - ThreadStatus thread_status; extractConnectionSettingsFromContext(server.context()); diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index 191617f1905..75e36836b63 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include