2019-01-14 19:22:09 +00:00
# include <Common/ThreadPool.h>
2022-04-10 23:03:24 +00:00
# include <Common/setThreadName.h>
2019-01-14 19:22:09 +00:00
# include <Common/Exception.h>
2020-07-16 23:12:47 +00:00
# include <Common/getNumberOfPhysicalCPUCores.h>
2022-07-07 09:41:43 +00:00
# include <Common/OpenTelemetryTraceContext.h>
2023-01-30 19:00:48 +00:00
# include <Common/noexcept_scope.h>
2019-01-14 19:22:09 +00:00
2020-06-22 19:04:12 +00:00
# include <cassert>
2019-01-14 19:22:09 +00:00
# include <type_traits>
2020-06-14 06:43:01 +00:00
# include <Poco/Util/Application.h>
# include <Poco/Util/LayeredConfiguration.h>
2023-03-29 14:28:36 +00:00
# include <base/demangle.h>
2020-06-14 06:43:01 +00:00
2019-01-14 19:22:09 +00:00
/// Error codes thrown from this file. They are only declared here (`extern`).
namespace DB::ErrorCodes
{
    extern const int CANNOT_SCHEDULE_TASK;
    extern const int LOGICAL_ERROR;
}
2019-01-14 10:59:58 +00:00
2019-08-01 20:09:38 +00:00
/// Metrics passed to the base pool by GlobalThreadPool's constructor.
/// They are only declared here (`extern`).
namespace CurrentMetrics
{
    extern const Metric GlobalThread;
    extern const Metric GlobalThreadActive;
    extern const Metric GlobalThreadScheduled;
}
2023-12-24 18:31:03 +00:00
class JobWithPriority
{
public :
using Job = std : : function < void ( ) > ;
Job job ;
Priority priority ;
CurrentMetrics : : Increment metric_increment ;
DB : : OpenTelemetry : : TracingContextOnThread thread_trace_context ;
/// Call stacks of all jobs' schedulings leading to this one
std : : vector < StackTrace : : FramePointers > frame_pointers ;
bool enable_job_stack_trace = false ;
JobWithPriority (
Job job_ , Priority priority_ , CurrentMetrics : : Metric metric ,
const DB : : OpenTelemetry : : TracingContextOnThread & thread_trace_context_ ,
bool capture_frame_pointers )
: job ( job_ ) , priority ( priority_ ) , metric_increment ( metric ) ,
thread_trace_context ( thread_trace_context_ ) , enable_job_stack_trace ( capture_frame_pointers )
{
if ( ! capture_frame_pointers )
return ;
/// Save all previous jobs call stacks and append with current
frame_pointers = DB : : Exception : : thread_frame_pointers ;
frame_pointers . push_back ( StackTrace ( ) . getFramePointers ( ) ) ;
}
bool operator < ( const JobWithPriority & rhs ) const
{
return priority > rhs . priority ; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
}
} ;
2023-03-29 14:28:36 +00:00
/// Name that worker() assigns to its thread at the start of every loop iteration.
static constexpr const char * DEFAULT_THREAD_NAME = " ThreadPool ";
2019-01-14 10:59:58 +00:00
2020-07-16 23:12:47 +00:00
template < typename Thread >
2023-11-18 18:07:59 +00:00
ThreadPoolImpl < Thread > : : ThreadPoolImpl ( Metric metric_threads_ , Metric metric_active_threads_ , Metric metric_scheduled_jobs_ )
: ThreadPoolImpl ( metric_threads_ , metric_active_threads_ , metric_scheduled_jobs_ , getNumberOfPhysicalCPUCores ( ) )
2020-07-16 23:12:47 +00:00
{
}
2019-01-14 10:59:58 +00:00
template < typename Thread >
2023-03-22 07:49:22 +00:00
ThreadPoolImpl < Thread > : : ThreadPoolImpl (
Metric metric_threads_ ,
Metric metric_active_threads_ ,
2023-11-18 18:07:59 +00:00
Metric metric_scheduled_jobs_ ,
2023-03-22 07:49:22 +00:00
size_t max_threads_ )
2023-11-18 18:07:59 +00:00
: ThreadPoolImpl ( metric_threads_ , metric_active_threads_ , metric_scheduled_jobs_ , max_threads_ , max_threads_ , max_threads_ )
2019-01-14 10:59:58 +00:00
{
}
template < typename Thread >
2023-03-22 07:49:22 +00:00
ThreadPoolImpl < Thread > : : ThreadPoolImpl (
Metric metric_threads_ ,
Metric metric_active_threads_ ,
2023-11-18 18:07:59 +00:00
Metric metric_scheduled_jobs_ ,
2023-03-22 07:49:22 +00:00
size_t max_threads_ ,
size_t max_free_threads_ ,
size_t queue_size_ ,
bool shutdown_on_exception_ )
: metric_threads ( metric_threads_ )
, metric_active_threads ( metric_active_threads_ )
2023-11-18 18:07:59 +00:00
, metric_scheduled_jobs ( metric_scheduled_jobs_ )
2023-03-22 07:49:22 +00:00
, max_threads ( max_threads_ )
2023-03-17 09:24:30 +00:00
, max_free_threads ( std : : min ( max_free_threads_ , max_threads ) )
, queue_size ( queue_size_ ? std : : max ( queue_size_ , max_threads ) : 0 /* zero means the queue is unlimited */ )
Do not shutdown global thread pool on exception
Otherwise GlobalThreadPool can be terminated (for example due to an
exception from the ParallelInputsHandler::onFinish/onFinishThread, from
ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread,
since writeToTemporaryFile() can definitelly throw) and the server will
not accept new connections (or/and execute queries) anymore.
Here is possible stacktrace (it is a bit inaccurate, due to
optimizations I guess, and it had been obtained with the
DB::tryLogCurrentException() in the catch block of the
ThreadPoolImpl::worker()):
2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below):
1. Common/Exception.cpp:35: DB::Exception::Exception(...)
...
6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0)
7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...)
8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...)
9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...)
10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...)
Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732
(Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
, shutdown_on_exception ( shutdown_on_exception_ )
2019-01-14 10:59:58 +00:00
{
}
2019-08-02 17:14:04 +00:00
/// Changes the thread limit and re-normalizes the limits that depend on it.
/// If the limit grew, new threads are started for jobs that are already scheduled;
/// otherwise, if the new value is below the previous free-thread limit, idle threads are woken up.
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
    std::lock_guard guard(mutex);

    /// Both decisions are made against the limits as they were before this call.
    const bool limit_raised = value > max_threads;
    const bool free_limit_lowered = value < max_free_threads;

    max_threads = value;

    /// The number of free threads can never exceed the total limit.
    if (max_free_threads > max_threads)
        max_free_threads = max_threads;

    /// We have to also adjust queue size, because it limits the number of scheduled and already running jobs in total.
    /// A zero queue size (unlimited) is left untouched.
    if (queue_size)
        queue_size = std::max(queue_size, max_threads);
    jobs.reserve(queue_size);

    if (limit_raised)
    {
        /// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
        startNewThreadsNoLock();
        return;
    }

    if (free_limit_lowered)
    {
        /// Wake up free threads so they can finish themselves.
        new_job_or_shutdown.notify_all();
    }
}
2020-12-22 11:30:29 +00:00
/// Returns the current thread limit; the value is read under the pool mutex.
template <typename Thread>
size_t ThreadPoolImpl<Thread>::getMaxThreads() const
{
    std::lock_guard guard(mutex);
    return max_threads;
}
2019-08-02 17:14:04 +00:00
/// Changes the limit of idle threads (clamped to `max_threads`).
/// If the requested value is below the previous limit, idle threads are woken up.
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
    std::lock_guard guard(mutex);

    /// Compared against the limit as it was before this call.
    const bool limit_lowered = value < max_free_threads;

    max_free_threads = std::min(value, max_threads);

    if (!limit_lowered)
        return;

    /// Wake up free threads so they can finish themselves.
    new_job_or_shutdown.notify_all();
}
/// Changes the queue size. Zero keeps the queue unlimited;
/// any other value is raised to at least `max_threads`.
template <typename Thread>
void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
    std::lock_guard guard(mutex);

    if (value)
        queue_size = std::max(value, max_threads);
    else
        queue_size = 0;

    /// Reserve memory to get rid of allocations
    jobs.reserve(queue_size);
}
2019-01-14 10:59:58 +00:00
template < typename Thread >
2019-01-14 19:22:09 +00:00
template < typename ReturnType >
2023-05-26 13:55:30 +00:00
ReturnType ThreadPoolImpl < Thread > : : scheduleImpl ( Job job , Priority priority , std : : optional < uint64_t > wait_microseconds , bool propagate_opentelemetry_tracing_context )
2019-01-14 10:59:58 +00:00
{
2021-05-31 07:22:42 +00:00
auto on_error = [ & ] ( const std : : string & reason )
2019-01-14 19:22:09 +00:00
{
if constexpr ( std : : is_same_v < ReturnType , void > )
2019-06-29 22:37:46 +00:00
{
if ( first_exception )
{
std : : exception_ptr exception ;
std : : swap ( exception , first_exception ) ;
std : : rethrow_exception ( exception ) ;
}
2021-05-31 07:22:42 +00:00
throw DB : : Exception ( DB : : ErrorCodes : : CANNOT_SCHEDULE_TASK ,
" Cannot schedule a task: {} (threads={}, jobs={}) " , reason ,
threads . size ( ) , scheduled_jobs ) ;
2019-06-29 22:37:46 +00:00
}
2019-01-14 19:22:09 +00:00
else
return false ;
} ;
2019-01-14 10:59:58 +00:00
{
2019-01-14 19:22:09 +00:00
std : : unique_lock lock ( mutex ) ;
auto pred = [ this ] { return ! queue_size | | scheduled_jobs < queue_size | | shutdown ; } ;
2019-08-02 17:14:04 +00:00
if ( wait_microseconds ) /// Check for optional. Condition is true if the optional is set and the value is zero.
2019-01-14 19:22:09 +00:00
{
if ( ! job_finished . wait_for ( lock , std : : chrono : : microseconds ( * wait_microseconds ) , pred ) )
2021-05-31 07:22:42 +00:00
return on_error ( fmt : : format ( " no free thread (timeout={}) " , *wait_microseconds)) ;
2019-01-14 19:22:09 +00:00
}
else
job_finished . wait ( lock , pred ) ;
2019-01-14 10:59:58 +00:00
if ( shutdown )
2021-05-31 07:22:42 +00:00
return on_error ( " shutdown " ) ;
2019-01-14 10:59:58 +00:00
2021-05-25 11:54:47 +00:00
/// We must not to allocate any memory after we emplaced a job in a queue.
2021-05-25 12:58:04 +00:00
/// Because if an exception would be thrown, we won't notify a thread about job occurrence.
2019-01-14 10:59:58 +00:00
2021-05-24 16:24:03 +00:00
/// Check if there are enough threads to process job.
if ( threads . size ( ) < std : : min ( max_threads , scheduled_jobs + 1 ) )
2019-01-14 19:22:09 +00:00
{
2021-05-24 16:24:03 +00:00
try
{
threads . emplace_front ( ) ;
}
catch ( . . . )
{
/// Most likely this is a std::bad_alloc exception
2021-05-31 07:22:42 +00:00
return on_error ( " cannot allocate thread slot " ) ;
2021-05-24 16:24:03 +00:00
}
2019-01-14 19:22:09 +00:00
try
{
threads . front ( ) = Thread ( [ this , it = threads . begin ( ) ] { worker ( it ) ; } ) ;
}
catch ( . . . )
{
threads . pop_front ( ) ;
2021-05-31 07:22:42 +00:00
return on_error ( " cannot allocate thread " ) ;
2019-01-14 19:22:09 +00:00
}
}
2021-05-24 16:24:03 +00:00
2022-09-01 03:56:10 +00:00
jobs . emplace ( std : : move ( job ) ,
priority ,
2023-11-18 18:07:59 +00:00
metric_scheduled_jobs ,
2022-09-01 03:56:10 +00:00
/// Tracing context on this thread is used as parent context for the sub-thread that runs the job
2023-06-24 00:12:31 +00:00
propagate_opentelemetry_tracing_context ? DB : : OpenTelemetry : : CurrentContext ( ) : DB : : OpenTelemetry : : TracingContextOnThread ( ) ,
/// capture_frame_pointers
2023-06-28 01:39:06 +00:00
DB : : Exception : : enable_job_stack_trace ) ;
2022-08-30 04:26:23 +00:00
2021-05-24 16:24:03 +00:00
+ + scheduled_jobs ;
2019-01-14 10:59:58 +00:00
}
2021-05-24 16:24:03 +00:00
2023-03-17 20:52:05 +00:00
/// Wake up a free thread to run the new job.
2022-12-16 11:27:48 +00:00
new_job_or_shutdown . notify_one ( ) ;
2022-04-18 08:18:31 +00:00
return static_cast < ReturnType > ( true ) ;
2019-01-14 19:22:09 +00:00
}
2023-03-16 18:00:51 +00:00
/// Starts worker threads until there are min(scheduled_jobs, max_threads) of them.
/// Does nothing after shutdown. Stops silently at the first failure to create a thread.
/// NOTE(review): the name and the call from setMaxThreads() suggest the caller must hold `mutex` — confirm.
template <typename Thread>
void ThreadPoolImpl<Thread>::startNewThreadsNoLock()
{
    if (shutdown)
        return;

    /// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
    while (threads.size() < std::min(scheduled_jobs, max_threads))
    {
        /// First reserve a slot in the list ...
        try
        {
            threads.emplace_front();
        }
        catch (...)
        {
            return; /// failed to start more threads
        }

        /// ... then start the thread in it.
        try
        {
            threads.front() = Thread([this, it = threads.begin()] { worker(it); });
        }
        catch (...)
        {
            threads.pop_front();
            return; /// failed to start more threads
        }
    }
}
2019-01-14 19:22:09 +00:00
template < typename Thread >
2023-05-26 13:55:30 +00:00
void ThreadPoolImpl < Thread > : : scheduleOrThrowOnError ( Job job , Priority priority )
2019-01-14 19:22:09 +00:00
{
scheduleImpl < void > ( std : : move ( job ) , priority , std : : nullopt ) ;
}
template < typename Thread >
2023-05-26 13:55:30 +00:00
bool ThreadPoolImpl < Thread > : : trySchedule ( Job job , Priority priority , uint64_t wait_microseconds ) noexcept
2019-01-14 19:22:09 +00:00
{
return scheduleImpl < bool > ( std : : move ( job ) , priority , wait_microseconds ) ;
}
template < typename Thread >
2023-05-26 13:55:30 +00:00
void ThreadPoolImpl < Thread > : : scheduleOrThrow ( Job job , Priority priority , uint64_t wait_microseconds , bool propagate_opentelemetry_tracing_context )
2019-01-14 19:22:09 +00:00
{
2022-09-01 03:56:10 +00:00
scheduleImpl < void > ( std : : move ( job ) , priority , wait_microseconds , propagate_opentelemetry_tracing_context ) ;
2019-01-14 10:59:58 +00:00
}
/// Blocks until there are no scheduled jobs left (`scheduled_jobs == 0`).
/// If an exception from a job was stored in `first_exception`, it is cleared and rethrown to the caller.
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
    std::unique_lock lock(mutex);

    /// Signal here just in case.
    /// If threads are waiting on condition variables, but there are some jobs in the queue
    /// then it will prevent us from deadlock.
    new_job_or_shutdown.notify_all();

    job_finished.wait(lock, [this] { return scheduled_jobs == 0; });

    if (!first_exception)
        return;

    /// Take the stored exception out of the pool before rethrowing it.
    std::exception_ptr stored;
    std::swap(stored, first_exception);
    std::rethrow_exception(stored);
}
/// Stops the pool (finalize()) and then runs the registered on-destroy callbacks (onDestroy()).
template <typename Thread>
ThreadPoolImpl<Thread>::~ThreadPoolImpl()
{
    /// Note: should not use logger from here,
    /// because it can be an instance of GlobalThreadPool that is a global variable
    /// and the destruction order of global variables is unspecified.

    finalize();
    onDestroy();
}
/// Shuts the pool down: sets `shutdown`, wakes up all workers, joins every thread and clears the thread list.
template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
    {
        std::lock_guard guard(mutex);
        shutdown = true;

        /// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function.
        threads_remove_themselves = false;
    }

    /// Wake up threads so they can finish themselves.
    new_job_or_shutdown.notify_all();

    /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
    for (auto & worker_thread : threads)
        worker_thread.join();

    threads.clear();
}
2023-01-30 19:00:48 +00:00
/// Registers a callback to be run by onDestroy(). The callback is moved into the pool under the mutex.
template <typename Thread>
void ThreadPoolImpl<Thread>::addOnDestroyCallback(OnDestroyCallback && callback)
{
    std::lock_guard guard(mutex);
    on_destroy_callbacks.push(std::move(callback));
}
/// Runs and removes all registered on-destroy callbacks, always taking the one at `top()` next.
/// Each callback is removed from the container before it is invoked.
template <typename Thread>
void ThreadPoolImpl<Thread>::onDestroy()
{
    while (!on_destroy_callbacks.empty())
    {
        auto handler = std::move(on_destroy_callbacks.top());
        on_destroy_callbacks.pop();

        NOEXCEPT_SCOPE({ handler(); });
    }
}
2019-01-14 10:59:58 +00:00
/// Returns the current value of `scheduled_jobs`, read under the pool mutex.
template <typename Thread>
size_t ThreadPoolImpl<Thread>::active() const
{
    std::lock_guard guard(mutex);
    return scheduled_jobs;
}
2021-04-28 18:26:12 +00:00
/// Returns whether the pool has been shut down (the `shutdown` flag), read under the pool mutex.
template <typename Thread>
bool ThreadPoolImpl<Thread>::finished() const
{
    std::lock_guard guard(mutex);
    return shutdown;
}
2019-01-14 10:59:58 +00:00
template < typename Thread >
2019-01-14 19:22:09 +00:00
void ThreadPoolImpl < Thread > : : worker ( typename std : : list < Thread > : : iterator thread_it )
2019-01-14 10:59:58 +00:00
{
2021-01-12 14:34:50 +00:00
DENY_ALLOCATIONS_IN_SCOPE ;
2023-03-22 07:49:22 +00:00
CurrentMetrics : : Increment metric_pool_threads ( metric_threads ) ;
2019-08-01 20:09:38 +00:00
2023-04-13 11:27:11 +00:00
bool job_is_done = false ;
std : : exception_ptr exception_from_job ;
2023-03-17 20:52:05 +00:00
/// We'll run jobs in this worker while there are scheduled jobs and until some special event occurs (e.g. shutdown, or decreasing the number of max_threads).
2023-03-17 09:24:30 +00:00
/// And if `max_free_threads > 0` we keep this number of threads even when there are no jobs for them currently.
2019-01-14 10:59:58 +00:00
while ( true )
{
2022-04-10 23:03:24 +00:00
/// This is inside the loop to also reset previous thread names set inside the jobs.
2023-03-29 14:28:36 +00:00
setThreadName ( DEFAULT_THREAD_NAME ) ;
2022-04-10 23:03:24 +00:00
2023-03-17 20:52:05 +00:00
/// Get a job from the queue.
2023-11-18 18:07:59 +00:00
std : : optional < JobWithPriority > job_data ;
2023-03-17 20:52:05 +00:00
2019-01-14 10:59:58 +00:00
{
2019-01-14 19:22:09 +00:00
std : : unique_lock lock ( mutex ) ;
2023-04-13 11:27:11 +00:00
// Finish with previous job if any
if ( job_is_done )
{
job_is_done = false ;
if ( exception_from_job )
{
if ( ! first_exception )
first_exception = exception_from_job ;
if ( shutdown_on_exception )
shutdown = true ;
exception_from_job = { } ;
}
- - scheduled_jobs ;
2023-04-13 11:45:19 +00:00
job_finished . notify_all ( ) ;
if ( shutdown )
new_job_or_shutdown . notify_all ( ) ; /// `shutdown` was set, wake up other threads so they can finish themselves.
2023-04-13 11:27:11 +00:00
}
2023-04-13 12:11:06 +00:00
new_job_or_shutdown . wait ( lock , [ & ] { return ! jobs . empty ( ) | | shutdown | | threads . size ( ) > std : : min ( max_threads , scheduled_jobs + max_free_threads ) ; } ) ;
2019-01-14 10:59:58 +00:00
2023-04-13 12:11:06 +00:00
if ( jobs . empty ( ) | | threads . size ( ) > std : : min ( max_threads , scheduled_jobs + max_free_threads ) )
2021-09-06 11:37:51 +00:00
{
2023-04-13 15:17:39 +00:00
// We enter here if:
// - either this thread is not needed anymore due to max_free_threads excess;
// - or shutdown happened AND all jobs are already handled.
2023-04-13 12:11:06 +00:00
if ( threads_remove_themselves )
{
thread_it - > detach ( ) ;
threads . erase ( thread_it ) ;
}
2021-09-06 11:37:51 +00:00
return ;
}
2023-03-16 18:00:51 +00:00
/// boost::priority_queue does not provide interface for getting non-const reference to an element
2023-11-18 18:07:59 +00:00
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority.
job_data = std : : move ( const_cast < JobWithPriority & > ( jobs . top ( ) ) ) ;
2023-03-16 18:00:51 +00:00
jobs . pop ( ) ;
2023-04-13 11:27:11 +00:00
2023-04-13 12:11:06 +00:00
/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
2023-04-13 11:27:11 +00:00
if ( shutdown )
2023-05-05 19:13:50 +00:00
{
job_is_done = true ;
2023-04-13 11:27:11 +00:00
continue ;
2023-05-05 19:13:50 +00:00
}
2019-01-14 10:59:58 +00:00
}
2023-04-13 11:27:11 +00:00
ALLOW_ALLOCATIONS_IN_SCOPE ;
2022-09-01 03:56:10 +00:00
2023-04-13 11:27:11 +00:00
/// Set up tracing context for this thread by its parent context.
2023-11-18 18:07:59 +00:00
DB : : OpenTelemetry : : TracingContextHolder thread_trace_context ( " ThreadPool::worker() " , job_data - > thread_trace_context ) ;
2022-07-07 09:41:43 +00:00
2023-04-13 11:27:11 +00:00
/// Run the job.
try
{
2023-06-28 01:39:06 +00:00
if ( DB : : Exception : : enable_job_stack_trace )
2023-11-18 18:07:59 +00:00
DB : : Exception : : thread_frame_pointers = std : : move ( job_data - > frame_pointers ) ;
2023-06-24 00:12:31 +00:00
2023-04-13 11:27:11 +00:00
CurrentMetrics : : Increment metric_active_pool_threads ( metric_active_threads ) ;
2019-08-01 20:09:38 +00:00
2023-11-18 18:07:59 +00:00
job_data - > job ( ) ;
2022-07-07 09:41:43 +00:00
2023-04-13 11:27:11 +00:00
if ( thread_trace_context . root_span . isTraceEnabled ( ) )
{
/// Use the thread name as operation name so that the tracing log will be more clear.
/// The thread name is usually set in jobs, we can only get the name after the job finishes
std : : string thread_name = getThreadName ( ) ;
if ( ! thread_name . empty ( ) & & thread_name ! = DEFAULT_THREAD_NAME )
2022-07-07 09:41:43 +00:00
{
2023-04-13 11:27:11 +00:00
thread_trace_context . root_span . operation_name = thread_name ;
}
else
{
/// If the thread name is not set, use the type name of the job instead
2023-11-18 18:07:59 +00:00
thread_trace_context . root_span . operation_name = demangle ( job_data - > job . target_type ( ) . name ( ) ) ;
2022-07-07 09:41:43 +00:00
}
2019-01-14 10:59:58 +00:00
}
2023-04-13 11:27:11 +00:00
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
2023-11-18 18:07:59 +00:00
job_data . reset ( ) ;
2023-03-17 20:52:05 +00:00
}
2023-04-13 11:27:11 +00:00
catch ( . . . )
2019-01-14 10:59:58 +00:00
{
2023-04-13 11:27:11 +00:00
exception_from_job = std : : current_exception ( ) ;
thread_trace_context . root_span . addAttribute ( exception_from_job ) ;
2019-01-14 10:59:58 +00:00
2023-04-13 11:27:11 +00:00
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
2023-11-18 18:07:59 +00:00
job_data . reset ( ) ;
2023-03-16 18:00:51 +00:00
}
2023-04-13 11:27:11 +00:00
job_is_done = true ;
2019-01-14 10:59:58 +00:00
}
}
/// Explicit instantiation of the pool template for plain `std::thread`.
template class ThreadPoolImpl < std : : thread > ;
2022-09-01 03:56:10 +00:00
/// Explicit instantiations: the pool template over ThreadFromGlobalPoolImpl<false>,
/// and the ThreadFromGlobalPoolImpl<true> class itself.
template class ThreadPoolImpl < ThreadFromGlobalPoolImpl < false > > ;
template class ThreadFromGlobalPoolImpl < true > ;
2019-01-14 10:59:58 +00:00
2020-06-22 19:04:12 +00:00
/// Definition of the static member holding the global pool.
/// It is empty until initialize() creates the pool (instance() calls initialize() if needed).
std : : unique_ptr < GlobalThreadPool > GlobalThreadPool : : the_instance ;
2019-01-14 10:59:58 +00:00
2023-03-22 07:49:22 +00:00
/// Builds the underlying FreeThreadPool with the GlobalThread* metrics
/// and forwards the limits and the shutdown policy unchanged.
GlobalThreadPool::GlobalThreadPool(
    size_t max_threads_,
    size_t max_free_threads_,
    size_t queue_size_,
    const bool shutdown_on_exception_)
    : FreeThreadPool(
        /* metric_threads_ = */ CurrentMetrics::GlobalThread,
        /* metric_active_threads_ = */ CurrentMetrics::GlobalThreadActive,
        /* metric_scheduled_jobs_ = */ CurrentMetrics::GlobalThreadScheduled,
        max_threads_,
        max_free_threads_,
        queue_size_,
        shutdown_on_exception_)
{
}
2021-11-12 13:24:47 +00:00
void GlobalThreadPool : : initialize ( size_t max_threads , size_t max_free_threads , size_t queue_size )
2019-08-22 03:24:05 +00:00
{
2020-09-15 10:29:47 +00:00
if ( the_instance )
{
2020-09-16 08:59:58 +00:00
throw DB : : Exception ( DB : : ErrorCodes : : LOGICAL_ERROR ,
2020-09-15 10:29:47 +00:00
" The global thread pool is initialized twice " ) ;
}
2020-06-14 06:43:01 +00:00
2021-11-12 13:24:47 +00:00
the_instance . reset ( new GlobalThreadPool ( max_threads , max_free_threads , queue_size , false /*shutdown_on_exception*/ ) ) ;
2020-06-22 19:04:12 +00:00
}
2020-06-14 06:43:01 +00:00
2020-06-22 19:04:12 +00:00
/// Returns the global pool, creating it with initialize()'s default arguments on first use.
GlobalThreadPool & GlobalThreadPool::instance()
{
    // Allow implicit initialization. This is needed for old code that is
    // impractical to redo now, especially Arcadia users and unit tests.
    if (!the_instance)
        initialize();

    return *the_instance;
}
2023-07-28 07:47:09 +00:00
void GlobalThreadPool : : shutdown ( )
{
if ( the_instance )
{
the_instance - > finalize ( ) ;
}
}