diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp
index 18d952c9f73..b9bd6c75708 100644
--- a/src/Common/ThreadPool.cpp
+++ b/src/Common/ThreadPool.cpp
@@ -113,12 +113,22 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, int priority, std::opti
         if (shutdown)
             return on_error();
 
-        jobs.emplace(std::move(job), priority);
-        ++scheduled_jobs;
+        /// We must not allocate any memory after we have emplaced a job into the queue.
+        /// Otherwise, if an exception is thrown, we won't notify a thread that a job has arrived.
 
-        if (threads.size() < std::min(max_threads, scheduled_jobs))
+        /// Check if there are enough threads to process the job.
+        if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
         {
-            threads.emplace_front();
+            try
+            {
+                threads.emplace_front();
+            }
+            catch (...)
+            {
+                /// Most likely this is a std::bad_alloc exception.
+                return on_error();
+            }
+
             try
             {
                 threads.front() = Thread([this, it = threads.begin()] { worker(it); });
@@ -126,19 +136,15 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, int priority, std::opti
             catch (...)
             {
                 threads.pop_front();
-
-                /// Remove the job and return error to caller.
-                /// Note that if we have allocated at least one thread, we may continue
-                /// (one thread is enough to process all jobs).
-                /// But this condition indicate an error nevertheless and better to refuse.
-
-                jobs.pop();
-                --scheduled_jobs;
                 return on_error();
             }
         }
+
+        jobs.emplace(std::move(job), priority);
+        ++scheduled_jobs;
+        new_job_or_shutdown.notify_one();
     }
-    new_job_or_shutdown.notify_one();
+
     return ReturnType(true);
 }
@@ -165,6 +171,10 @@ void ThreadPoolImpl::wait()
 {
     {
         std::unique_lock lock(mutex);
+        /// Signal here just in case.
+        /// If threads are waiting on condition variables while there are still jobs in the queue,
+        /// this prevents a deadlock.
+        new_job_or_shutdown.notify_all();
         job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
 
         if (first_exception)
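
For reference, below is a minimal, self-contained sketch of the ordering the diff establishes: create the worker thread first (if that fails, nothing has been enqueued yet, so the caller simply gets an error), and only then enqueue the job and notify, so that once a job is in the queue no throwing operation can prevent its notification. This is not ClickHouse's actual ThreadPoolImpl; the SketchPool name, the trySchedule signature, and the std::function-based queue are invented for illustration.

#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <queue>
#include <thread>

class SketchPool
{
public:
    explicit SketchPool(size_t max_threads_) : max_threads(max_threads_) {}

    /// Returns false instead of throwing if the pool cannot accept the job.
    bool trySchedule(std::function<void()> job)
    {
        {
            std::unique_lock lock(mutex);
            if (shutdown)
                return false;

            /// Grow the thread list *before* enqueuing the job: if thread creation
            /// throws (e.g. std::bad_alloc or std::system_error), the queue is
            /// untouched, so no job can end up enqueued without a notification.
            if (threads.size() < std::min(max_threads, queued + 1))
            {
                try
                {
                    threads.emplace_back([this] { worker(); });
                }
                catch (...)
                {
                    return false;
                }
            }

            /// Enqueue the job last. push() itself may still throw, but then the
            /// job was never enqueued; once it succeeds, only non-throwing
            /// operations remain before notify_one().
            jobs.push(std::move(job));
            ++queued;
            new_job_or_shutdown.notify_one();
        }
        return true;
    }

    ~SketchPool()
    {
        {
            std::unique_lock lock(mutex);
            shutdown = true;
            new_job_or_shutdown.notify_all();
        }
        for (auto & t : threads)
            t.join();
    }

private:
    void worker()
    {
        while (true)
        {
            std::function<void()> job;
            {
                std::unique_lock lock(mutex);
                new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
                if (jobs.empty())
                    return;  /// Shutdown requested and no work left.
                job = std::move(jobs.front());
                jobs.pop();
                --queued;
            }
            job();
        }
    }

    const size_t max_threads;
    std::mutex mutex;
    std::condition_variable new_job_or_shutdown;
    std::queue<std::function<void()>> jobs;
    std::list<std::thread> threads;
    size_t queued = 0;
    bool shutdown = false;
};

The essential difference from the pre-patch ordering in the diff is that a failed thread allocation can no longer leave a job sitting in the queue without a wake-up: either the job is enqueued and notified, or the pool's state is left unchanged and the caller is told the scheduling failed.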