2019-01-14 10:59:58 +00:00
# pragma once
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

#include <boost/heap/priority_queue.hpp>

#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <base/scope_guard.h>
2019-01-14 19:22:09 +00:00
2019-01-14 10:59:58 +00:00
/** Very simple thread pool similar to boost::threadpool.
* Advantages :
* - catches exceptions and rethrows on wait .
2019-01-14 19:22:09 +00:00
*
* This thread pool can be used as a task queue .
* For example , you can create a thread pool with 10 threads ( and queue of size 10 ) and schedule 1000 tasks
* - in this case you will be blocked to keep 10 tasks in fly .
*
* Thread : std : : thread or something with identical interface .
2019-01-14 10:59:58 +00:00
*/
template < typename Thread >
class ThreadPoolImpl
{
public :
using Job = std : : function < void ( ) > ;
2020-07-16 23:12:47 +00:00
/// Maximum number of threads is based on the number of physical cores.
ThreadPoolImpl ( ) ;
2019-01-14 10:59:58 +00:00
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
2019-08-03 11:02:40 +00:00
explicit ThreadPoolImpl ( size_t max_threads_ ) ;
2019-01-14 10:59:58 +00:00
2019-01-14 19:22:09 +00:00
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
Do not shutdown global thread pool on exception
Otherwise GlobalThreadPool can be terminated (for example due to an
exception from the ParallelInputsHandler::onFinish/onFinishThread, from
ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread,
since writeToTemporaryFile() can definitelly throw) and the server will
not accept new connections (or/and execute queries) anymore.
Here is possible stacktrace (it is a bit inaccurate, due to
optimizations I guess, and it had been obtained with the
DB::tryLogCurrentException() in the catch block of the
ThreadPoolImpl::worker()):
2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below):
1. Common/Exception.cpp:35: DB::Exception::Exception(...)
...
6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0)
7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...)
8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...)
9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...)
10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...)
Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732
(Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
ThreadPoolImpl ( size_t max_threads_ , size_t max_free_threads_ , size_t queue_size_ , bool shutdown_on_exception_ = true ) ;
2019-01-14 10:59:58 +00:00
2019-01-14 19:22:09 +00:00
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
2019-10-17 14:41:27 +00:00
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// Also throws an exception if cannot create thread.
2019-01-14 10:59:58 +00:00
/// Priority: greater is higher.
2019-10-17 14:41:27 +00:00
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
/// located on stack of current thread, the stack must not be unwinded until all jobs finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
2022-10-07 10:46:45 +00:00
void scheduleOrThrowOnError ( Job job , ssize_t priority = 0 ) ;
2019-01-14 10:59:58 +00:00
2019-10-17 14:41:27 +00:00
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
2022-10-07 10:46:45 +00:00
bool trySchedule ( Job job , ssize_t priority = 0 , uint64_t wait_microseconds = 0 ) noexcept ;
2019-01-14 19:22:09 +00:00
2019-10-17 14:41:27 +00:00
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
2022-10-07 10:46:45 +00:00
void scheduleOrThrow ( Job job , ssize_t priority = 0 , uint64_t wait_microseconds = 0 , bool propagate_opentelemetry_tracing_context = true ) ;
2019-01-14 19:22:09 +00:00
2019-01-14 10:59:58 +00:00
/// Wait for all currently active jobs to be done.
2019-10-17 14:41:27 +00:00
/// You may call schedule and wait many times in arbitrary order.
2019-01-14 10:59:58 +00:00
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
void wait ( ) ;
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
/// You should not destroy object while calling schedule or wait methods from another threads.
~ ThreadPoolImpl ( ) ;
/// Returns number of running and scheduled jobs.
size_t active ( ) const ;
2021-04-28 18:26:12 +00:00
/// Returns true if the pool already terminated
/// (and any further scheduling will produce CANNOT_SCHEDULE_TASK exception)
bool finished ( ) const ;
2019-08-02 17:14:04 +00:00
void setMaxThreads ( size_t value ) ;
void setMaxFreeThreads ( size_t value ) ;
void setQueueSize ( size_t value ) ;
2020-12-22 11:30:29 +00:00
size_t getMaxThreads ( ) const ;
2019-08-02 17:14:04 +00:00
2019-01-14 10:59:58 +00:00
private :
mutable std : : mutex mutex ;
std : : condition_variable job_finished ;
std : : condition_variable new_job_or_shutdown ;
2019-08-02 17:14:04 +00:00
size_t max_threads ;
size_t max_free_threads ;
size_t queue_size ;
2019-01-14 10:59:58 +00:00
2019-01-14 19:22:09 +00:00
size_t scheduled_jobs = 0 ;
2019-01-14 10:59:58 +00:00
bool shutdown = false ;
Do not shutdown global thread pool on exception
Otherwise GlobalThreadPool can be terminated (for example due to an
exception from the ParallelInputsHandler::onFinish/onFinishThread, from
ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread,
since writeToTemporaryFile() can definitelly throw) and the server will
not accept new connections (or/and execute queries) anymore.
Here is possible stacktrace (it is a bit inaccurate, due to
optimizations I guess, and it had been obtained with the
DB::tryLogCurrentException() in the catch block of the
ThreadPoolImpl::worker()):
2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below):
1. Common/Exception.cpp:35: DB::Exception::Exception(...)
...
6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0)
7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...)
8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...)
9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...)
10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...)
Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732
(Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
const bool shutdown_on_exception = true ;
2019-01-14 10:59:58 +00:00
struct JobWithPriority
{
Job job ;
2022-10-07 10:46:45 +00:00
ssize_t priority ;
2022-08-24 08:41:40 +00:00
DB : : OpenTelemetry : : TracingContextOnThread thread_trace_context ;
2019-01-14 10:59:58 +00:00
2022-10-07 10:46:45 +00:00
JobWithPriority ( Job job_ , ssize_t priority_ , const DB : : OpenTelemetry : : TracingContextOnThread & thread_trace_context_ )
2022-07-07 09:41:43 +00:00
: job ( job_ ) , priority ( priority_ ) , thread_trace_context ( thread_trace_context_ ) { }
2019-01-14 10:59:58 +00:00
bool operator < ( const JobWithPriority & rhs ) const
{
return priority < rhs . priority ;
}
} ;
2021-09-06 11:37:51 +00:00
boost : : heap : : priority_queue < JobWithPriority > jobs ;
2019-01-14 19:22:09 +00:00
std : : list < Thread > threads ;
2019-01-14 10:59:58 +00:00
std : : exception_ptr first_exception ;
2019-01-14 19:22:09 +00:00
template < typename ReturnType >
2022-10-07 10:46:45 +00:00
ReturnType scheduleImpl ( Job job , ssize_t priority , std : : optional < uint64_t > wait_microseconds , bool propagate_opentelemetry_tracing_context = true ) ;
2019-01-14 19:22:09 +00:00
void worker ( typename std : : list < Thread > : : iterator thread_it ) ;
2019-01-14 10:59:58 +00:00
void finalize ( ) ;
} ;
2019-01-14 19:22:09 +00:00
/// ThreadPool with std::thread for threads.
2019-01-14 10:59:58 +00:00
using FreeThreadPool = ThreadPoolImpl < std : : thread > ;
2019-01-14 19:22:09 +00:00
/** Global ThreadPool that can be used as a singleton.
* Why it is needed ?
*
* Linux can create and destroy about 100 000 threads per second ( quite good ) .
* With simple ThreadPool ( based on mutex and condvar ) you can assign about 200 000 tasks per second
* - not much difference comparing to not using a thread pool at all .
*
* But if you reuse OS threads instead of creating and destroying them , several benefits exist :
* - allocator performance will usually be better due to reuse of thread local caches , especially for jemalloc :
* https : //github.com/jemalloc/jemalloc/issues/1347
* - address sanitizer and thread sanitizer will not fail due to global limit on number of created threads .
* - program will work faster in gdb ;
*/
2019-08-22 03:24:05 +00:00
class GlobalThreadPool : public FreeThreadPool , private boost : : noncopyable
2019-01-14 10:59:58 +00:00
{
2020-06-22 19:04:12 +00:00
static std : : unique_ptr < GlobalThreadPool > the_instance ;
GlobalThreadPool ( size_t max_threads_ , size_t max_free_threads_ ,
size_t queue_size_ , const bool shutdown_on_exception_ )
: FreeThreadPool ( max_threads_ , max_free_threads_ , queue_size_ ,
shutdown_on_exception_ )
{ }
2019-01-14 10:59:58 +00:00
public :
2021-11-12 13:24:47 +00:00
static void initialize ( size_t max_threads = 10000 , size_t max_free_threads = 1000 , size_t queue_size = 10000 ) ;
2019-08-22 03:24:05 +00:00
static GlobalThreadPool & instance ( ) ;
2019-01-14 10:59:58 +00:00
} ;
2019-01-14 19:22:09 +00:00
/** Looks like std::thread but allocates threads in GlobalThreadPool.
* Also holds ThreadStatus for ClickHouse .
2022-09-01 03:56:10 +00:00
*
* NOTE : User code should use ' ThreadFromGlobalPool ' declared below instead of directly using this class .
*
2019-01-14 19:22:09 +00:00
*/
2022-09-01 03:56:10 +00:00
template < bool propagate_opentelemetry_context = true >
class ThreadFromGlobalPoolImpl : boost : : noncopyable
2019-01-14 10:59:58 +00:00
{
public :
2022-09-01 03:56:10 +00:00
ThreadFromGlobalPoolImpl ( ) = default ;
2019-01-14 10:59:58 +00:00
2019-01-14 19:22:09 +00:00
template < typename Function , typename . . . Args >
2022-09-01 03:56:10 +00:00
explicit ThreadFromGlobalPoolImpl ( Function & & func , Args & & . . . args )
2022-07-13 12:49:13 +00:00
: state ( std : : make_shared < State > ( ) )
2019-01-14 10:59:58 +00:00
{
2022-07-13 07:03:48 +00:00
/// NOTE:
/// - If this will throw an exception, the destructor won't be called
/// - this pointer cannot be passed in the lambda, since after detach() it will not be valid
2019-01-14 19:22:09 +00:00
GlobalThreadPool : : instance ( ) . scheduleOrThrow ( [
2019-03-06 16:46:05 +00:00
state = state ,
2019-01-14 19:22:09 +00:00
func = std : : forward < Function > ( func ) ,
2020-09-30 08:25:22 +00:00
args = std : : make_tuple ( std : : forward < Args > ( args ) . . . ) ] ( ) mutable /// mutable is needed to destroy capture
2019-01-14 19:22:09 +00:00
{
2022-10-22 13:28:22 +00:00
SCOPE_EXIT (
state - > thread_id = std : : thread : : id ( ) ;
state - > event . set ( ) ;
) ;
2020-09-29 19:11:40 +00:00
2022-07-13 12:49:13 +00:00
state - > thread_id = std : : this_thread : : get_id ( ) ;
2021-08-31 20:16:03 +00:00
2020-09-30 08:25:22 +00:00
/// This moves are needed to destroy function and arguments before exit.
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
2020-09-29 20:43:02 +00:00
auto function = std : : move ( func ) ;
2020-09-29 19:11:40 +00:00
auto arguments = std : : move ( args ) ;
/// Thread status holds raw pointer on query context, thus it always must be destroyed
/// before sending signal that permits to join this thread.
DB : : ThreadStatus thread_status ;
2020-09-29 20:43:02 +00:00
std : : apply ( function , arguments ) ;
2022-09-01 03:56:10 +00:00
} ,
0 , // default priority
0 , // default wait_microseconds
propagate_opentelemetry_context
) ;
2019-01-14 10:59:58 +00:00
}
2022-09-01 03:56:10 +00:00
ThreadFromGlobalPoolImpl ( ThreadFromGlobalPoolImpl & & rhs ) noexcept
2019-01-14 10:59:58 +00:00
{
* this = std : : move ( rhs ) ;
}
2022-09-01 03:56:10 +00:00
ThreadFromGlobalPoolImpl & operator = ( ThreadFromGlobalPoolImpl & & rhs ) noexcept
2019-01-14 10:59:58 +00:00
{
2022-07-13 07:01:31 +00:00
if ( initialized ( ) )
2020-12-30 05:31:45 +00:00
abort ( ) ;
2019-03-06 16:46:05 +00:00
state = std : : move ( rhs . state ) ;
2019-01-14 10:59:58 +00:00
return * this ;
}
2022-09-01 03:56:10 +00:00
~ ThreadFromGlobalPoolImpl ( )
2019-01-14 10:59:58 +00:00
{
2022-07-13 07:01:31 +00:00
if ( initialized ( ) )
2020-12-30 05:31:45 +00:00
abort ( ) ;
2019-01-14 10:59:58 +00:00
}
void join ( )
{
2022-07-13 07:01:31 +00:00
if ( ! initialized ( ) )
2020-12-30 05:31:45 +00:00
abort ( ) ;
2019-03-06 16:46:05 +00:00
2022-07-13 12:49:13 +00:00
state - > event . wait ( ) ;
2019-03-06 16:46:05 +00:00
state . reset ( ) ;
2019-01-14 10:59:58 +00:00
}
2019-01-14 19:22:09 +00:00
2019-02-25 15:45:07 +00:00
void detach ( )
{
2022-07-13 07:01:31 +00:00
if ( ! initialized ( ) )
2020-12-30 05:31:45 +00:00
abort ( ) ;
2019-03-06 16:46:05 +00:00
state . reset ( ) ;
2019-02-25 15:45:07 +00:00
}
2019-01-14 19:22:09 +00:00
bool joinable ( ) const
{
2021-08-31 20:16:03 +00:00
if ( ! state )
return false ;
/// Thread cannot join itself.
2022-07-13 12:49:13 +00:00
if ( state - > thread_id = = std : : this_thread : : get_id ( ) )
2021-08-31 20:16:03 +00:00
return false ;
return true ;
2019-01-14 19:22:09 +00:00
}
2022-08-30 04:26:23 +00:00
protected :
2022-07-13 12:49:13 +00:00
struct State
2022-07-13 08:03:19 +00:00
{
/// Should be atomic() because of possible concurrent access between
/// assignment and joinable() check.
2022-07-13 12:49:13 +00:00
std : : atomic < std : : thread : : id > thread_id ;
/// The state used in this object and inside the thread job.
Poco : : Event event ;
2022-07-13 08:03:19 +00:00
} ;
2022-07-13 12:49:13 +00:00
std : : shared_ptr < State > state ;
2022-07-13 07:01:31 +00:00
/// Internally initialized() should be used over joinable(),
/// since it is enough to know that the thread is initialized,
/// and ignore that fact that thread cannot join itself.
bool initialized ( ) const
{
return static_cast < bool > ( state ) ;
}
2019-01-14 10:59:58 +00:00
} ;
2022-09-12 14:15:30 +00:00
/// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context.
///
2022-09-12 16:41:05 +00:00
/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way,
2022-09-12 14:15:30 +00:00
/// you need to use class, or you need to use ThreadFromGlobalPool below.
///
/// See the comments of ThreadPool below to know how it works.
2022-09-12 16:06:17 +00:00
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl < false > ;
2022-09-12 14:15:30 +00:00
/// An alias of a thread that executes jobs/tasks on the global thread pool, implicitly passing the tracing context of the current thread
/// to the underlying worker as the parent tracing context.
/// If jobs/tasks are scheduled directly through the API of this class, you need to use this class;
/// otherwise you need to use ThreadFromGlobalPoolNoTracingContextPropagation.
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
2022-09-01 03:56:10 +00:00
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
2022-08-30 04:26:23 +00:00
///
2022-09-01 03:56:10 +00:00
/// The template parameter of ThreadFromGlobalPool is set to false to disable tracing context propagation to underlying worker.
/// Because ThreadFromGlobalPool schedules a job upon GlobalThreadPool, this means there will be two workers to schedule a job in 'ThreadPool',
/// one is at GlobalThreadPool level, the other is at ThreadPool level, so tracing context will be initialized on the same thread twice.
2022-08-30 04:26:23 +00:00
///
2022-09-01 03:56:10 +00:00
/// Once the worker on ThreadPool gains the control of execution, it won't return until it's shutdown,
/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
2022-08-30 04:26:23 +00:00
///
2022-09-12 14:15:30 +00:00
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
2022-09-01 03:56:10 +00:00
///
2022-09-12 16:06:17 +00:00
using ThreadPool = ThreadPoolImpl < ThreadFromGlobalPoolNoTracingContextPropagation > ;