Merge pull request #24457 from nikitamikhaylov/thread-pool-deadlock

Try to fix deadlock in ThreadPool
This commit is contained in:
Nikita Mikhaylov 2021-05-27 02:51:50 +03:00 committed by GitHub
commit 0d3462afc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -113,12 +113,22 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
if (shutdown) if (shutdown)
return on_error(); return on_error();
jobs.emplace(std::move(job), priority); /// We must not to allocate any memory after we emplaced a job in a queue.
++scheduled_jobs; /// Because if an exception would be thrown, we won't notify a thread about job occurrence.
if (threads.size() < std::min(max_threads, scheduled_jobs)) /// Check if there are enough threads to process job.
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
{ {
threads.emplace_front(); try
{
threads.emplace_front();
}
catch (...)
{
/// Most likely this is a std::bad_alloc exception
return on_error();
}
try try
{ {
threads.front() = Thread([this, it = threads.begin()] { worker(it); }); threads.front() = Thread([this, it = threads.begin()] { worker(it); });
@ -126,19 +136,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
catch (...) catch (...)
{ {
threads.pop_front(); threads.pop_front();
/// Remove the job and return error to caller.
/// Note that if we have allocated at least one thread, we may continue
/// (one thread is enough to process all jobs).
/// But this condition indicate an error nevertheless and better to refuse.
jobs.pop();
--scheduled_jobs;
return on_error(); return on_error();
} }
} }
jobs.emplace(std::move(job), priority);
++scheduled_jobs;
new_job_or_shutdown.notify_one();
} }
new_job_or_shutdown.notify_one();
return ReturnType(true); return ReturnType(true);
} }
@ -165,6 +171,10 @@ void ThreadPoolImpl<Thread>::wait()
{ {
{ {
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
new_job_or_shutdown.notify_all();
job_finished.wait(lock, [this] { return scheduled_jobs == 0; }); job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
if (first_exception) if (first_exception)