Merge branch 'ParallelInputsProcessor-GlobalThreadPool-shutdown-fix' of https://github.com/azat/ClickHouse into azat-ParallelInputsProcessor-GlobalThreadPool-shutdown-fix

This commit is contained in:
Alexey Milovidov 2020-03-19 21:29:17 +03:00
commit 8d9aba4fc3
3 changed files with 34 additions and 8 deletions

View File

@ -28,8 +28,11 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_)
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_)
: max_threads(max_threads_), max_free_threads(max_free_threads_), queue_size(queue_size_)
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_)
: max_threads(max_threads_)
, max_free_threads(max_free_threads_)
, queue_size(queue_size_)
, shutdown_on_exception(shutdown_on_exception_)
{
}
@ -226,9 +229,19 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
std::unique_lock lock(mutex);
if (!first_exception)
first_exception = std::current_exception(); // NOLINT
shutdown = true;
if (shutdown_on_exception)
shutdown = true;
--scheduled_jobs;
}
DB::tryLogCurrentException("ThreadPool",
std::string("Exception in ThreadPool(") +
"max_threads: " + std::to_string(max_threads)
+ ", max_free_threads: " + std::to_string(max_free_threads)
+ ", queue_size: " + std::to_string(queue_size)
+ ", shutdown_on_exception: " + std::to_string(shutdown_on_exception)
+ ").");
job_finished.notify_all();
new_job_or_shutdown.notify_all();
return;

View File

@ -33,7 +33,7 @@ public:
explicit ThreadPoolImpl(size_t max_threads_);
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_ = true);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If any thread was throw an exception, first exception will be rethrown from this method,
@ -79,6 +79,7 @@ private:
size_t scheduled_jobs = 0;
bool shutdown = false;
const bool shutdown_on_exception = true;
struct JobWithPriority
{
@ -128,7 +129,7 @@ using FreeThreadPool = ThreadPoolImpl<std::thread>;
class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
{
public:
GlobalThreadPool() : FreeThreadPool(10000, 1000, 10000) {}
GlobalThreadPool() : FreeThreadPool(10000, 1000, 10000, false) {}
static GlobalThreadPool & instance();
};

View File

@ -206,6 +206,8 @@ private:
}
loop(thread_num);
handler.onFinishThread(thread_num);
}
catch (...)
{
@ -217,8 +219,6 @@ private:
handler.onException(exception, thread_num);
}
handler.onFinishThread(thread_num);
/// The last thread on the output indicates that there is no more data.
if (0 == --active_threads)
{
@ -242,7 +242,19 @@ private:
}
}
handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called.
try
{
handler.onFinish();
}
catch (...)
{
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
}
}