Do not shutdown global thread pool on exception

Otherwise GlobalThreadPool can be terminated (for example due to an
exception from the ParallelInputsHandler::onFinish/onFinishThread, from
ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread,
since writeToTemporaryFile() can definitelly throw) and the server will
not accept new connections (or/and execute queries) anymore.

Here is possible stacktrace (it is a bit inaccurate, due to
optimizations I guess, and it had been obtained with the
DB::tryLogCurrentException() in the catch block of the
ThreadPoolImpl::worker()):

    2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below):

    1.  Common/Exception.cpp:35: DB::Exception::Exception(...)
    ...
    6.  Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0)
    7.  Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...)
    8.  Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...)
    9.  include/memory:4206: DB::Aggregator::writeToTemporaryFile(...)
    10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...)

Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732
(Reference to particular comment, since I'm not sure about the initial issue)
This commit is contained in:
Azat Khuzhin 2020-02-17 23:15:29 +03:00
parent 6969191c9f
commit a15b2daf6d
2 changed files with 19 additions and 5 deletions

View File

@ -28,8 +28,11 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_)
} }
template <typename Thread> template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_) ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_)
: max_threads(max_threads_), max_free_threads(max_free_threads_), queue_size(queue_size_) : max_threads(max_threads_)
, max_free_threads(max_free_threads_)
, queue_size(queue_size_)
, shutdown_on_exception(shutdown_on_exception_)
{ {
} }
@ -226,9 +229,19 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
if (!first_exception) if (!first_exception)
first_exception = std::current_exception(); first_exception = std::current_exception();
shutdown = true; if (shutdown_on_exception)
shutdown = true;
--scheduled_jobs; --scheduled_jobs;
} }
DB::tryLogCurrentException("ThreadPool",
std::string("Exception in the ThreadPool(") +
std::to_string(max_threads) + ", " +
std::to_string(max_free_threads) + ", " +
std::to_string(queue_size) + ", " +
std::to_string(shutdown_on_exception) +
").");
job_finished.notify_all(); job_finished.notify_all();
new_job_or_shutdown.notify_all(); new_job_or_shutdown.notify_all();
return; return;

View File

@ -33,7 +33,7 @@ public:
explicit ThreadPoolImpl(size_t max_threads_); explicit ThreadPoolImpl(size_t max_threads_);
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited. /// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_); ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_ = true);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown. /// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If any thread was throw an exception, first exception will be rethrown from this method, /// If any thread was throw an exception, first exception will be rethrown from this method,
@ -79,6 +79,7 @@ private:
size_t scheduled_jobs = 0; size_t scheduled_jobs = 0;
bool shutdown = false; bool shutdown = false;
const bool shutdown_on_exception = true;
struct JobWithPriority struct JobWithPriority
{ {
@ -128,7 +129,7 @@ using FreeThreadPool = ThreadPoolImpl<std::thread>;
class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
{ {
public: public:
GlobalThreadPool() : FreeThreadPool(10000, 1000, 10000) {} GlobalThreadPool() : FreeThreadPool(10000, 1000, 10000, false) {}
static GlobalThreadPool & instance(); static GlobalThreadPool & instance();
}; };