From 510b15540cece88b4761f141011336b92cc964cf Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 11 Jan 2019 19:58:43 +0300 Subject: [PATCH] Attempt to implemnt global thread pool #4018 --- dbms/src/Databases/IDatabase.h | 4 +- .../src/Interpreters/InterpreterCreateQuery.h | 4 +- libs/libcommon/include/common/ThreadPool.h | 73 +++++++++++++++---- libs/libcommon/src/ThreadPool.cpp | 61 ++++++++++------ 4 files changed, 103 insertions(+), 39 deletions(-) diff --git a/dbms/src/Databases/IDatabase.h b/dbms/src/Databases/IDatabase.h index 7b7b877b0e1..64a65c2fb5f 100644 --- a/dbms/src/Databases/IDatabase.h +++ b/dbms/src/Databases/IDatabase.h @@ -8,12 +8,10 @@ #include #include #include +#include #include -class ThreadPool; - - namespace DB { diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.h b/dbms/src/Interpreters/InterpreterCreateQuery.h index e450ae0728e..40089e17d25 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.h +++ b/dbms/src/Interpreters/InterpreterCreateQuery.h @@ -2,12 +2,12 @@ #include #include +#include -class ThreadPool; - namespace DB { + class Context; class ASTCreateQuery; class ASTExpressionList; diff --git a/libs/libcommon/include/common/ThreadPool.h b/libs/libcommon/include/common/ThreadPool.h index dd82c0c0399..02e1a02c58e 100644 --- a/libs/libcommon/include/common/ThreadPool.h +++ b/libs/libcommon/include/common/ThreadPool.h @@ -7,6 +7,7 @@ #include #include #include +#include /** Very simple thread pool similar to boost::threadpool. @@ -14,17 +15,22 @@ * - catches exceptions and rethrows on wait. */ -class ThreadPool +template +class ThreadPoolImpl { public: using Job = std::function; - /// Size is constant, all threads are created immediately. - explicit ThreadPool(size_t m_size); + /// Size is constant. Up to num_threads are created on demand and then run until shutdown. + explicit ThreadPoolImpl(size_t num_threads); - /// Add new job. Locks until free thread in pool become available or exception in one of threads was thrown. + /// queue_size - maximum number of running plus scheduled jobs. It can be greater than num_threads. Zero means unlimited. + ThreadPoolImpl(size_t num_threads, size_t queue_size); + + /// Add new job. Locks until number of active jobs is less than maximum or exception in one of threads was thrown. /// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function. - void schedule(Job job); + /// Priority: greater is higher. + void schedule(Job job, int priority = 0); /// Wait for all currently active jobs to be done. /// You may call schedule and wait many times in arbitary order. @@ -34,24 +40,40 @@ public: /// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions). /// You should not destroy object while calling schedule or wait methods from another threads. - ~ThreadPool(); + ~ThreadPoolImpl(); - size_t size() const { return m_size; } + size_t size() const { return num_threads; } - /// Returns number of active jobs. + /// Returns number of running and scheduled jobs. size_t active() const; private: mutable std::mutex mutex; - std::condition_variable has_free_thread; - std::condition_variable has_new_job_or_shutdown; + std::condition_variable job_finished; + std::condition_variable new_job_or_shutdown; + + const size_t num_threads; + const size_t queue_size; - const size_t m_size; size_t active_jobs = 0; bool shutdown = false; - std::queue jobs; - std::vector threads; + struct JobWithPriority + { + Job job; + int priority; + + JobWithPriority(Job job, int priority) + : job(job), priority(priority) {} + + bool operator< (const JobWithPriority & rhs) const + { + return priority < rhs.priority; + } + }; + + std::priority_queue jobs; + std::vector threads; std::exception_ptr first_exception; @@ -61,6 +83,31 @@ private: }; +using FreeThreadPool = ThreadPoolImpl; + +class GlobalThreadPool : public FreeThreadPool, public ext::singleton +{ +public: + GlobalThreadPool() : FreeThreadPool(10000) {} /// TODO: global blocking limit may lead to deadlocks. +}; + +class ThreadFromGlobalPool +{ +public: + ThreadFromGlobalPool(std::function func) + { + GlobalThreadPool::instance().schedule(func); + } + + void join() + { + /// noop, the std::thread will continue to run inside global pool. + } +}; + +using ThreadPool = ThreadPoolImpl; + + /// Allows to save first catched exception in jobs and postpone its rethrow. class ExceptionHandler { diff --git a/libs/libcommon/src/ThreadPool.cpp b/libs/libcommon/src/ThreadPool.cpp index e45e64853dc..e460acb3163 100644 --- a/libs/libcommon/src/ThreadPool.cpp +++ b/libs/libcommon/src/ThreadPool.cpp @@ -2,14 +2,21 @@ #include -ThreadPool::ThreadPool(size_t m_size) - : m_size(m_size) +template +ThreadPoolImpl::ThreadPoolImpl(size_t num_threads) + : ThreadPoolImpl(num_threads, num_threads) { - threads.reserve(m_size); +} + +template +ThreadPoolImpl::ThreadPoolImpl(size_t num_threads, size_t queue_size) + : num_threads(num_threads), queue_size(queue_size) +{ + threads.reserve(num_threads); try { - for (size_t i = 0; i < m_size; ++i) + for (size_t i = 0; i < num_threads; ++i) threads.emplace_back([this] { worker(); }); } catch (...) @@ -19,25 +26,30 @@ ThreadPool::ThreadPool(size_t m_size) } } -void ThreadPool::schedule(Job job) +template +void ThreadPoolImpl::schedule(Job job, int priority) { { std::unique_lock lock(mutex); - has_free_thread.wait(lock, [this] { return active_jobs < m_size || shutdown; }); + job_finished.wait(lock, [this] { return !queue_size || active_jobs < queue_size || shutdown; }); if (shutdown) return; - jobs.push(std::move(job)); + jobs.emplace(std::move(job), priority); ++active_jobs; + + if (threads.size() < std::min(num_threads, active_jobs)) + threads.emplace_back([this] { worker(); }); } - has_new_job_or_shutdown.notify_one(); + new_job_or_shutdown.notify_one(); } -void ThreadPool::wait() +template +void ThreadPoolImpl::wait() { { std::unique_lock lock(mutex); - has_free_thread.wait(lock, [this] { return active_jobs == 0; }); + job_finished.wait(lock, [this] { return active_jobs == 0; }); if (first_exception) { @@ -48,19 +60,21 @@ void ThreadPool::wait() } } -ThreadPool::~ThreadPool() +template +ThreadPoolImpl::~ThreadPoolImpl() { finalize(); } -void ThreadPool::finalize() +template +void ThreadPoolImpl::finalize() { { std::unique_lock lock(mutex); shutdown = true; } - has_new_job_or_shutdown.notify_all(); + new_job_or_shutdown.notify_all(); for (auto & thread : threads) thread.join(); @@ -68,14 +82,15 @@ void ThreadPool::finalize() threads.clear(); } -size_t ThreadPool::active() const +template +size_t ThreadPoolImpl::active() const { std::unique_lock lock(mutex); return active_jobs; } - -void ThreadPool::worker() +template +void ThreadPoolImpl::worker() { while (true) { @@ -84,12 +99,12 @@ void ThreadPool::worker() { std::unique_lock lock(mutex); - has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); }); + new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); }); need_shutdown = shutdown; if (!jobs.empty()) { - job = std::move(jobs.front()); + job = jobs.top().job; jobs.pop(); } else @@ -113,8 +128,8 @@ void ThreadPool::worker() shutdown = true; --active_jobs; } - has_free_thread.notify_all(); - has_new_job_or_shutdown.notify_all(); + job_finished.notify_all(); + new_job_or_shutdown.notify_all(); return; } } @@ -124,11 +139,15 @@ void ThreadPool::worker() --active_jobs; } - has_free_thread.notify_all(); + job_finished.notify_all(); } } +template class ThreadPoolImpl; +template class ThreadPoolImpl; + + void ExceptionHandler::setException(std::exception_ptr && exception) { std::unique_lock lock(mutex);