mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-18 21:51:57 +00:00
454 lines
14 KiB
C++
454 lines
14 KiB
C++
/*! \file
|
|
* \brief Thread pool core.
|
|
*
|
|
* This file contains the threadpool's core class: pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy>.
|
|
*
|
|
* Thread pools are a mechanism for asynchronous and parallel processing
|
|
* within the same process. The pool class provides a convenient way
|
|
* for dispatching asynchronous tasks as function objects. The scheduling
|
|
* of these tasks can be easily controlled by using customized schedulers.
|
|
*
|
|
* Copyright (c) 2005-2007 Philipp Henkel
|
|
*
|
|
* Use, modification, and distribution are subject to the
|
|
* Boost Software License, Version 1.0. (See accompanying file
|
|
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
*
|
|
* http://threadpool.sourceforge.net
|
|
*
|
|
*/
|
|
|
|
|
|
#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
|
|
#define THREADPOOL_POOL_CORE_HPP_INCLUDED
|
|
|
|
|
|
|
|
|
|
#include "locking_ptr.hpp"
|
|
#include "worker_thread.hpp"
|
|
|
|
#include "../task_adaptors.hpp"
|
|
|
|
#include <boost/thread.hpp>
|
|
#include <boost/thread/exceptions.hpp>
|
|
#include <boost/thread/mutex.hpp>
|
|
#include <boost/thread/condition.hpp>
|
|
#include <boost/smart_ptr.hpp>
|
|
#include <boost/bind.hpp>
|
|
#include <boost/static_assert.hpp>
|
|
#include <boost/type_traits.hpp>
|
|
|
|
#include <vector>
|
|
|
|
|
|
/// The namespace threadpool contains a thread pool and related utility classes.
|
|
namespace boost { namespace threadpool { namespace detail
|
|
{
|
|
|
|
/*! \brief Thread pool.
|
|
*
|
|
* Thread pools are a mechanism for asynchronous and parallel processing
|
|
* within the same process. The pool class provides a convenient way
|
|
* for dispatching asynchronous tasks as function objects. The scheduling
|
|
* of these tasks can be easily controlled by using customized schedulers.
|
|
* A task must not throw an exception.
|
|
*
|
|
* A pool_core is DefaultConstructible and NonCopyable.
|
|
*
|
|
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
|
|
* \param SchedulingPolicy A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
|
|
*
|
|
* \remarks The pool class is thread-safe.
|
|
*
|
|
* \see Tasks: task_func, prio_task_func
|
|
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
|
|
*/
|
|
template <
|
|
typename Task,
|
|
|
|
template <typename> class SchedulingPolicy,
|
|
template <typename> class SizePolicy,
|
|
template <typename> class SizePolicyController,
|
|
template <typename> class ShutdownPolicy
|
|
>
|
|
class pool_core
|
|
: public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
|
|
, private noncopyable
|
|
{
|
|
|
|
public: // Type definitions
|
|
typedef Task task_type; //!< Indicates the task's type.
|
|
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
|
|
typedef pool_core<Task,
|
|
SchedulingPolicy,
|
|
SizePolicy,
|
|
SizePolicyController,
|
|
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
|
|
typedef SizePolicy<pool_type> size_policy_type; //!< Indicates the sizer's type.
|
|
//typedef typename size_policy_type::size_controller size_controller_type;
|
|
|
|
typedef SizePolicyController<pool_type> size_controller_type;
|
|
|
|
// typedef SizePolicy<pool_type>::size_controller size_controller_type;
|
|
typedef ShutdownPolicy<pool_type> shutdown_policy_type;//!< Indicates the shutdown policy's type.
|
|
|
|
typedef worker_thread<pool_type> worker_type;
|
|
|
|
// The task is required to be a nullary function.
|
|
BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
|
|
|
|
// The task function's result type is required to be void.
|
|
BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
|
|
|
|
|
|
private: // Friends
|
|
friend class worker_thread<pool_type>;
|
|
|
|
#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
|
|
friend class SizePolicy;
|
|
friend class ShutdownPolicy;
|
|
#else
|
|
friend class SizePolicy<pool_type>;
|
|
friend class ShutdownPolicy<pool_type>;
|
|
#endif
|
|
|
|
private: // The following members may be accessed by _multiple_ threads at the same time:
|
|
volatile size_t m_worker_count;
|
|
volatile size_t m_target_worker_count;
|
|
volatile size_t m_active_worker_count;
|
|
|
|
|
|
|
|
private: // The following members are accessed only by _one_ thread at the same time:
|
|
scheduler_type m_scheduler;
|
|
scoped_ptr<size_policy_type> m_size_policy; // is never null
|
|
|
|
bool m_terminate_all_workers; // Indicates if termination of all workers was triggered.
|
|
std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
|
|
|
|
private: // The following members are implemented thread-safe:
|
|
mutable recursive_mutex m_monitor;
|
|
mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated.
|
|
mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced.
|
|
|
|
public:
|
|
/// Constructor.
|
|
pool_core()
|
|
: m_worker_count(0)
|
|
, m_target_worker_count(0)
|
|
, m_active_worker_count(0)
|
|
, m_terminate_all_workers(false)
|
|
{
|
|
pool_type volatile & self_ref = *this;
|
|
m_size_policy.reset(new size_policy_type(self_ref));
|
|
|
|
m_scheduler.clear();
|
|
}
|
|
|
|
|
|
/// Destructor.
|
|
~pool_core()
|
|
{
|
|
}
|
|
|
|
/*! Gets the size controller which manages the number of threads in the pool.
|
|
* \return The size controller.
|
|
* \see SizePolicy
|
|
*/
|
|
size_controller_type size_controller()
|
|
{
|
|
return size_controller_type(*m_size_policy, this->shared_from_this());
|
|
}
|
|
|
|
/*! Gets the number of threads in the pool.
|
|
* \return The number of threads.
|
|
*/
|
|
size_t size() const volatile
|
|
{
|
|
return m_worker_count;
|
|
}
|
|
|
|
// TODO is only called once
|
|
void shutdown()
|
|
{
|
|
ShutdownPolicy<pool_type>::shutdown(*this);
|
|
}
|
|
|
|
/*! Schedules a task for asynchronous execution. The task will be executed once only.
|
|
* \param task The task function object. It should not throw execeptions.
|
|
* \return true, if the task could be scheduled and false otherwise.
|
|
*/
|
|
bool schedule(task_type const & task) volatile
|
|
{
|
|
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
if(lockedThis->m_scheduler.push(task))
|
|
{
|
|
lockedThis->m_task_or_terminate_workers_event.notify_one();
|
|
return true;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
/*! Returns the number of tasks which are currently executed.
|
|
* \return The number of active tasks.
|
|
*/
|
|
size_t active() const volatile
|
|
{
|
|
return m_active_worker_count;
|
|
}
|
|
|
|
|
|
/*! Returns the number of tasks which are ready for execution.
|
|
* \return The number of pending tasks.
|
|
*/
|
|
size_t pending() const volatile
|
|
{
|
|
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
return lockedThis->m_scheduler.size();
|
|
}
|
|
|
|
|
|
/*! Removes all pending tasks from the pool's scheduler.
|
|
*/
|
|
void clear() volatile
|
|
{
|
|
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
lockedThis->m_scheduler.clear();
|
|
}
|
|
|
|
|
|
/*! Indicates that there are no tasks pending.
|
|
* \return true if there are no tasks ready for execution.
|
|
* \remarks This function is more efficient that the check 'pending() == 0'.
|
|
*/
|
|
bool empty() const volatile
|
|
{
|
|
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
return lockedThis->m_scheduler.empty();
|
|
}
|
|
|
|
|
|
/*! The current thread of execution is blocked until the sum of all active
|
|
* and pending tasks is equal or less than a given threshold.
|
|
* \param task_threshold The maximum number of tasks in pool and scheduler.
|
|
*/
|
|
void wait(size_t const task_threshold = 0) const volatile
|
|
{
|
|
const pool_type* self = const_cast<const pool_type*>(this);
|
|
recursive_mutex::scoped_lock lock(self->m_monitor);
|
|
|
|
if(0 == task_threshold)
|
|
{
|
|
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
|
|
{
|
|
self->m_worker_idle_or_terminated_event.wait(lock);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
|
|
{
|
|
self->m_worker_idle_or_terminated_event.wait(lock);
|
|
}
|
|
}
|
|
}
|
|
|
|
/*! The current thread of execution is blocked until the timestamp is met
|
|
* or the sum of all active and pending tasks is equal or less
|
|
* than a given threshold.
|
|
* \param timestamp The time when function returns at the latest.
|
|
* \param task_threshold The maximum number of tasks in pool and scheduler.
|
|
* \return true if the task sum is equal or less than the threshold, false otherwise.
|
|
*/
|
|
bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
|
|
{
|
|
const pool_type* self = const_cast<const pool_type*>(this);
|
|
recursive_mutex::scoped_lock lock(self->m_monitor);
|
|
|
|
if(0 == task_threshold)
|
|
{
|
|
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
|
|
{
|
|
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
|
|
{
|
|
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
private:
|
|
|
|
|
|
void terminate_all_workers(bool const wait) volatile
|
|
{
|
|
pool_type* self = const_cast<pool_type*>(this);
|
|
recursive_mutex::scoped_lock lock(self->m_monitor);
|
|
|
|
self->m_terminate_all_workers = true;
|
|
|
|
m_target_worker_count = 0;
|
|
self->m_task_or_terminate_workers_event.notify_all();
|
|
|
|
if(wait)
|
|
{
|
|
while(m_worker_count > 0)
|
|
{
|
|
self->m_worker_idle_or_terminated_event.wait(lock);
|
|
}
|
|
|
|
for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
|
|
it != self->m_terminated_workers.end();
|
|
++it)
|
|
{
|
|
(*it)->join();
|
|
}
|
|
self->m_terminated_workers.clear();
|
|
}
|
|
}
|
|
|
|
|
|
/*! Changes the number of worker threads in the pool. The resizing
|
|
* is handled by the SizePolicy.
|
|
* \param threads The new number of worker threads.
|
|
* \return true, if pool will be resized and false if not.
|
|
*/
|
|
bool resize(size_t const worker_count) volatile
|
|
{
|
|
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
if(!m_terminate_all_workers)
|
|
{
|
|
m_target_worker_count = worker_count;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
|
|
|
|
if(m_worker_count <= m_target_worker_count)
|
|
{ // increase worker count
|
|
while(m_worker_count < m_target_worker_count)
|
|
{
|
|
try
|
|
{
|
|
worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
|
|
m_worker_count++;
|
|
m_active_worker_count++;
|
|
}
|
|
catch(thread_resource_error)
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{ // decrease worker count
|
|
lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
// worker died with unhandled exception
|
|
void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
|
|
{
|
|
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
m_worker_count--;
|
|
m_active_worker_count--;
|
|
lockedThis->m_worker_idle_or_terminated_event.notify_all();
|
|
|
|
if(m_terminate_all_workers)
|
|
{
|
|
lockedThis->m_terminated_workers.push_back(worker);
|
|
}
|
|
else
|
|
{
|
|
lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
|
|
}
|
|
}
|
|
|
|
/*! Bookkeeping for a worker that has finished.
 * Decrements the worker and active-worker counters and notifies all
 * threads waiting on the idle/terminated condition. While termination of
 * all workers is in progress the worker is additionally recorded in the
 * list of terminated workers.
 * \param worker The worker that finished.
 */
void worker_destructed(shared_ptr<worker_type> worker) volatile
{
  locking_ptr<pool_type, recursive_mutex> self(*this, m_monitor);

  m_worker_count--;
  m_active_worker_count--;
  self->m_worker_idle_or_terminated_event.notify_all();

  bool const shutting_down = m_terminate_all_workers;
  if(shutting_down)
  {
    self->m_terminated_workers.push_back(worker);
  }
}
|
|
|
|
|
|
bool execute_task() volatile
|
|
{
|
|
function0<void> task;
|
|
|
|
{ // fetch task
|
|
pool_type* lockedThis = const_cast<pool_type*>(this);
|
|
recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
|
|
|
|
// decrease number of threads if necessary
|
|
if(m_worker_count > m_target_worker_count)
|
|
{
|
|
return false; // terminate worker
|
|
}
|
|
|
|
|
|
// wait for tasks
|
|
while(lockedThis->m_scheduler.empty())
|
|
{
|
|
// decrease number of workers if necessary
|
|
if(m_worker_count > m_target_worker_count)
|
|
{
|
|
return false; // terminate worker
|
|
}
|
|
else
|
|
{
|
|
m_active_worker_count--;
|
|
lockedThis->m_worker_idle_or_terminated_event.notify_all();
|
|
lockedThis->m_task_or_terminate_workers_event.wait(lock);
|
|
m_active_worker_count++;
|
|
}
|
|
}
|
|
|
|
task = lockedThis->m_scheduler.top();
|
|
lockedThis->m_scheduler.pop();
|
|
}
|
|
|
|
// call task function
|
|
if(task)
|
|
{
|
|
task();
|
|
}
|
|
|
|
//guard->disable();
|
|
return true;
|
|
}
|
|
};
|
|
|
|
|
|
|
|
|
|
} } } // namespace boost::threadpool::detail
|
|
|
|
#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED
|