#include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int CANNOT_SCHEDULE_TASK; extern const int LOGICAL_ERROR; } } namespace CurrentMetrics { extern const Metric GlobalThread; extern const Metric GlobalThreadActive; extern const Metric LocalThread; extern const Metric LocalThreadActive; } template ThreadPoolImpl::ThreadPoolImpl() : ThreadPoolImpl(getNumberOfPhysicalCPUCores()) { } template ThreadPoolImpl::ThreadPoolImpl(size_t max_threads_) : ThreadPoolImpl(max_threads_, max_threads_, max_threads_) { } template ThreadPoolImpl::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_) : max_threads(max_threads_) , max_free_threads(max_free_threads_) , queue_size(queue_size_) , shutdown_on_exception(shutdown_on_exception_) { } template void ThreadPoolImpl::setMaxThreads(size_t value) { std::lock_guard lock(mutex); max_threads = value; /// We have to also adjust queue size, because it limits the number of scheduled and already running jobs in total. queue_size = std::max(queue_size, max_threads); jobs.reserve(queue_size); } template size_t ThreadPoolImpl::getMaxThreads() const { std::lock_guard lock(mutex); return max_threads; } template void ThreadPoolImpl::setMaxFreeThreads(size_t value) { std::lock_guard lock(mutex); max_free_threads = value; } template void ThreadPoolImpl::setQueueSize(size_t value) { std::lock_guard lock(mutex); queue_size = value; /// Reserve memory to get rid of allocations jobs.reserve(queue_size); } template template ReturnType ThreadPoolImpl::scheduleImpl(Job job, int priority, std::optional wait_microseconds) { auto on_error = [&](const std::string & reason) { if constexpr (std::is_same_v) { if (first_exception) { std::exception_ptr exception; std::swap(exception, first_exception); std::rethrow_exception(exception); } throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "Cannot schedule a task: {} (threads={}, jobs={})", reason, threads.size(), scheduled_jobs); } else return false; }; { std::unique_lock lock(mutex); auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero. { if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred)) return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds)); } else job_finished.wait(lock, pred); if (shutdown) return on_error("shutdown"); /// We must not to allocate any memory after we emplaced a job in a queue. /// Because if an exception would be thrown, we won't notify a thread about job occurrence. /// Check if there are enough threads to process job. if (threads.size() < std::min(max_threads, scheduled_jobs + 1)) { try { threads.emplace_front(); } catch (...) { /// Most likely this is a std::bad_alloc exception return on_error("cannot allocate thread slot"); } try { threads.front() = Thread([this, it = threads.begin()] { worker(it); }); } catch (...) { threads.pop_front(); return on_error("cannot allocate thread"); } } // this scheduleImpl is called in the parent thread, // the tracing context on this thread is used as parent context for the sub-thread that runs the job auto& current_thread_context = DB::OpenTelemetryThreadTraceContext::current(); jobs.emplace(std::move(job), priority, current_thread_context); ++scheduled_jobs; new_job_or_shutdown.notify_one(); } return static_cast(true); } template void ThreadPoolImpl::scheduleOrThrowOnError(Job job, int priority) { scheduleImpl(std::move(job), priority, std::nullopt); } template bool ThreadPoolImpl::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept { return scheduleImpl(std::move(job), priority, wait_microseconds); } template void ThreadPoolImpl::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds) { scheduleImpl(std::move(job), priority, wait_microseconds); } template void ThreadPoolImpl::wait() { { std::unique_lock lock(mutex); /// Signal here just in case. /// If threads are waiting on condition variables, but there are some jobs in the queue /// then it will prevent us from deadlock. new_job_or_shutdown.notify_all(); job_finished.wait(lock, [this] { return scheduled_jobs == 0; }); if (first_exception) { std::exception_ptr exception; std::swap(exception, first_exception); std::rethrow_exception(exception); } } } template ThreadPoolImpl::~ThreadPoolImpl() { /// Note: should not use logger from here, /// because it can be an instance of GlobalThreadPool that is a global variable /// and the destruction order of global variables is unspecified. finalize(); } template void ThreadPoolImpl::finalize() { { std::lock_guard lock(mutex); shutdown = true; } new_job_or_shutdown.notify_all(); for (auto & thread : threads) thread.join(); threads.clear(); } template size_t ThreadPoolImpl::active() const { std::lock_guard lock(mutex); return scheduled_jobs; } template bool ThreadPoolImpl::finished() const { std::lock_guard lock(mutex); return shutdown; } template void ThreadPoolImpl::worker(typename std::list::iterator thread_it) { DENY_ALLOCATIONS_IN_SCOPE; CurrentMetrics::Increment metric_all_threads( std::is_same_v ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread); while (true) { /// This is inside the loop to also reset previous thread names set inside the jobs. setThreadName("ThreadPool"); Job job; DB::OpenTelemetryThreadTraceContext parent_thead_trace_context; // A copy of parent trace context bool need_shutdown = false; { std::unique_lock lock(mutex); new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); }); need_shutdown = shutdown; if (!jobs.empty()) { /// boost::priority_queue does not provide interface for getting non-const reference to an element /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job. job = std::move(const_cast(jobs.top().job)); parent_thead_trace_context = std::move(const_cast(jobs.top().thread_trace_context)); jobs.pop(); } else { /// shutdown is true, simply finish the thread. return; } } if (!need_shutdown) { // set up tracing context for this thread by its parent context DB::OpenTelemetryThreadTraceContextScope thread_trace_context("ThreadPool::worker()" , parent_thead_trace_context); try { ALLOW_ALLOCATIONS_IN_SCOPE; CurrentMetrics::Increment metric_active_threads( std::is_same_v ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive); job(); if (thread_trace_context.root_span.isTraceEnabled()) { // Use the thread name as operation name so that the tracing log will be more clear. // the thread name is usually set in the jobs, we can only get the name after the job finishes std::string thread_name = getThreadName(); if (!thread_name.empty()) thread_trace_context.root_span.operation_name = thread_name; } /// job should be reset before decrementing scheduled_jobs to /// ensure that the Job destroyed before wait() returns. job = {}; parent_thead_trace_context.reset(); } catch (...) { thread_trace_context.root_span.addAttribute(std::current_exception()); /// job should be reset before decrementing scheduled_jobs to /// ensure that the Job destroyed before wait() returns. job = {}; parent_thead_trace_context.reset(); { std::lock_guard lock(mutex); if (!first_exception) first_exception = std::current_exception(); // NOLINT if (shutdown_on_exception) shutdown = true; --scheduled_jobs; } job_finished.notify_all(); new_job_or_shutdown.notify_all(); return; } } { std::lock_guard lock(mutex); --scheduled_jobs; if (threads.size() > scheduled_jobs + max_free_threads) { thread_it->detach(); threads.erase(thread_it); job_finished.notify_all(); return; } } job_finished.notify_all(); } } template class ThreadPoolImpl; template class ThreadPoolImpl; std::unique_ptr GlobalThreadPool::the_instance; void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size) { if (the_instance) { throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "The global thread pool is initialized twice"); } the_instance.reset(new GlobalThreadPool(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/)); } GlobalThreadPool & GlobalThreadPool::instance() { if (!the_instance) { // Allow implicit initialization. This is needed for old code that is // impractical to redo now, especially Arcadia users and unit tests. initialize(); } return *the_instance; }