Initialize global trace collector for Poco::ThreadPool

This commit is contained in:
kssenii 2024-06-13 19:13:13 +02:00
parent e8c0caa5ab
commit fb32a99578
11 changed files with 81 additions and 43 deletions

View File

@ -213,6 +213,7 @@ target_compile_definitions (_poco_foundation
)
target_include_directories (_poco_foundation SYSTEM PUBLIC "include")
target_link_libraries (_poco_foundation PRIVATE clickhouse_common_io)
target_link_libraries (_poco_foundation
PRIVATE

View File

@ -48,7 +48,13 @@ class Foundation_API ThreadPool
/// from the pool.
{
public:
ThreadPool(int minCapacity = 2, int maxCapacity = 16, int idleTime = 60, int stackSize = POCO_THREAD_STACK_SIZE);
explicit ThreadPool(
int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE,
size_t global_profiler_real_time_period_ns_ = 0,
size_t global_profiler_cpu_time_period_ns_ = 0);
/// Creates a thread pool with minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
@ -56,8 +62,14 @@ public:
/// and more than minCapacity threads are running, the thread
/// is killed. Threads are created with given stack size.
ThreadPool(
const std::string & name, int minCapacity = 2, int maxCapacity = 16, int idleTime = 60, int stackSize = POCO_THREAD_STACK_SIZE);
explicit ThreadPool(
const std::string & name,
int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE,
size_t global_profiler_real_time_period_ns_ = 0,
size_t global_profiler_cpu_time_period_ns_ = 0);
/// Creates a thread pool with the given name and minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
@ -171,6 +183,8 @@ private:
int _serial;
int _age;
int _stackSize;
size_t _globalProfilerRealTimePeriodNs;
size_t _globalProfilerCPUTimePeriodNs;
ThreadVec _threads;
mutable FastMutex _mutex;
};

View File

@ -20,6 +20,7 @@
#include "Poco/ErrorHandler.h"
#include <sstream>
#include <ctime>
#include <Common/ThreadPool.h>
namespace Poco {
@ -28,7 +29,11 @@ namespace Poco {
class PooledThread: public Runnable
{
public:
PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
explicit PooledThread(
const std::string& name,
int stackSize = POCO_THREAD_STACK_SIZE,
size_t globalProfilerRealTimePeriodNs_ = 0,
size_t globalProfilerCPUTimePeriodNs_ = 0);
~PooledThread();
void start();
@ -51,16 +56,24 @@ private:
Event _targetCompleted;
Event _started;
FastMutex _mutex;
size_t _globalProfilerRealTimePeriodNs;
size_t _globalProfilerCPUTimePeriodNs;
};
PooledThread::PooledThread(const std::string& name, int stackSize):
_idle(true),
_idleTime(0),
_pTarget(0),
_name(name),
PooledThread::PooledThread(
const std::string& name,
int stackSize,
size_t globalProfilerRealTimePeriodNs_,
size_t globalProfilerCPUTimePeriodNs_) :
_idle(true),
_idleTime(0),
_pTarget(0),
_name(name),
_thread(name),
_targetCompleted(false)
_targetCompleted(false),
_globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_),
_globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_)
{
poco_assert_dbg (stackSize >= 0);
_thread.setStackSize(stackSize);
@ -83,7 +96,7 @@ void PooledThread::start()
void PooledThread::start(Thread::Priority priority, Runnable& target)
{
FastMutex::ScopedLock lock(_mutex);
poco_assert (_pTarget == 0);
_pTarget = &target;
@ -109,7 +122,7 @@ void PooledThread::start(Thread::Priority priority, Runnable& target, const std:
}
_thread.setName(fullName);
_thread.setPriority(priority);
poco_assert (_pTarget == 0);
_pTarget = &target;
@ -145,7 +158,7 @@ void PooledThread::join()
void PooledThread::activate()
{
FastMutex::ScopedLock lock(_mutex);
poco_assert (_idle);
_idle = false;
_targetCompleted.reset();
@ -155,7 +168,7 @@ void PooledThread::activate()
void PooledThread::release()
{
const long JOIN_TIMEOUT = 10000;
_mutex.lock();
_pTarget = 0;
_mutex.unlock();
@ -174,6 +187,10 @@ void PooledThread::release()
void PooledThread::run()
{
DB::ThreadStatus thread_status;
if (unlikely(_globalProfilerRealTimePeriodNs != 0 || _globalProfilerCPUTimePeriodNs != 0))
thread_status.initGlobalProfiler(_globalProfilerRealTimePeriodNs, _globalProfilerCPUTimePeriodNs);
_started.set();
for (;;)
{
@ -220,13 +237,17 @@ void PooledThread::run()
ThreadPool::ThreadPool(int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
int stackSize,
size_t globalProfilerRealTimePeriodNs_,
size_t globalProfilerCPUTimePeriodNs_) :
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_),
_globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
@ -243,14 +264,18 @@ ThreadPool::ThreadPool(const std::string& name,
int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
int stackSize,
size_t globalProfilerRealTimePeriodNs_,
size_t globalProfilerCPUTimePeriodNs_) :
_name(name),
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_globalProfilerRealTimePeriodNs(globalProfilerRealTimePeriodNs_),
_globalProfilerCPUTimePeriodNs(globalProfilerCPUTimePeriodNs_)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
@ -393,15 +418,15 @@ void ThreadPool::housekeep()
ThreadVec activeThreads;
idleThreads.reserve(_threads.size());
activeThreads.reserve(_threads.size());
for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
{
if ((*it)->idle())
{
if ((*it)->idleTime() < _idleTime)
idleThreads.push_back(*it);
else
expiredThreads.push_back(*it);
else
expiredThreads.push_back(*it);
}
else activeThreads.push_back(*it);
}
@ -463,7 +488,7 @@ PooledThread* ThreadPool::createThread()
{
std::ostringstream name;
name << _name << "[#" << ++_serial << "]";
return new PooledThread(name.str(), _stackSize);
return new PooledThread(name.str(), _stackSize, _globalProfilerRealTimePeriodNs, _globalProfilerCPUTimePeriodNs);
}
@ -481,7 +506,7 @@ public:
ThreadPool* pool()
{
FastMutex::ScopedLock lock(_mutex);
if (!_pPool)
{
_pPool = new ThreadPool("default");
@ -490,7 +515,7 @@ public:
}
return _pPool;
}
private:
ThreadPool* _pPool;
FastMutex _mutex;

View File

@ -10,6 +10,7 @@
#include <Poco/Net/NetException.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Environment.h>
#include <Poco/Config.h>
#include <Common/scope_guard_safe.h>
#include <Common/logger_useful.h>
#include <base/phdr_cache.h>
@ -721,11 +722,6 @@ try
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
Poco::ThreadPool server_pool(3, server_settings.max_connections);
std::mutex servers_lock;
std::vector<ProtocolServerAdapter> servers;
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases, ...
*/
@ -823,6 +819,18 @@ try
total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size);
}
Poco::ThreadPool server_pool(
/* minCapacity */3,
/* maxCapacity */server_settings.max_connections,
/* idleTime */60,
/* stackSize */POCO_THREAD_STACK_SIZE,
server_settings.global_profiler_real_time_period_ns,
server_settings.global_profiler_cpu_time_period_ns);
std::mutex servers_lock;
std::vector<ProtocolServerAdapter> servers;
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
/// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
SCOPE_EXIT({
Stopwatch watch;

View File

@ -1060,7 +1060,6 @@ void HTTPHandler::formatExceptionForClient(int exception_code, HTTPServerRequest
void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
setThreadName("HTTPHandler");
ThreadStatus thread_status;
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::HTTP, request.isSecure());
SCOPE_EXIT({ session.reset(); });

View File

@ -8,7 +8,6 @@
#include <Interpreters/InterserverIOHandler.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Common/ThreadStatus.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
@ -81,7 +80,6 @@ void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPSer
void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
setThreadName("IntersrvHandler");
ThreadStatus thread_status;
/// In order to work keep-alive.
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)

View File

@ -309,7 +309,6 @@ Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length, bool
void KeeperTCPHandler::runImpl()
{
setThreadName("KeeperHandler");
ThreadStatus thread_status;
socket().setReceiveTimeout(receive_timeout);
socket().setSendTimeout(send_timeout);

View File

@ -24,7 +24,6 @@
#include <Common/CurrentThread.h>
#include <Common/NetException.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <Common/logger_useful.h>
#include <Common/re2.h>
@ -199,7 +198,6 @@ MySQLHandler::~MySQLHandler() = default;
void MySQLHandler::run()
{
setThreadName("MySQLHandler");
ThreadStatus thread_status;
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::MYSQL);
SCOPE_EXIT({ session.reset(); });

View File

@ -10,7 +10,6 @@
#include <base/scope_guard.h>
#include <pcg_random.hpp>
#include <Common/CurrentThread.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <Common/randomSeed.h>
#include <Common/setThreadName.h>
@ -59,7 +58,6 @@ void PostgreSQLHandler::changeIO(Poco::Net::StreamSocket & socket)
void PostgreSQLHandler::run()
{
setThreadName("PostgresHandler");
ThreadStatus thread_status;
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::POSTGRESQL);
SCOPE_EXIT({ session.reset(); });

View File

@ -246,7 +246,6 @@ TCPHandler::~TCPHandler()
void TCPHandler::runImpl()
{
setThreadName("TCPHandler");
ThreadStatus thread_status;
extractConnectionSettingsFromContext(server.context());

View File

@ -7,7 +7,6 @@
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadStatus.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <IO/Progress.h>