ClickHouse/src/Common/ThreadPool.cpp

385 lines
12 KiB
C++
Raw Normal View History

#include <Common/ThreadPool.h>
2022-04-10 23:03:24 +00:00
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/OpenTelemetryTraceContext.h>
2020-06-22 19:04:12 +00:00
#include <cassert>
#include <iostream>
#include <type_traits>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SCHEDULE_TASK;
extern const int LOGICAL_ERROR;
}
}
2019-08-01 20:09:38 +00:00
namespace CurrentMetrics
{
extern const Metric GlobalThread;
extern const Metric GlobalThreadActive;
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl()
: ThreadPoolImpl(getNumberOfPhysicalCPUCores())
{
}
template <typename Thread>
2019-08-03 11:02:40 +00:00
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_)
: ThreadPoolImpl(max_threads_, max_threads_, max_threads_)
{
}
template <typename Thread>
Do not shutdown global thread pool on exception Otherwise GlobalThreadPool can be terminated (for example due to an exception from the ParallelInputsHandler::onFinish/onFinishThread, from ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread, since writeToTemporaryFile() can definitelly throw) and the server will not accept new connections (or/and execute queries) anymore. Here is possible stacktrace (it is a bit inaccurate, due to optimizations I guess, and it had been obtained with the DB::tryLogCurrentException() in the catch block of the ThreadPoolImpl::worker()): 2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below): 1. Common/Exception.cpp:35: DB::Exception::Exception(...) ... 6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0) 7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...) 8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...) 9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...) 10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...) Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732 (Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_)
: max_threads(max_threads_)
, max_free_threads(max_free_threads_)
, queue_size(queue_size_)
, shutdown_on_exception(shutdown_on_exception_)
{
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
std::lock_guard lock(mutex);
max_threads = value;
2022-02-04 00:54:33 +00:00
/// We have to also adjust queue size, because it limits the number of scheduled and already running jobs in total.
2022-02-03 23:29:46 +00:00
queue_size = std::max(queue_size, max_threads);
jobs.reserve(queue_size);
}
template <typename Thread>
size_t ThreadPoolImpl<Thread>::getMaxThreads() const
{
std::lock_guard lock(mutex);
return max_threads;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
std::lock_guard lock(mutex);
max_free_threads = value;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
std::lock_guard lock(mutex);
queue_size = value;
/// Reserve memory to get rid of allocations
2021-09-07 12:45:39 +00:00
jobs.reserve(queue_size);
}
template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds, bool enable_tracing_context_propagation)
{
auto on_error = [&](const std::string & reason)
{
if constexpr (std::is_same_v<ReturnType, void>)
{
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK,
"Cannot schedule a task: {} (threads={}, jobs={})", reason,
threads.size(), scheduled_jobs);
}
else
return false;
};
{
std::unique_lock lock(mutex);
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds));
}
else
job_finished.wait(lock, pred);
if (shutdown)
return on_error("shutdown");
2021-05-25 11:54:47 +00:00
/// We must not to allocate any memory after we emplaced a job in a queue.
2021-05-25 12:58:04 +00:00
/// Because if an exception would be thrown, we won't notify a thread about job occurrence.
2021-05-24 16:24:03 +00:00
/// Check if there are enough threads to process job.
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
{
2021-05-24 16:24:03 +00:00
try
{
threads.emplace_front();
}
catch (...)
{
/// Most likely this is a std::bad_alloc exception
return on_error("cannot allocate thread slot");
2021-05-24 16:24:03 +00:00
}
try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
}
catch (...)
{
threads.pop_front();
return on_error("cannot allocate thread");
}
}
2021-05-24 16:24:03 +00:00
if (enable_tracing_context_propagation)
{
/// Tracing context on this thread is used as parent context for the sub-thread that runs the job
const auto &current_thread_context = DB::OpenTelemetry::CurrentContext();
jobs.emplace(std::move(job), priority, current_thread_context);
}
else
{
DB::OpenTelemetry::TracingContextOnThread empty;
jobs.emplace(std::move(job), priority, empty);
}
2021-05-24 16:24:03 +00:00
++scheduled_jobs;
new_job_or_shutdown.notify_one();
}
2021-05-24 16:24:03 +00:00
return static_cast<ReturnType>(true);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds, bool enable_tracing_context_propagation)
{
scheduleImpl<void>(std::move(job), priority, wait_microseconds, enable_tracing_context_propagation);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
{
std::unique_lock lock(mutex);
2021-05-25 11:54:47 +00:00
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
new_job_or_shutdown.notify_all();
job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
}
}
template <typename Thread>
ThreadPoolImpl<Thread>::~ThreadPoolImpl()
{
2021-08-19 08:22:57 +00:00
/// Note: should not use logger from here,
/// because it can be an instance of GlobalThreadPool that is a global variable
/// and the destruction order of global variables is unspecified.
finalize();
}
template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
{
std::lock_guard lock(mutex);
shutdown = true;
}
new_job_or_shutdown.notify_all();
for (auto & thread : threads)
thread.join();
threads.clear();
}
template <typename Thread>
size_t ThreadPoolImpl<Thread>::active() const
{
std::lock_guard lock(mutex);
return scheduled_jobs;
}
2021-04-28 18:26:12 +00:00
template <typename Thread>
bool ThreadPoolImpl<Thread>::finished() const
{
std::lock_guard lock(mutex);
2021-04-28 18:26:12 +00:00
return shutdown;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
2021-01-12 14:34:50 +00:00
DENY_ALLOCATIONS_IN_SCOPE;
2019-08-01 20:09:38 +00:00
CurrentMetrics::Increment metric_all_threads(
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread);
while (true)
{
2022-04-10 23:03:24 +00:00
/// This is inside the loop to also reset previous thread names set inside the jobs.
2022-04-13 12:18:49 +00:00
setThreadName("ThreadPool");
2022-04-10 23:03:24 +00:00
Job job;
bool need_shutdown = false;
2022-08-24 07:24:21 +00:00
/// A copy of parent trace context
DB::OpenTelemetry::TracingContextOnThread parent_thead_trace_context;
2022-08-24 07:24:21 +00:00
{
std::unique_lock lock(mutex);
new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
need_shutdown = shutdown;
if (!jobs.empty())
2021-09-06 11:37:51 +00:00
{
2021-09-06 18:33:31 +00:00
/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
job = std::move(const_cast<Job &>(jobs.top().job));
parent_thead_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context));
2021-09-06 11:37:51 +00:00
jobs.pop();
}
else
2021-09-06 11:37:51 +00:00
{
/// shutdown is true, simply finish the thread.
return;
}
}
if (!need_shutdown)
{
2022-08-24 07:24:21 +00:00
/// Set up tracing context for this thread by its parent context
DB::OpenTelemetry::TracingContextHolder thread_trace_context("ThreadPool::worker()", parent_thead_trace_context);
try
{
2021-01-12 14:34:50 +00:00
ALLOW_ALLOCATIONS_IN_SCOPE;
2019-08-01 20:09:38 +00:00
CurrentMetrics::Increment metric_active_threads(
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive);
job();
if (thread_trace_context.root_span.isTraceEnabled())
{
2022-08-24 07:24:21 +00:00
/// Use the thread name as operation name so that the tracing log will be more clear.
/// The thread name is usually set in the jobs, we can only get the name after the job finishes
std::string thread_name = getThreadName();
if (!thread_name.empty())
thread_trace_context.root_span.operation_name = thread_name;
}
2020-10-27 11:04:03 +00:00
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
2020-09-21 17:49:15 +00:00
job = {};
parent_thead_trace_context.reset();
}
catch (...)
{
thread_trace_context.root_span.addAttribute(std::current_exception());
2020-10-27 11:04:03 +00:00
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
job = {};
parent_thead_trace_context.reset();
{
std::lock_guard lock(mutex);
if (!first_exception)
2020-03-18 02:02:24 +00:00
first_exception = std::current_exception(); // NOLINT
Do not shutdown global thread pool on exception Otherwise GlobalThreadPool can be terminated (for example due to an exception from the ParallelInputsHandler::onFinish/onFinishThread, from ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread, since writeToTemporaryFile() can definitelly throw) and the server will not accept new connections (or/and execute queries) anymore. Here is possible stacktrace (it is a bit inaccurate, due to optimizations I guess, and it had been obtained with the DB::tryLogCurrentException() in the catch block of the ThreadPoolImpl::worker()): 2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below): 1. Common/Exception.cpp:35: DB::Exception::Exception(...) ... 6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0) 7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...) 8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...) 9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...) 10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...) Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732 (Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
if (shutdown_on_exception)
shutdown = true;
--scheduled_jobs;
}
Do not shutdown global thread pool on exception Otherwise GlobalThreadPool can be terminated (for example due to an exception from the ParallelInputsHandler::onFinish/onFinishThread, from ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread, since writeToTemporaryFile() can definitelly throw) and the server will not accept new connections (or/and execute queries) anymore. Here is possible stacktrace (it is a bit inaccurate, due to optimizations I guess, and it had been obtained with the DB::tryLogCurrentException() in the catch block of the ThreadPoolImpl::worker()): 2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below): 1. Common/Exception.cpp:35: DB::Exception::Exception(...) ... 6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0) 7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...) 8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...) 9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...) 10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...) Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732 (Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
job_finished.notify_all();
new_job_or_shutdown.notify_all();
return;
}
}
{
std::lock_guard lock(mutex);
--scheduled_jobs;
if (threads.size() > scheduled_jobs + max_free_threads)
{
thread_it->detach();
threads.erase(thread_it);
job_finished.notify_all();
return;
}
}
job_finished.notify_all();
}
}
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<Thread4ThreadPool>;
2020-06-22 19:04:12 +00:00
std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size)
{
if (the_instance)
{
2020-09-16 08:59:58 +00:00
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"The global thread pool is initialized twice");
}
the_instance.reset(new GlobalThreadPool(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/));
2020-06-22 19:04:12 +00:00
}
2020-06-22 19:04:12 +00:00
GlobalThreadPool & GlobalThreadPool::instance()
{
if (!the_instance)
{
2020-06-23 17:31:46 +00:00
// Allow implicit initialization. This is needed for old code that is
// impractical to redo now, especially Arcadia users and unit tests.
initialize();
2020-06-22 19:04:12 +00:00
}
2020-06-22 19:04:12 +00:00
return *the_instance;
}