// ClickHouse/dbms/src/Common/ThreadPool.cpp

#include <DB/Common/ThreadPool.h>
/// Creates the pool and immediately starts `m_size` threads, each running worker().
///
/// If starting one of the threads throws, the threads that were already started are
/// asked to shut down and joined before the exception is rethrown. Without this the
/// joinable std::thread objects would be destroyed during stack unwinding, which
/// calls std::terminate.
ThreadPool::ThreadPool(size_t m_size)
    : m_size(m_size)
{
    threads.reserve(m_size);

    try
    {
        for (size_t i = 0; i < m_size; ++i)
            threads.emplace_back([this] { worker(); });
    }
    catch (...)
    {
        /// Same shutdown sequence as in the destructor (which will not run for a
        /// partially constructed object).
        {
            std::unique_lock<std::mutex> lock(mutex);
            shutdown = true;
        }
        has_new_job_or_shutdown.notify_all();

        for (auto & thread : threads)
            thread.join();

        throw;
    }
}
/// Puts `job` into the queue and wakes one worker.
///
/// Blocks while the number of active (queued or running) jobs has reached the pool size.
/// If the pool is shutting down, the job is dropped and the call returns without
/// queueing anything.
void ThreadPool::schedule(Job job)
{
    std::unique_lock<std::mutex> guard(mutex);

    has_free_thread.wait(guard, [this] { return shutdown || active_jobs < m_size; });
    if (shutdown)
        return;

    jobs.push(std::move(job));
    ++active_jobs;

    /// Release the mutex before notifying, so the woken worker can take it right away.
    guard.unlock();
    has_new_job_or_shutdown.notify_one();
}
void ThreadPool::wait()
{
{
std::unique_lock<std::mutex> lock(mutex);
2016-09-02 02:58:59 +00:00
has_free_thread.wait(lock, [this] { return active_jobs == 0; });
2016-08-13 05:29:53 +00:00
2016-09-02 02:58:59 +00:00
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
2016-08-13 05:29:53 +00:00
}
}
/// Sets the shutdown flag, wakes all workers and joins every thread of the pool.
ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> guard(mutex);
        shutdown = true;
    }

    /// Workers sleeping on the condition variable re-check `shutdown` once woken.
    has_new_job_or_shutdown.notify_all();

    for (std::thread & pool_thread : threads)
        pool_thread.join();
}
/// Returns the current value of the active job counter, read under the mutex.
size_t ThreadPool::active() const
{
    std::lock_guard<std::mutex> guard(mutex);
    const size_t current = active_jobs;
    return current;
}
void ThreadPool::worker()
{
while (true)
{
Job job;
2016-09-02 02:58:59 +00:00
bool need_shutdown = false;
2016-08-13 05:29:53 +00:00
{
std::unique_lock<std::mutex> lock(mutex);
has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
2016-09-02 02:58:59 +00:00
need_shutdown = shutdown;
2016-08-13 05:29:53 +00:00
2016-09-02 02:58:59 +00:00
if (!jobs.empty())
2016-08-13 05:29:53 +00:00
{
job = std::move(jobs.front());
jobs.pop();
}
else
{
return;
}
}
2016-09-02 02:58:59 +00:00
if (!need_shutdown)
2016-08-13 05:29:53 +00:00
{
2016-09-02 02:58:59 +00:00
try
2016-08-13 05:29:53 +00:00
{
2016-09-02 02:58:59 +00:00
job();
}
catch (...)
{
{
std::unique_lock<std::mutex> lock(mutex);
if (!first_exception)
first_exception = std::current_exception();
shutdown = true;
--active_jobs;
}
has_free_thread.notify_one();
has_new_job_or_shutdown.notify_all();
return;
2016-08-13 05:29:53 +00:00
}
}
{
std::unique_lock<std::mutex> lock(mutex);
--active_jobs;
}
has_free_thread.notify_one();
}
}