mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Attempt to implemnt global thread pool #4018
This commit is contained in:
parent
e4fce8291e
commit
510b15540c
@ -8,12 +8,10 @@
|
||||
#include <functional>
|
||||
#include <Poco/File.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <common/ThreadPool.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
|
||||
class ThreadPool;
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -2,12 +2,12 @@
|
||||
|
||||
#include <Interpreters/IInterpreter.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <common/ThreadPool.h>
|
||||
|
||||
|
||||
class ThreadPool;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class ASTCreateQuery;
|
||||
class ASTExpressionList;
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <functional>
|
||||
#include <queue>
|
||||
#include <vector>
|
||||
#include <ext/singleton.h>
|
||||
|
||||
|
||||
/** Very simple thread pool similar to boost::threadpool.
|
||||
@ -14,17 +15,22 @@
|
||||
* - catches exceptions and rethrows on wait.
|
||||
*/
|
||||
|
||||
class ThreadPool
|
||||
template <typename Thread>
|
||||
class ThreadPoolImpl
|
||||
{
|
||||
public:
|
||||
using Job = std::function<void()>;
|
||||
|
||||
/// Size is constant, all threads are created immediately.
|
||||
explicit ThreadPool(size_t m_size);
|
||||
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
|
||||
explicit ThreadPoolImpl(size_t num_threads);
|
||||
|
||||
/// Add new job. Locks until free thread in pool become available or exception in one of threads was thrown.
|
||||
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than num_threads. Zero means unlimited.
|
||||
ThreadPoolImpl(size_t num_threads, size_t queue_size);
|
||||
|
||||
/// Add new job. Locks until number of active jobs is less than maximum or exception in one of threads was thrown.
|
||||
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
|
||||
void schedule(Job job);
|
||||
/// Priority: greater is higher.
|
||||
void schedule(Job job, int priority = 0);
|
||||
|
||||
/// Wait for all currently active jobs to be done.
|
||||
/// You may call schedule and wait many times in arbitary order.
|
||||
@ -34,24 +40,40 @@ public:
|
||||
|
||||
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
|
||||
/// You should not destroy object while calling schedule or wait methods from another threads.
|
||||
~ThreadPool();
|
||||
~ThreadPoolImpl();
|
||||
|
||||
size_t size() const { return m_size; }
|
||||
size_t size() const { return num_threads; }
|
||||
|
||||
/// Returns number of active jobs.
|
||||
/// Returns number of running and scheduled jobs.
|
||||
size_t active() const;
|
||||
|
||||
private:
|
||||
mutable std::mutex mutex;
|
||||
std::condition_variable has_free_thread;
|
||||
std::condition_variable has_new_job_or_shutdown;
|
||||
std::condition_variable job_finished;
|
||||
std::condition_variable new_job_or_shutdown;
|
||||
|
||||
const size_t num_threads;
|
||||
const size_t queue_size;
|
||||
|
||||
const size_t m_size;
|
||||
size_t active_jobs = 0;
|
||||
bool shutdown = false;
|
||||
|
||||
std::queue<Job> jobs;
|
||||
std::vector<std::thread> threads;
|
||||
struct JobWithPriority
|
||||
{
|
||||
Job job;
|
||||
int priority;
|
||||
|
||||
JobWithPriority(Job job, int priority)
|
||||
: job(job), priority(priority) {}
|
||||
|
||||
bool operator< (const JobWithPriority & rhs) const
|
||||
{
|
||||
return priority < rhs.priority;
|
||||
}
|
||||
};
|
||||
|
||||
std::priority_queue<JobWithPriority> jobs;
|
||||
std::vector<Thread> threads;
|
||||
std::exception_ptr first_exception;
|
||||
|
||||
|
||||
@ -61,6 +83,31 @@ private:
|
||||
};
|
||||
|
||||
|
||||
using FreeThreadPool = ThreadPoolImpl<std::thread>;
|
||||
|
||||
class GlobalThreadPool : public FreeThreadPool, public ext::singleton<GlobalThreadPool>
|
||||
{
|
||||
public:
|
||||
GlobalThreadPool() : FreeThreadPool(10000) {} /// TODO: global blocking limit may lead to deadlocks.
|
||||
};
|
||||
|
||||
class ThreadFromGlobalPool
|
||||
{
|
||||
public:
|
||||
ThreadFromGlobalPool(std::function<void()> func)
|
||||
{
|
||||
GlobalThreadPool::instance().schedule(func);
|
||||
}
|
||||
|
||||
void join()
|
||||
{
|
||||
/// noop, the std::thread will continue to run inside global pool.
|
||||
}
|
||||
};
|
||||
|
||||
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
|
||||
|
||||
|
||||
/// Allows to save first catched exception in jobs and postpone its rethrow.
|
||||
class ExceptionHandler
|
||||
{
|
||||
|
@ -2,14 +2,21 @@
|
||||
#include <iostream>
|
||||
|
||||
|
||||
ThreadPool::ThreadPool(size_t m_size)
|
||||
: m_size(m_size)
|
||||
template <typename Thread>
|
||||
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads)
|
||||
: ThreadPoolImpl(num_threads, num_threads)
|
||||
{
|
||||
threads.reserve(m_size);
|
||||
}
|
||||
|
||||
template <typename Thread>
|
||||
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads, size_t queue_size)
|
||||
: num_threads(num_threads), queue_size(queue_size)
|
||||
{
|
||||
threads.reserve(num_threads);
|
||||
|
||||
try
|
||||
{
|
||||
for (size_t i = 0; i < m_size; ++i)
|
||||
for (size_t i = 0; i < num_threads; ++i)
|
||||
threads.emplace_back([this] { worker(); });
|
||||
}
|
||||
catch (...)
|
||||
@ -19,25 +26,30 @@ ThreadPool::ThreadPool(size_t m_size)
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadPool::schedule(Job job)
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
has_free_thread.wait(lock, [this] { return active_jobs < m_size || shutdown; });
|
||||
job_finished.wait(lock, [this] { return !queue_size || active_jobs < queue_size || shutdown; });
|
||||
if (shutdown)
|
||||
return;
|
||||
|
||||
jobs.push(std::move(job));
|
||||
jobs.emplace(std::move(job), priority);
|
||||
++active_jobs;
|
||||
|
||||
if (threads.size() < std::min(num_threads, active_jobs))
|
||||
threads.emplace_back([this] { worker(); });
|
||||
}
|
||||
has_new_job_or_shutdown.notify_one();
|
||||
new_job_or_shutdown.notify_one();
|
||||
}
|
||||
|
||||
void ThreadPool::wait()
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::wait()
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
has_free_thread.wait(lock, [this] { return active_jobs == 0; });
|
||||
job_finished.wait(lock, [this] { return active_jobs == 0; });
|
||||
|
||||
if (first_exception)
|
||||
{
|
||||
@ -48,19 +60,21 @@ void ThreadPool::wait()
|
||||
}
|
||||
}
|
||||
|
||||
ThreadPool::~ThreadPool()
|
||||
template <typename Thread>
|
||||
ThreadPoolImpl<Thread>::~ThreadPoolImpl()
|
||||
{
|
||||
finalize();
|
||||
}
|
||||
|
||||
void ThreadPool::finalize()
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::finalize()
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
shutdown = true;
|
||||
}
|
||||
|
||||
has_new_job_or_shutdown.notify_all();
|
||||
new_job_or_shutdown.notify_all();
|
||||
|
||||
for (auto & thread : threads)
|
||||
thread.join();
|
||||
@ -68,14 +82,15 @@ void ThreadPool::finalize()
|
||||
threads.clear();
|
||||
}
|
||||
|
||||
size_t ThreadPool::active() const
|
||||
template <typename Thread>
|
||||
size_t ThreadPoolImpl<Thread>::active() const
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
return active_jobs;
|
||||
}
|
||||
|
||||
|
||||
void ThreadPool::worker()
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::worker()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
@ -84,12 +99,12 @@ void ThreadPool::worker()
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
|
||||
new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
|
||||
need_shutdown = shutdown;
|
||||
|
||||
if (!jobs.empty())
|
||||
{
|
||||
job = std::move(jobs.front());
|
||||
job = jobs.top().job;
|
||||
jobs.pop();
|
||||
}
|
||||
else
|
||||
@ -113,8 +128,8 @@ void ThreadPool::worker()
|
||||
shutdown = true;
|
||||
--active_jobs;
|
||||
}
|
||||
has_free_thread.notify_all();
|
||||
has_new_job_or_shutdown.notify_all();
|
||||
job_finished.notify_all();
|
||||
new_job_or_shutdown.notify_all();
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -124,11 +139,15 @@ void ThreadPool::worker()
|
||||
--active_jobs;
|
||||
}
|
||||
|
||||
has_free_thread.notify_all();
|
||||
job_finished.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template class ThreadPoolImpl<std::thread>;
|
||||
template class ThreadPoolImpl<ThreadFromGlobalPool>;
|
||||
|
||||
|
||||
void ExceptionHandler::setException(std::exception_ptr && exception)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
|
Loading…
Reference in New Issue
Block a user