/*! \file * \brief Thread pool core. * * This file contains the threadpool's core class: pool. * * Thread pools are a mechanism for asynchronous and parallel processing * within the same process. The pool class provides a convenient way * for dispatching asynchronous tasks as functions objects. The scheduling * of these tasks can be easily controlled by using customized schedulers. * * Copyright (c) 2005-2007 Philipp Henkel * * Use, modification, and distribution are subject to the * Boost Software License, Version 1.0. (See accompanying file * LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) * * http://threadpool.sourceforge.net * */ #ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED #define THREADPOOL_POOL_CORE_HPP_INCLUDED #include "locking_ptr.hpp" #include "worker_thread.hpp" #include "../task_adaptors.hpp" #include #include #include #include #include #include #include #include #include /// The namespace threadpool contains a thread pool and related utility classes. namespace boost { namespace threadpool { namespace detail { /*! \brief Thread pool. * * Thread pools are a mechanism for asynchronous and parallel processing * within the same process. The pool class provides a convenient way * for dispatching asynchronous tasks as functions objects. The scheduling * of these tasks can be easily controlled by using customized schedulers. * A task must not throw an exception. * * A pool_impl is DefaultConstructible and NonCopyable. * * \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored. * \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions. * * \remarks The pool class is thread-safe. * * \see Tasks: task_func, prio_task_func * \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler */ template < typename Task, template class SchedulingPolicy, template class SizePolicy, template class SizePolicyController, template class ShutdownPolicy > class pool_core : public enable_shared_from_this< pool_core > , private noncopyable { public: // Type definitions typedef Task task_type; //!< Indicates the task's type. typedef SchedulingPolicy scheduler_type; //!< Indicates the scheduler's type. typedef pool_core pool_type; //!< Indicates the thread pool's type. typedef SizePolicy size_policy_type; //!< Indicates the sizer's type. //typedef typename size_policy_type::size_controller size_controller_type; typedef SizePolicyController size_controller_type; // typedef SizePolicy::size_controller size_controller_type; typedef ShutdownPolicy shutdown_policy_type;//!< Indicates the shutdown policy's type. typedef worker_thread worker_type; // The task is required to be a nullary function. BOOST_STATIC_ASSERT(function_traits::arity == 0); // The task function's result type is required to be void. BOOST_STATIC_ASSERT(is_void::type >::value); private: // Friends friend class worker_thread; #if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06 friend class SizePolicy; friend class ShutdownPolicy; #else friend class SizePolicy; friend class ShutdownPolicy; #endif private: // The following members may be accessed by _multiple_ threads at the same time: volatile size_t m_worker_count; volatile size_t m_target_worker_count; volatile size_t m_active_worker_count; private: // The following members are accessed only by _one_ thread at the same time: scheduler_type m_scheduler; scoped_ptr m_size_policy; // is never null bool m_terminate_all_workers; // Indicates if termination of all workers was triggered. std::vector > m_terminated_workers; // List of workers which are terminated but not fully destructed. private: // The following members are implemented thread-safe: mutable recursive_mutex m_monitor; mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated. mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced. public: /// Constructor. pool_core() : m_worker_count(0) , m_target_worker_count(0) , m_active_worker_count(0) , m_terminate_all_workers(false) { pool_type volatile & self_ref = *this; m_size_policy.reset(new size_policy_type(self_ref)); m_scheduler.clear(); } /// Destructor. ~pool_core() { } /*! Gets the size controller which manages the number of threads in the pool. * \return The size controller. * \see SizePolicy */ size_controller_type size_controller() { return size_controller_type(*m_size_policy, this->shared_from_this()); } /*! Gets the number of threads in the pool. * \return The number of threads. */ size_t size() const volatile { return m_worker_count; } // TODO is only called once void shutdown() { ShutdownPolicy::shutdown(*this); } /*! Schedules a task for asynchronous execution. The task will be executed once only. * \param task The task function object. It should not throw execeptions. * \return true, if the task could be scheduled and false otherwise. */ bool schedule(task_type const & task) volatile { locking_ptr lockedThis(*this, m_monitor); if(lockedThis->m_scheduler.push(task)) { lockedThis->m_task_or_terminate_workers_event.notify_one(); return true; } else { return false; } } /*! Returns the number of tasks which are currently executed. * \return The number of active tasks. */ size_t active() const volatile { return m_active_worker_count; } /*! Returns the number of tasks which are ready for execution. * \return The number of pending tasks. */ size_t pending() const volatile { locking_ptr lockedThis(*this, m_monitor); return lockedThis->m_scheduler.size(); } /*! Removes all pending tasks from the pool's scheduler. */ void clear() volatile { locking_ptr lockedThis(*this, m_monitor); lockedThis->m_scheduler.clear(); } /*! Indicates that there are no tasks pending. * \return true if there are no tasks ready for execution. * \remarks This function is more efficient that the check 'pending() == 0'. */ bool empty() const volatile { locking_ptr lockedThis(*this, m_monitor); return lockedThis->m_scheduler.empty(); } /*! The current thread of execution is blocked until the sum of all active * and pending tasks is equal or less than a given threshold. * \param task_threshold The maximum number of tasks in pool and scheduler. */ void wait(size_t const task_threshold = 0) const volatile { const pool_type* self = const_cast(this); recursive_mutex::scoped_lock lock(self->m_monitor); if(0 == task_threshold) { while(0 != self->m_active_worker_count || !self->m_scheduler.empty()) { self->m_worker_idle_or_terminated_event.wait(lock); } } else { while(task_threshold < self->m_active_worker_count + self->m_scheduler.size()) { self->m_worker_idle_or_terminated_event.wait(lock); } } } /*! The current thread of execution is blocked until the timestamp is met * or the sum of all active and pending tasks is equal or less * than a given threshold. * \param timestamp The time when function returns at the latest. * \param task_threshold The maximum number of tasks in pool and scheduler. * \return true if the task sum is equal or less than the threshold, false otherwise. */ bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile { const pool_type* self = const_cast(this); recursive_mutex::scoped_lock lock(self->m_monitor); if(0 == task_threshold) { while(0 != self->m_active_worker_count || !self->m_scheduler.empty()) { if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false; } } else { while(task_threshold < self->m_active_worker_count + self->m_scheduler.size()) { if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false; } } return true; } private: void terminate_all_workers(bool const wait) volatile { pool_type* self = const_cast(this); recursive_mutex::scoped_lock lock(self->m_monitor); self->m_terminate_all_workers = true; m_target_worker_count = 0; self->m_task_or_terminate_workers_event.notify_all(); if(wait) { while(m_worker_count > 0) { self->m_worker_idle_or_terminated_event.wait(lock); } for(typename std::vector >::iterator it = self->m_terminated_workers.begin(); it != self->m_terminated_workers.end(); ++it) { (*it)->join(); } self->m_terminated_workers.clear(); } } /*! Changes the number of worker threads in the pool. The resizing * is handled by the SizePolicy. * \param threads The new number of worker threads. * \return true, if pool will be resized and false if not. */ bool resize(size_t const worker_count) volatile { locking_ptr lockedThis(*this, m_monitor); if(!m_terminate_all_workers) { m_target_worker_count = worker_count; } else { return false; } if(m_worker_count <= m_target_worker_count) { // increase worker count while(m_worker_count < m_target_worker_count) { try { worker_thread::create_and_attach(lockedThis->shared_from_this()); m_worker_count++; m_active_worker_count++; } catch(thread_resource_error) { return false; } } } else { // decrease worker count lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers } return true; } // worker died with unhandled exception void worker_died_unexpectedly(shared_ptr worker) volatile { locking_ptr lockedThis(*this, m_monitor); m_worker_count--; m_active_worker_count--; lockedThis->m_worker_idle_or_terminated_event.notify_all(); if(m_terminate_all_workers) { lockedThis->m_terminated_workers.push_back(worker); } else { lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count); } } void worker_destructed(shared_ptr worker) volatile { locking_ptr lockedThis(*this, m_monitor); m_worker_count--; m_active_worker_count--; lockedThis->m_worker_idle_or_terminated_event.notify_all(); if(m_terminate_all_workers) { lockedThis->m_terminated_workers.push_back(worker); } } bool execute_task() volatile { function0 task; { // fetch task pool_type* lockedThis = const_cast(this); recursive_mutex::scoped_lock lock(lockedThis->m_monitor); // decrease number of threads if necessary if(m_worker_count > m_target_worker_count) { return false; // terminate worker } // wait for tasks while(lockedThis->m_scheduler.empty()) { // decrease number of workers if necessary if(m_worker_count > m_target_worker_count) { return false; // terminate worker } else { m_active_worker_count--; lockedThis->m_worker_idle_or_terminated_event.notify_all(); lockedThis->m_task_or_terminate_workers_event.wait(lock); m_active_worker_count++; } } task = lockedThis->m_scheduler.top(); lockedThis->m_scheduler.pop(); } // call task function if(task) { task(); } //guard->disable(); return true; } }; } } } // namespace boost::threadpool::detail #endif // THREADPOOL_POOL_CORE_HPP_INCLUDED