Moved files [#METR-17973].

This commit is contained in:
Alexey Milovidov 2015-10-05 03:52:29 +03:00
parent 4a061b97cb
commit 3a5032fd4a
26 changed files with 2057 additions and 13 deletions

View File

@ -1,6 +1,6 @@
#pragma once
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <Poco/Event.h>

View File

@ -7,7 +7,7 @@
#include <Poco/SharedPtr.h>
#include <Poco/Net/NetException.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <DB/IO/WriteBuffer.h>

View File

@ -5,7 +5,7 @@
#include <functional>
#include <common/logger_useful.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <DB/Core/StringRef.h>
#include <DB/Common/Arena.h>

View File

@ -9,7 +9,7 @@
#include <unordered_map>
#include <common/logger_useful.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>

View File

@ -12,7 +12,7 @@
#include <Poco/Util/Application.h>
#include <DB/Common/Stopwatch.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <stats/ReservoirSampler.h>
#include <boost/program_options.hpp>

View File

@ -16,7 +16,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Common/Stopwatch.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
typedef UInt64 Key;

View File

@ -16,7 +16,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Common/Stopwatch.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
typedef UInt64 Key;

View File

@ -1,7 +1,7 @@
#include <iostream>
#include <iomanip>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <DB/IO/WriteBufferFromFileDescriptor.h>

View File

@ -2,7 +2,7 @@
#include <thread>
#include <future>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <Poco/DirectoryIterator.h>
#include <Poco/FileStream.h>

View File

@ -0,0 +1,28 @@
/*! \file
* \brief Main include.
*
* This is the only file you have to include in order to use the
* complete threadpool library.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_HPP_INCLUDED
#define THREADPOOL_HPP_INCLUDED
#include "threadpool/future.hpp"
#include "threadpool/pool.hpp"
#include "threadpool/pool_adaptors.hpp"
#include "threadpool/task_adaptors.hpp"
#endif // THREADPOOL_HPP_INCLUDED

View File

@ -0,0 +1,215 @@
/*! \file
* \brief TODO.
*
* TODO.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_FUTURE_IMPL_HPP_INCLUDED
#define THREADPOOL_DETAIL_FUTURE_IMPL_HPP_INCLUDED
#include "locking_ptr.hpp"
#include <boost/smart_ptr.hpp>
#include <boost/optional.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/xtime.hpp>
#include <boost/utility/result_of.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>
namespace boost { namespace threadpool { namespace detail
{
template<class Result>
class future_impl
{
public:
typedef Result const & result_type; //!< Indicates the functor's result type.
typedef Result future_result_type; //!< Indicates the future's result type.
typedef future_impl<future_result_type> future_type;
private:
volatile bool m_ready;
volatile future_result_type m_result;
mutable mutex m_monitor;
mutable condition m_condition_ready;
volatile bool m_is_cancelled;
volatile bool m_executing;
public:
public:
future_impl()
: m_ready(false)
, m_is_cancelled(false)
{
}
bool ready() const volatile
{
return m_ready;
}
void wait() const volatile
{
const future_type* self = const_cast<const future_type*>(this);
mutex::scoped_lock lock(self->m_monitor);
while(!m_ready)
{
self->m_condition_ready.wait(lock);
}
}
bool timed_wait(boost::xtime const & timestamp) const
{
const future_type* self = const_cast<const future_type*>(this);
mutex::scoped_lock lock(self->m_monitor);
while(!m_ready)
{
if(!self->m_condition_ready.timed_wait(lock, timestamp)) return false;
}
return true;
}
result_type operator()() const volatile
{
wait();
/*
if( throw_exception_ != 0 )
{
throw_exception_( this );
}
*/
return *(const_cast<const future_result_type*>(&m_result));
}
void set_value(future_result_type const & r) volatile
{
locking_ptr<future_type, mutex> lockedThis(*this, m_monitor);
if(!m_ready && !m_is_cancelled)
{
lockedThis->m_result = r;
lockedThis->m_ready = true;
lockedThis->m_condition_ready.notify_all();
}
}
/*
template<class E> void set_exception() // throw()
{
m_impl->template set_exception<E>();
}
template<class E> void set_exception( char const * what ) // throw()
{
m_impl->template set_exception<E>( what );
}
*/
bool cancel() volatile
{
if(!m_ready || m_executing)
{
m_is_cancelled = true;
return true;
}
else
{
return false;
}
}
bool is_cancelled() const volatile
{
return m_is_cancelled;
}
void set_execution_status(bool executing) volatile
{
m_executing = executing;
}
};
template<
template <typename> class Future,
typename Function
>
class future_impl_task_func
{
public:
typedef void result_type; //!< Indicates the functor's result type.
typedef Function function_type; //!< Indicates the function's type.
typedef typename result_of<function_type()>::type future_result_type; //!< Indicates the future's result type.
typedef Future<future_result_type> future_type; //!< Indicates the future's type.
// The task is required to be a nullary function.
BOOST_STATIC_ASSERT(function_traits<function_type()>::arity == 0);
// The task function's result type is required not to be void.
BOOST_STATIC_ASSERT(!is_void<future_result_type>::value);
private:
function_type m_function;
shared_ptr<future_type> m_future;
public:
future_impl_task_func(function_type const & function, shared_ptr<future_type> const & future)
: m_function(function)
, m_future(future)
{
}
void operator()()
{
if(m_function)
{
m_future->set_execution_status(true);
if(!m_future->is_cancelled())
{
// TODO future exeception handling
m_future->set_value(m_function());
}
m_future->set_execution_status(false); // TODO consider exceptions
}
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_FUTURE_IMPL_HPP_INCLUDED

View File

@ -0,0 +1,102 @@
/*! \file
* \brief The locking_ptr is smart pointer with a scoped locking mechanism.
*
* The class is a wrapper for a volatile pointer. It enables synchronized access to the
* internal pointer by locking the passed mutex.
* locking_ptr is based on Andrei Alexandrescu's LockingPtr. For more information
* see article "volatile - Multithreaded Programmer's Best Friend" by A. Alexandrescu.
*
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
#define THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
#include <boost/utility.hpp>
// Support for old boost::thread
//**********************************************
#include <boost/thread/mutex.hpp>
#ifndef BOOST_THREAD_MUTEX_HPP
#include <boost/thread/detail/lock.hpp>
#endif
//**********************************************
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Smart pointer with a scoped locking mechanism.
*
* This class is a wrapper for a volatile pointer. It enables synchronized access to the
* internal pointer by locking the passed mutex.
*/
template <typename T, typename Mutex>
class locking_ptr
: private noncopyable
{
T* m_obj; //!< The instance pointer.
Mutex & m_mutex; //!< Mutex is used for scoped locking.
public:
/// Constructor.
locking_ptr(volatile T& obj, const volatile Mutex& mtx)
: m_obj(const_cast<T*>(&obj))
, m_mutex(*const_cast<Mutex*>(&mtx))
{
// Lock mutex
#ifndef BOOST_THREAD_MUTEX_HPP
// Support for old boost::thread
boost::detail::thread::lock_ops<Mutex>::lock(m_mutex);
#else
m_mutex.lock();
#endif
}
/// Destructor.
~locking_ptr()
{
// Unlock mutex
#ifndef BOOST_THREAD_MUTEX_HPP
// Support for old boost::thread
boost::detail::thread::lock_ops<Mutex>::unlock(m_mutex);
#else
m_mutex.unlock();
#endif
}
/*! Returns a reference to the stored instance.
* \return The instance's reference.
*/
T& operator*() const
{
return *m_obj;
}
/*! Returns a pointer to the stored instance.
* \return The instance's pointer.
*/
T* operator->() const
{
return m_obj;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED

View File

@ -0,0 +1,453 @@
/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
#define THREADPOOL_POOL_CORE_HPP_INCLUDED
#include "locking_ptr.hpp"
#include "worker_thread.hpp"
#include "../task_adaptors.hpp"
#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>
#include <vector>
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Thread pool.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
* A task must not throw an exception.
*
* A pool_impl is DefaultConstructible and NonCopyable.
*
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
* \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
*
* \remarks The pool class is thread-safe.
*
* \see Tasks: task_func, prio_task_func
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
*/
template <
typename Task,
template <typename> class SchedulingPolicy,
template <typename> class SizePolicy,
template <typename> class SizePolicyController,
template <typename> class ShutdownPolicy
>
class pool_core
: public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
, private noncopyable
{
public: // Type definitions
typedef Task task_type; //!< Indicates the task's type.
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
typedef pool_core<Task,
SchedulingPolicy,
SizePolicy,
SizePolicyController,
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
typedef SizePolicy<pool_type> size_policy_type; //!< Indicates the sizer's type.
//typedef typename size_policy_type::size_controller size_controller_type;
typedef SizePolicyController<pool_type> size_controller_type;
// typedef SizePolicy<pool_type>::size_controller size_controller_type;
typedef ShutdownPolicy<pool_type> shutdown_policy_type;//!< Indicates the shutdown policy's type.
typedef worker_thread<pool_type> worker_type;
// The task is required to be a nullary function.
BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
// The task function's result type is required to be void.
BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
private: // Friends
friend class worker_thread<pool_type>;
#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
friend class SizePolicy;
friend class ShutdownPolicy;
#else
friend class SizePolicy<pool_type>;
friend class ShutdownPolicy<pool_type>;
#endif
private: // The following members may be accessed by _multiple_ threads at the same time:
volatile size_t m_worker_count;
volatile size_t m_target_worker_count;
volatile size_t m_active_worker_count;
private: // The following members are accessed only by _one_ thread at the same time:
scheduler_type m_scheduler;
scoped_ptr<size_policy_type> m_size_policy; // is never null
bool m_terminate_all_workers; // Indicates if termination of all workers was triggered.
std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
private: // The following members are implemented thread-safe:
mutable recursive_mutex m_monitor;
mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated.
mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced.
public:
/// Constructor.
pool_core()
: m_worker_count(0)
, m_target_worker_count(0)
, m_active_worker_count(0)
, m_terminate_all_workers(false)
{
pool_type volatile & self_ref = *this;
m_size_policy.reset(new size_policy_type(self_ref));
m_scheduler.clear();
}
/// Destructor.
~pool_core()
{
}
/*! Gets the size controller which manages the number of threads in the pool.
* \return The size controller.
* \see SizePolicy
*/
size_controller_type size_controller()
{
return size_controller_type(*m_size_policy, this->shared_from_this());
}
/*! Gets the number of threads in the pool.
* \return The number of threads.
*/
size_t size() const volatile
{
return m_worker_count;
}
// TODO is only called once
void shutdown()
{
ShutdownPolicy<pool_type>::shutdown(*this);
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object. It should not throw execeptions.
* \return true, if the task could be scheduled and false otherwise.
*/
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
/*! Returns the number of tasks which are currently executed.
* \return The number of active tasks.
*/
size_t active() const volatile
{
return m_active_worker_count;
}
/*! Returns the number of tasks which are ready for execution.
* \return The number of pending tasks.
*/
size_t pending() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.size();
}
/*! Removes all pending tasks from the pool's scheduler.
*/
void clear() volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
lockedThis->m_scheduler.clear();
}
/*! Indicates that there are no tasks pending.
* \return true if there are no tasks ready for execution.
* \remarks This function is more efficient that the check 'pending() == 0'.
*/
bool empty() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.empty();
}
/*! The current thread of execution is blocked until the sum of all active
* and pending tasks is equal or less than a given threshold.
* \param task_threshold The maximum number of tasks in pool and scheduler.
*/
void wait(size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
}
/*! The current thread of execution is blocked until the timestamp is met
* or the sum of all active and pending tasks is equal or less
* than a given threshold.
* \param timestamp The time when function returns at the latest.
* \param task_threshold The maximum number of tasks in pool and scheduler.
* \return true if the task sum is equal or less than the threshold, false otherwise.
*/
bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
return true;
}
private:
void terminate_all_workers(bool const wait) volatile
{
pool_type* self = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
self->m_terminate_all_workers = true;
m_target_worker_count = 0;
self->m_task_or_terminate_workers_event.notify_all();
if(wait)
{
while(m_worker_count > 0)
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
it != self->m_terminated_workers.end();
++it)
{
(*it)->join();
}
self->m_terminated_workers.clear();
}
}
/*! Changes the number of worker threads in the pool. The resizing
* is handled by the SizePolicy.
* \param threads The new number of worker threads.
* \return true, if pool will be resized and false if not.
*/
bool resize(size_t const worker_count) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(!m_terminate_all_workers)
{
m_target_worker_count = worker_count;
}
else
{
return false;
}
if(m_worker_count <= m_target_worker_count)
{ // increase worker count
while(m_worker_count < m_target_worker_count)
{
try
{
worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
m_worker_count++;
m_active_worker_count++;
}
catch(thread_resource_error)
{
return false;
}
}
}
else
{ // decrease worker count
lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
}
return true;
}
// worker died with unhandled exception
void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
else
{
lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
}
}
void worker_destructed(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
}
bool execute_task() volatile
{
function0<void> task;
{ // fetch task
pool_type* lockedThis = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
// wait for tasks
while(lockedThis->m_scheduler.empty())
{
// decrease number of workers if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
else
{
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
lockedThis->m_task_or_terminate_workers_event.wait(lock);
m_active_worker_count++;
}
}
task = lockedThis->m_scheduler.top();
lockedThis->m_scheduler.pop();
}
// call task function
if(task)
{
task();
}
//guard->disable();
return true;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED

View File

@ -0,0 +1,65 @@
/*! \file
* \brief TODO.
*
* TODO.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
#define THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
#include <boost/function.hpp>
namespace boost { namespace threadpool { namespace detail
{
// TODO documentation
class scope_guard
: private boost::noncopyable
{
function0<void> const m_function;
bool m_is_active;
public:
scope_guard(function0<void> const & call_on_exit)
: m_function(call_on_exit)
, m_is_active(true)
{
}
~scope_guard()
{
if(m_is_active && m_function)
{
m_function();
}
}
void disable()
{
m_is_active = false;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED

View File

@ -0,0 +1,115 @@
/*! \file
* \brief Thread pool worker.
*
* The worker thread instance is attached to a pool
* and executes tasks of this pool.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED
#define THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED
#include "scope_guard.hpp"
#include <boost/smart_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Thread pool worker.
*
* A worker_thread represents a thread of execution. The worker is attached to a
* thread pool and processes tasks of that pool. The lifetime of the worker and its
* internal boost::thread is managed automatically.
*
* This class is a helper class and cannot be constructed or accessed directly.
*
* \see pool_core
*/
template <typename Pool>
class worker_thread
: public enable_shared_from_this< worker_thread<Pool> >
, private noncopyable
{
public:
typedef Pool pool_type; //!< Indicates the pool's type.
private:
shared_ptr<pool_type> m_pool; //!< Pointer to the pool which created the worker.
shared_ptr<boost::thread> m_thread; //!< Pointer to the thread which executes the run loop.
/*! Constructs a new worker.
* \param pool Pointer to it's parent pool.
* \see function create_and_attach
*/
worker_thread(shared_ptr<pool_type> const & pool)
: m_pool(pool)
{
assert(pool);
}
/*! Notifies that an exception occurred in the run loop.
*/
void died_unexpectedly()
{
m_pool->worker_died_unexpectedly(this->shared_from_this());
}
public:
/*! Executes pool's tasks sequentially.
*/
void run()
{
scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));
while(m_pool->execute_task()) {}
notify_exception.disable();
m_pool->worker_destructed(this->shared_from_this());
}
/*! Joins the worker's thread.
*/
void join()
{
m_thread->join();
}
/*! Constructs a new worker thread and attaches it to the pool.
* \param pool Pointer to the pool.
*/
static void create_and_attach(shared_ptr<pool_type> const & pool)
{
shared_ptr<worker_thread> worker(new worker_thread(pool));
if(worker)
{
worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));
}
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED

View File

@ -0,0 +1,144 @@
/*! \file
* \brief TODO.
*
* TODO.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_FUTURE_HPP_INCLUDED
#define THREADPOOL_FUTURE_HPP_INCLUDED
#include "detail/future.hpp"
#include <boost/utility/enable_if.hpp>
//#include "pool.hpp"
//#include <boost/utility.hpp>
//#include <boost/thread/mutex.hpp>
namespace boost { namespace threadpool
{
/*! \brief Experimental. Do not use in production code. TODO.
*
* TODO Future
*
* \see TODO
*
*/
template<class Result>
class future
{
private:
shared_ptr<detail::future_impl<Result> > m_impl;
public:
typedef Result const & result_type; //!< Indicates the functor's result type.
typedef Result future_result_type; //!< Indicates the future's result type.
public:
future()
: m_impl(new detail::future_impl<future_result_type>()) // TODO remove this
{
}
// only for internal usage
future(shared_ptr<detail::future_impl<Result> > const & impl)
: m_impl(impl)
{
}
bool ready() const
{
return m_impl->ready();
}
void wait() const
{
m_impl->wait();
}
bool timed_wait(boost::xtime const & timestamp) const
{
return m_impl->timed_wait(timestamp);
}
result_type operator()() // throw( thread::cancelation_exception, ... )
{
return (*m_impl)();
}
result_type get() // throw( thread::cancelation_exception, ... )
{
return (*m_impl)();
}
bool cancel()
{
return m_impl->cancel();
}
bool is_cancelled() const
{
return m_impl->is_cancelled();
}
};
template<class Pool, class Function>
typename disable_if <
is_void< typename result_of< Function() >::type >,
future< typename result_of< Function() >::type >
>::type
schedule(Pool& pool, const Function& task)
{
typedef typename result_of< Function() >::type future_result_type;
// create future impl and future
shared_ptr<detail::future_impl<future_result_type> > impl(new detail::future_impl<future_result_type>);
future <future_result_type> res(impl);
// schedule future impl
pool.schedule(detail::future_impl_task_func<detail::future_impl, Function>(task, impl));
// return future
return res;
/*
TODO
if(pool->schedule(bind(&Future::run, future)))
{
return future;
}
else
{
// construct empty future
return error_future;
}
*/
}
} } // namespace boost::threadpool
#endif // THREADPOOL_FUTURE_HPP_INCLUDED

View File

@ -0,0 +1,232 @@
/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_HPP_INCLUDED
#define THREADPOOL_POOL_HPP_INCLUDED
#include <boost/ref.hpp>
#include "detail/pool_core.hpp"
#include "task_adaptors.hpp"
#include "detail/locking_ptr.hpp"
#include "scheduling_policies.hpp"
#include "size_policies.hpp"
#include "shutdown_policies.hpp"
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{
/*! \brief Thread pool.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
* A task must not throw an exception.
*
* A pool is DefaultConstructible, CopyConstructible and Assignable.
* It has reference semantics; all copies of the same pool are equivalent and interchangeable.
* All operations on a pool except assignment are strongly thread safe or sequentially consistent;
* that is, the behavior of concurrent calls is as if the calls have been issued sequentially in an unspecified order.
*
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
* \param SchedulingPolicy A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
*
* \remarks The pool class is thread-safe.
*
* \see Tasks: task_func, prio_task_func
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
*/
template <
typename Task = task_func,
template <typename> class SchedulingPolicy = fifo_scheduler,
template <typename> class SizePolicy = static_size,
template <typename> class SizePolicyController = resize_controller,
template <typename> class ShutdownPolicy = wait_for_all_tasks
>
class thread_pool
{
typedef detail::pool_core<Task,
SchedulingPolicy,
SizePolicy,
SizePolicyController,
ShutdownPolicy> pool_core_type;
shared_ptr<pool_core_type> m_core; // pimpl idiom
shared_ptr<void> m_shutdown_controller; // If the last pool holding a pointer to the core is deleted the controller shuts the pool down.
public: // Type definitions
typedef Task task_type; //!< Indicates the task's type.
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
/* typedef thread_pool<Task,
SchedulingPolicy,
SizePolicy,
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
*/
typedef SizePolicy<pool_core_type> size_policy_type;
typedef SizePolicyController<pool_core_type> size_controller_type;
public:
/*! Constructor.
* \param initial_threads The pool is immediately resized to set the specified number of threads. The pool's actual number threads depends on the SizePolicy.
*/
thread_pool(size_t initial_threads = 0)
: m_core(new pool_core_type)
, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
{
size_policy_type::init(*m_core, initial_threads);
}
/*! Gets the size controller which manages the number of threads in the pool.
* \return The size controller.
* \see SizePolicy
*/
size_controller_type size_controller()
{
return m_core->size_controller();
}
/*! Gets the number of threads in the pool.
* \return The number of threads.
*/
size_t size() const
{
return m_core->size();
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object. It should not throw execeptions.
* \return true, if the task could be scheduled and false otherwise.
*/
bool schedule(task_type const & task)
{
return m_core->schedule(task);
}
/*! Returns the number of tasks which are currently executed.
* \return The number of active tasks.
*/
size_t active() const
{
return m_core->active();
}
/*! Returns the number of tasks which are ready for execution.
* \return The number of pending tasks.
*/
size_t pending() const
{
return m_core->pending();
}
/*! Removes all pending tasks from the pool's scheduler.
*/
void clear()
{
m_core->clear();
}
/*! Indicates that there are no tasks pending.
* \return true if there are no tasks ready for execution.
* \remarks This function is more efficient that the check 'pending() == 0'.
*/
bool empty() const
{
return m_core->empty();
}
/*! The current thread of execution is blocked until the sum of all active
* and pending tasks is equal or less than a given threshold.
* \param task_threshold The maximum number of tasks in pool and scheduler.
*/
void wait(size_t task_threshold = 0) const
{
m_core->wait(task_threshold);
}
/*! The current thread of execution is blocked until the timestamp is met
* or the sum of all active and pending tasks is equal or less
* than a given threshold.
* \param timestamp The time when function returns at the latest.
* \param task_threshold The maximum number of tasks in pool and scheduler.
* \return true if the task sum is equal or less than the threshold, false otherwise.
*/
bool wait(xtime const & timestamp, size_t task_threshold = 0) const
{
return m_core->wait(timestamp, task_threshold);
}
};
/*! \brief Fifo pool.
*
* The pool's tasks are fifo scheduled task_func functors.
*
*/
typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;
/*! \brief Lifo pool.
*
* The pool's tasks are lifo scheduled task_func functors.
*
*/
typedef thread_pool<task_func, lifo_scheduler, static_size, resize_controller, wait_for_all_tasks> lifo_pool;
/*! \brief Pool for prioritized task.
*
* The pool's tasks are prioritized prio_task_func functors.
*
*/
typedef thread_pool<prio_task_func, prio_scheduler, static_size, resize_controller, wait_for_all_tasks> prio_pool;
/*! \brief A standard pool.
*
* The pool's tasks are fifo scheduled task_func functors.
*
*/
typedef fifo_pool pool;
} } // namespace boost::threadpool
#endif // THREADPOOL_POOL_HPP_INCLUDED

View File

@ -0,0 +1,70 @@
/*! \file
* \brief Pool adaptors.
*
* This file contains an easy-to-use adaptor similar to a smart
* pointer for the pool class.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
#define THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
#include <boost/smart_ptr.hpp>
namespace boost { namespace threadpool
{
// TODO convenience scheduling function
/*! Schedules a Runnable for asynchronous execution. A Runnable is an arbitrary class with a run()
* member function. This a convenience shorthand for pool->schedule(bind(&Runnable::run, task_object)).
* \param
* \param obj The Runnable object. The member function run() will be exectued and should not throw execeptions.
* \return true, if the task could be scheduled and false otherwise.
*/
template<typename Pool, typename Runnable>
bool schedule(Pool& pool, shared_ptr<Runnable> const & obj)
{
return pool->schedule(bind(&Runnable::run, obj));
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object.
*/
template<typename Pool>
typename enable_if <
is_void< typename result_of< typename Pool::task_type() >::type >,
bool
>::type
schedule(Pool& pool, typename Pool::task_type const & task)
{
return pool.schedule(task);
}
template<typename Pool>
typename enable_if <
is_void< typename result_of< typename Pool::task_type() >::type >,
bool
>::type
schedule(shared_ptr<Pool> const pool, typename Pool::task_type const & task)
{
return pool->schedule(task);
}
} } // namespace boost::threadpool
#endif // THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED

View File

@ -0,0 +1,262 @@
/*! \file
* \brief Task scheduling policies.
*
* This file contains some fundamental scheduling policies for the pool class.
* A scheduling policy is realized by a task container which controls the access to
* the tasks. Fundamentally the container determines the order the tasks are processed
* by the thread pool.
* The task containers need not to be thread-safe because they are used by the pool
* in thread-safe way.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_SCHEDULING_POLICIES_HPP_INCLUDED
#define THREADPOOL_SCHEDULING_POLICIES_HPP_INCLUDED
#include <queue>
#include <deque>
#include "task_adaptors.hpp"
namespace boost { namespace threadpool
{
/*! \brief SchedulingPolicy which implements FIFO ordering.
*
* This container implements a FIFO scheduling policy.
* The first task to be added to the scheduler will be the first to be removed.
* The processing proceeds sequentially in the same order.
* FIFO stands for "first in, first out".
*
* \param Task A function object which implements the operator()(void).
*
*/
template <typename Task = task_func>
class fifo_scheduler
{
public:
typedef Task task_type; //!< Indicates the scheduler's task type.
protected:
std::deque<task_type> m_container; //!< Internal task container.
public:
/*! Adds a new task to the scheduler.
* \param task The task object.
* \return true, if the task could be scheduled and false otherwise.
*/
bool push(task_type const & task)
{
m_container.push_back(task);
return true;
}
/*! Removes the task which should be executed next.
*/
void pop()
{
m_container.pop_front();
}
/*! Gets the task which should be executed next.
* \return The task object to be executed.
*/
task_type const & top() const
{
return m_container.front();
}
/*! Gets the current number of tasks in the scheduler.
* \return The number of tasks.
* \remarks Prefer empty() to size() == 0 to check if the scheduler is empty.
*/
size_t size() const
{
return m_container.size();
}
/*! Checks if the scheduler is empty.
* \return true if the scheduler contains no tasks, false otherwise.
* \remarks Is more efficient than size() == 0.
*/
bool empty() const
{
return m_container.empty();
}
/*! Removes all tasks from the scheduler.
*/
void clear()
{
m_container.clear();
}
};
/*! \brief SchedulingPolicy which implements LIFO ordering.
*
* This container implements a LIFO scheduling policy.
* The last task to be added to the scheduler will be the first to be removed.
* LIFO stands for "last in, first out".
*
* \param Task A function object which implements the operator()(void).
*
*/
template <typename Task = task_func>
class lifo_scheduler
{
public:
typedef Task task_type; //!< Indicates the scheduler's task type.
protected:
std::deque<task_type> m_container; //!< Internal task container.
public:
/*! Adds a new task to the scheduler.
* \param task The task object.
* \return true, if the task could be scheduled and false otherwise.
*/
bool push(task_type const & task)
{
m_container.push_front(task);
return true;
}
/*! Removes the task which should be executed next.
*/
void pop()
{
m_container.pop_front();
}
/*! Gets the task which should be executed next.
* \return The task object to be executed.
*/
task_type const & top() const
{
return m_container.front();
}
/*! Gets the current number of tasks in the scheduler.
* \return The number of tasks.
* \remarks Prefer empty() to size() == 0 to check if the scheduler is empty.
*/
size_t size() const
{
return m_container.size();
}
/*! Checks if the scheduler is empty.
* \return true if the scheduler contains no tasks, false otherwise.
* \remarks Is more efficient than size() == 0.
*/
bool empty() const
{
return m_container.empty();
}
/*! Removes all tasks from the scheduler.
*/
void clear()
{
m_container.clear();
}
};
/*! \brief SchedulingPolicy which implements prioritized ordering.
*
* This container implements a scheduling policy based on task priorities.
* The task with highest priority will be the first to be removed.
* It must be possible to compare two tasks using operator<.
*
* \param Task A function object which implements the operator() and operator<. operator< must be a partial ordering.
*
* \see prio_thread_func
*
*/
template <typename Task = prio_task_func>
class prio_scheduler
{
public:
typedef Task task_type; //!< Indicates the scheduler's task type.
protected:
std::priority_queue<task_type> m_container; //!< Internal task container.
public:
/*! Adds a new task to the scheduler.
* \param task The task object.
* \return true, if the task could be scheduled and false otherwise.
*/
bool push(task_type const & task)
{
m_container.push(task);
return true;
}
/*! Removes the task which should be executed next.
*/
void pop()
{
m_container.pop();
}
/*! Gets the task which should be executed next.
* \return The task object to be executed.
*/
task_type const & top() const
{
return m_container.top();
}
/*! Gets the current number of tasks in the scheduler.
* \return The number of tasks.
* \remarks Prefer empty() to size() == 0 to check if the scheduler is empty.
*/
size_t size() const
{
return m_container.size();
}
/*! Checks if the scheduler is empty.
* \return true if the scheduler contains no tasks, false otherwise.
* \remarks Is more efficient than size() == 0.
*/
bool empty() const
{
return m_container.empty();
}
/*! Removes all tasks from the scheduler.
*/
void clear()
{
while(!m_container.empty())
{
m_container.pop();
}
}
};
} } // namespace boost::threadpool
#endif // THREADPOOL_SCHEDULING_POLICIES_HPP_INCLUDED

View File

@ -0,0 +1,83 @@
/*! \file
* \brief Shutdown policies.
*
* This file contains shutdown policies for thread_pool.
* A shutdown policy controls the pool's behavior from the time
* when the pool is not referenced any longer.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_SHUTDOWN_POLICIES_HPP_INCLUDED
#define THREADPOOL_SHUTDOWN_POLICIES_HPP_INCLUDED
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{
/*! \brief ShutdownPolicy which waits for the completion of all tasks
* and the worker termination afterwards.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class wait_for_all_tasks
{
public:
static void shutdown(Pool& pool)
{
pool.wait();
pool.terminate_all_workers(true);
}
};
/*! \brief ShutdownPolicy which waits for the completion of all active tasks
* and the worker termination afterwards.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class wait_for_active_tasks
{
public:
static void shutdown(Pool& pool)
{
pool.clear();
pool.wait();
pool.terminate_all_workers(true);
}
};
/*! \brief ShutdownPolicy which does not wait for any tasks or worker termination.
*
* This policy does not wait for any tasks. Nevertheless all active tasks will be processed completely.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class immediately
{
public:
static void shutdown(Pool& pool)
{
pool.clear();
pool.terminate_all_workers(false);
}
};
} } // namespace boost::threadpool
#endif // THREADPOOL_SHUTDOWN_POLICIES_HPP_INCLUDED

View File

@ -0,0 +1,99 @@
/*! \file
* \brief Size policies.
*
* This file contains size policies for thread_pool. A size
* policy controls the number of worker threads in the pool.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_SIZE_POLICIES_HPP_INCLUDED
#define THREADPOOL_SIZE_POLICIES_HPP_INCLUDED
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{
/*! \brief SizePolicyController which provides no functionality.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
struct empty_controller
{
empty_controller(typename Pool::size_policy_type&, shared_ptr<Pool>) {}
};
/*! \brief SizePolicyController which allows resizing.
*
* \param Pool The pool's core type.
*/
template< typename Pool >
class resize_controller
{
typedef typename Pool::size_policy_type size_policy_type;
reference_wrapper<size_policy_type> m_policy;
shared_ptr<Pool> m_pool; //!< to make sure that the pool is alive (the policy pointer is valid) as long as the controller exists
public:
resize_controller(size_policy_type& policy, shared_ptr<Pool> pool)
: m_policy(policy)
, m_pool(pool)
{
}
bool resize(size_t worker_count)
{
return m_policy.get().resize(worker_count);
}
};
/*! \brief SizePolicy which preserves the thread count.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class static_size
{
reference_wrapper<Pool volatile> m_pool;
public:
static void init(Pool& pool, size_t const worker_count)
{
pool.resize(worker_count);
}
static_size(Pool volatile & pool)
: m_pool(pool)
{}
bool resize(size_t const worker_count)
{
return m_pool.get().resize(worker_count);
}
void worker_died_unexpectedly(size_t const new_worker_count)
{
m_pool.get().resize(new_worker_count + 1);
}
// TODO this functions are not called yet
void task_scheduled() {}
void task_finished() {}
};
} } // namespace boost::threadpool
#endif // THREADPOOL_SIZE_POLICIES_HPP_INCLUDED

View File

@ -0,0 +1,176 @@
/*! \file
* \brief Task adaptors.
*
* This file contains adaptors for task function objects.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_TASK_ADAPTERS_HPP_INCLUDED
#define THREADPOOL_TASK_ADAPTERS_HPP_INCLUDED
#include <boost/version.hpp>
#if BOOST_VERSION >= 105000
#ifndef TIME_UTC
#define TIME_UTC TIME_UTC_
#endif
#endif
#include <boost/smart_ptr.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
namespace boost { namespace threadpool
{
/*! \brief Standard task function object.
*
* This function object wraps a nullary function which returns void.
* The wrapped function is invoked by calling the operator ().
*
* \see boost function library
*
*/
typedef function0<void> task_func;
/*! \brief Prioritized task function object.
*
* This function object wraps a task_func object and binds a priority to it.
* prio_task_funcs can be compared using the operator < which realises a partial ordering.
* The wrapped task function is invoked by calling the operator ().
*
* \see prio_scheduler
*
*/
class prio_task_func
{
private:
unsigned int m_priority; //!< The priority of the task's function.
task_func m_function; //!< The task's function.
public:
typedef void result_type; //!< Indicates the functor's result type.
public:
/*! Constructor.
* \param priority The priority of the task.
* \param function The task's function object.
*/
prio_task_func(unsigned int const priority, task_func const & function)
: m_priority(priority)
, m_function(function)
{
}
/*! Executes the task function.
*/
void operator() (void) const
{
if(m_function)
{
m_function();
}
}
/*! Comparison operator which realises a partial ordering based on priorities.
* \param rhs The object to compare with.
* \return true if the priority of *this is less than right hand side's priority, false otherwise.
*/
bool operator< (const prio_task_func& rhs) const
{
return m_priority < rhs.m_priority;
}
}; // prio_task_func
/*! \brief Looped task function object.
*
* This function object wraps a boolean thread function object.
* The wrapped task function is invoked by calling the operator () and it is executed in regular
* time intervals until false is returned. The interval length may be zero.
* Please note that a pool's thread is engaged as long as the task is looped.
*
*/
class looped_task_func
{
private:
function0<bool> m_function; //!< The task's function.
unsigned int m_break_s; //!< Duration of breaks in seconds.
unsigned int m_break_ns; //!< Duration of breaks in nano seconds.
public:
typedef void result_type; //!< Indicates the functor's result type.
public:
/*! Constructor.
* \param function The task's function object which is looped until false is returned.
* \param interval The minimum break time in milli seconds before the first execution of the task function and between the following ones.
*/
looped_task_func(function0<bool> const & function, unsigned int const interval = 0)
: m_function(function)
{
m_break_s = interval / 1000;
m_break_ns = (interval - m_break_s * 1000) * 1000 * 1000;
}
/*! Executes the task function.
*/
void operator() (void) const
{
if(m_function)
{
if(m_break_s > 0 || m_break_ns > 0)
{ // Sleep some time before first execution
xtime xt;
xtime_get(&xt, TIME_UTC);
xt.nsec += m_break_ns;
xt.sec += m_break_s;
thread::sleep(xt);
}
while(m_function())
{
if(m_break_s > 0 || m_break_ns > 0)
{
xtime xt;
xtime_get(&xt, TIME_UTC);
xt.nsec += m_break_ns;
xt.sec += m_break_s;
thread::sleep(xt);
}
else
{
thread::yield(); // Be fair to other threads
}
}
}
}
}; // looped_task_func
} } // namespace boost::threadpool
#endif // THREADPOOL_TASK_ADAPTERS_HPP_INCLUDED

View File

@ -1,6 +1,6 @@
#include <string.h>
#include <iostream>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <functional>
#include <common/MultiVersion.h>

View File

@ -14,7 +14,7 @@
#include <DB/Core/Exception.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <DB/Common/Stopwatch.h>
#include <stdlib.h>

View File

@ -14,7 +14,7 @@
#include <DB/Core/Exception.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <DB/Common/Stopwatch.h>
#include <stdlib.h>

View File

@ -17,7 +17,7 @@
#include <DB/Core/Exception.h>
#include <statdaemons/threadpool.hpp>
#include <common/threadpool.hpp>
#include <DB/Common/Stopwatch.h>