Removed useless code (experimental) [#METR-2944].

Alexey Milovidov 2016-08-02 04:46:05 +03:00
parent 1c25aa3b9d
commit 9a07830dbb
46 changed files with 469 additions and 1988 deletions

View File

@ -95,7 +95,6 @@ include_directories (${ClickHouse_SOURCE_DIR}/contrib/libsparsehash/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libre2/)
include_directories (${ClickHouse_BINARY_DIR}/contrib/libre2/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libzookeeper/include/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libboost-threadpool/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Foundation/include/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Util/include/)
include_directories (${ClickHouse_SOURCE_DIR}/contrib/libpoco/Net/include/)

View File

@ -6,7 +6,6 @@ add_subdirectory (libfarmhash)
add_subdirectory (libmetrohash)
add_subdirectory (libpoco)
add_subdirectory (libre2)
add_subdirectory (libboost-threadpool)
add_subdirectory (libtcmalloc)
add_subdirectory (libzookeeper)

View File

@ -1,5 +0,0 @@
Copyright (c) 2005-2007 Philipp Henkel
Use, modification, and distribution are subject to the
Boost Software License, Version 1.0. (See accompanying file
LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

View File

@ -1,23 +0,0 @@
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,27 +0,0 @@
/*! \file
* \brief Main include.
*
* This is the only file you have to include in order to use the
* complete threadpool library.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_HPP_INCLUDED
#define THREADPOOL_HPP_INCLUDED
#include "threadpool/pool.hpp"
#include "threadpool/pool_adaptors.hpp"
#include "threadpool/task_adaptors.hpp"
#endif // THREADPOOL_HPP_INCLUDED

View File

@ -1,102 +0,0 @@
/*! \file
* \brief The locking_ptr is a smart pointer with a scoped locking mechanism.
*
* The class is a wrapper for a volatile pointer. It enables synchronized access to the
* internal pointer by locking the passed mutex.
* locking_ptr is based on Andrei Alexandrescu's LockingPtr. For more information
* see article "volatile - Multithreaded Programmer's Best Friend" by A. Alexandrescu.
*
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
#define THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
#include <boost/utility.hpp>
// Support for old boost::thread
//**********************************************
#include <boost/thread/mutex.hpp>
#ifndef BOOST_THREAD_MUTEX_HPP
#include <boost/thread/detail/lock.hpp>
#endif
//**********************************************
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Smart pointer with a scoped locking mechanism.
*
* This class is a wrapper for a volatile pointer. It enables synchronized access to the
* internal pointer by locking the passed mutex.
*/
template <typename T, typename Mutex>
class locking_ptr
: private noncopyable
{
T* m_obj; //!< The instance pointer.
Mutex & m_mutex; //!< Mutex is used for scoped locking.
public:
/// Constructor.
locking_ptr(volatile T& obj, const volatile Mutex& mtx)
: m_obj(const_cast<T*>(&obj))
, m_mutex(*const_cast<Mutex*>(&mtx))
{
// Lock mutex
#ifndef BOOST_THREAD_MUTEX_HPP
// Support for old boost::thread
boost::detail::thread::lock_ops<Mutex>::lock(m_mutex);
#else
m_mutex.lock();
#endif
}
/// Destructor.
~locking_ptr()
{
// Unlock mutex
#ifndef BOOST_THREAD_MUTEX_HPP
// Support for old boost::thread
boost::detail::thread::lock_ops<Mutex>::unlock(m_mutex);
#else
m_mutex.unlock();
#endif
}
/*! Returns a reference to the stored instance.
* \return The instance's reference.
*/
T& operator*() const
{
return *m_obj;
}
/*! Returns a pointer to the stored instance.
* \return The instance's pointer.
*/
T* operator->() const
{
return m_obj;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_LOCKING_PTR_HPP_INCLUDED
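For context, a minimal usage sketch (not part of this commit; the class and member names are illustrative and the include path is assumed) of the idiom this helper supports: a volatile-qualified member function regains synchronized, non-volatile access to the object by constructing a locking_ptr over *this and the object's mutex, which is exactly how pool_core::schedule() below uses it.

#include <cstddef>
#include <boost/thread/mutex.hpp>
#include <threadpool/detail/locking_ptr.hpp>  // assumed install path of the header shown above

class counter
{
    size_t m_value;
    mutable boost::mutex m_mutex;

public:
    counter() : m_value(0) {}

    void increment() volatile
    {
        // The constructor locks m_mutex, the destructor unlocks it;
        // operator-> yields a plain (non-volatile) pointer to the object.
        boost::threadpool::detail::locking_ptr<counter, boost::mutex> locked(*this, m_mutex);
        ++locked->m_value;
    }
};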

View File

@ -1,453 +0,0 @@
/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
#define THREADPOOL_POOL_CORE_HPP_INCLUDED
#include "locking_ptr.hpp"
#include "worker_thread.hpp"
#include "../task_adaptors.hpp"
#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>
#include <vector>
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Thread pool.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
* A task must not throw an exception.
*
* A pool_impl is DefaultConstructible and NonCopyable.
*
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
* \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
*
* \remarks The pool class is thread-safe.
*
* \see Tasks: task_func, prio_task_func
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
*/
template <
typename Task,
template <typename> class SchedulingPolicy,
template <typename> class SizePolicy,
template <typename> class SizePolicyController,
template <typename> class ShutdownPolicy
>
class pool_core
: public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
, private noncopyable
{
public: // Type definitions
typedef Task task_type; //!< Indicates the task's type.
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
typedef pool_core<Task,
SchedulingPolicy,
SizePolicy,
SizePolicyController,
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
typedef SizePolicy<pool_type> size_policy_type; //!< Indicates the sizer's type.
//typedef typename size_policy_type::size_controller size_controller_type;
typedef SizePolicyController<pool_type> size_controller_type;
// typedef SizePolicy<pool_type>::size_controller size_controller_type;
typedef ShutdownPolicy<pool_type> shutdown_policy_type;//!< Indicates the shutdown policy's type.
typedef worker_thread<pool_type> worker_type;
// The task is required to be a nullary function.
BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
// The task function's result type is required to be void.
BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
private: // Friends
friend class worker_thread<pool_type>;
#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
friend class SizePolicy;
friend class ShutdownPolicy;
#else
friend class SizePolicy<pool_type>;
friend class ShutdownPolicy<pool_type>;
#endif
private: // The following members may be accessed by _multiple_ threads at the same time:
volatile size_t m_worker_count;
volatile size_t m_target_worker_count;
volatile size_t m_active_worker_count;
private: // The following members are accessed only by _one_ thread at the same time:
scheduler_type m_scheduler;
scoped_ptr<size_policy_type> m_size_policy; // is never null
bool m_terminate_all_workers; // Indicates if termination of all workers was triggered.
std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
private: // The following members are implemented thread-safe:
mutable recursive_mutex m_monitor;
mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated.
mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced.
public:
/// Constructor.
pool_core()
: m_worker_count(0)
, m_target_worker_count(0)
, m_active_worker_count(0)
, m_terminate_all_workers(false)
{
pool_type volatile & self_ref = *this;
m_size_policy.reset(new size_policy_type(self_ref));
m_scheduler.clear();
}
/// Destructor.
~pool_core()
{
}
/*! Gets the size controller which manages the number of threads in the pool.
* \return The size controller.
* \see SizePolicy
*/
size_controller_type size_controller()
{
return size_controller_type(*m_size_policy, this->shared_from_this());
}
/*! Gets the number of threads in the pool.
* \return The number of threads.
*/
size_t size() const volatile
{
return m_worker_count;
}
// TODO is only called once
void shutdown()
{
ShutdownPolicy<pool_type>::shutdown(*this);
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object. It should not throw exceptions.
* \return true, if the task could be scheduled and false otherwise.
*/
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
/*! Returns the number of tasks which are currently executed.
* \return The number of active tasks.
*/
size_t active() const volatile
{
return m_active_worker_count;
}
/*! Returns the number of tasks which are ready for execution.
* \return The number of pending tasks.
*/
size_t pending() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.size();
}
/*! Removes all pending tasks from the pool's scheduler.
*/
void clear() volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
lockedThis->m_scheduler.clear();
}
/*! Indicates that there are no tasks pending.
* \return true if there are no tasks ready for execution.
* \remarks This function is more efficient than the check 'pending() == 0'.
*/
bool empty() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.empty();
}
/*! The current thread of execution is blocked until the sum of all active
* and pending tasks is equal or less than a given threshold.
* \param task_threshold The maximum number of tasks in pool and scheduler.
*/
void wait(size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
}
/*! The current thread of execution is blocked until the timestamp is met
* or the sum of all active and pending tasks is equal or less
* than a given threshold.
* \param timestamp The time when function returns at the latest.
* \param task_threshold The maximum number of tasks in pool and scheduler.
* \return true if the task sum is equal or less than the threshold, false otherwise.
*/
bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
return true;
}
private:
void terminate_all_workers(bool const wait) volatile
{
pool_type* self = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
self->m_terminate_all_workers = true;
m_target_worker_count = 0;
self->m_task_or_terminate_workers_event.notify_all();
if(wait)
{
while(m_worker_count > 0)
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
it != self->m_terminated_workers.end();
++it)
{
(*it)->join();
}
self->m_terminated_workers.clear();
}
}
/*! Changes the number of worker threads in the pool. The resizing
* is handled by the SizePolicy.
* \param threads The new number of worker threads.
* \return true, if pool will be resized and false if not.
*/
bool resize(size_t const worker_count) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(!m_terminate_all_workers)
{
m_target_worker_count = worker_count;
}
else
{
return false;
}
if(m_worker_count <= m_target_worker_count)
{ // increase worker count
while(m_worker_count < m_target_worker_count)
{
try
{
worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
m_worker_count++;
m_active_worker_count++;
}
catch(thread_resource_error)
{
return false;
}
}
}
else
{ // decrease worker count
lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
}
return true;
}
// worker died with unhandled exception
void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
else
{
lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
}
}
void worker_destructed(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
}
bool execute_task() volatile
{
function0<void> task;
{ // fetch task
pool_type* lockedThis = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
// wait for tasks
while(lockedThis->m_scheduler.empty())
{
// decrease number of workers if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
else
{
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
lockedThis->m_task_or_terminate_workers_event.wait(lock);
m_active_worker_count++;
}
}
task = lockedThis->m_scheduler.top();
lockedThis->m_scheduler.pop();
}
// call task function
if(task)
{
task();
}
//guard->disable();
return true;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED

View File

@ -1,65 +0,0 @@
/*! \file
* \brief TODO.
*
* TODO.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
#define THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
#include <boost/function.hpp>
namespace boost { namespace threadpool { namespace detail
{
// TODO documentation
class scope_guard
: private boost::noncopyable
{
function0<void> const m_function;
bool m_is_active;
public:
scope_guard(function0<void> const & call_on_exit)
: m_function(call_on_exit)
, m_is_active(true)
{
}
~scope_guard()
{
if(m_is_active && m_function)
{
m_function();
}
}
void disable()
{
m_is_active = false;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_SCOPE_GUARD_HPP_INCLUDED
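A minimal sketch (not from this commit; the functions are illustrative) of the idiom worker_thread::run() below relies on: the guard invokes its callback during stack unwinding unless disable() is reached on the success path.

#include <iostream>
#include <stdexcept>
#include <threadpool/detail/scope_guard.hpp>  // assumed install path of the header shown above

void report_failure() { std::cerr << "task died unexpectedly\n"; }
void do_work() { throw std::runtime_error("boom"); }  // illustrative body that may throw

void run_task()
{
    boost::threadpool::detail::scope_guard guard(&report_failure);
    do_work();        // if this throws, the guard fires report_failure() while unwinding
    guard.disable();  // reached only on success: suppresses the callback
}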

View File

@ -1,115 +0,0 @@
/*! \file
* \brief Thread pool worker.
*
* The worker thread instance is attached to a pool
* and executes tasks of this pool.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED
#define THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED
#include "scope_guard.hpp"
#include <boost/smart_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Thread pool worker.
*
* A worker_thread represents a thread of execution. The worker is attached to a
* thread pool and processes tasks of that pool. The lifetime of the worker and its
* internal boost::thread is managed automatically.
*
* This class is a helper class and cannot be constructed or accessed directly.
*
* \see pool_core
*/
template <typename Pool>
class worker_thread
: public enable_shared_from_this< worker_thread<Pool> >
, private noncopyable
{
public:
typedef Pool pool_type; //!< Indicates the pool's type.
private:
shared_ptr<pool_type> m_pool; //!< Pointer to the pool which created the worker.
shared_ptr<boost::thread> m_thread; //!< Pointer to the thread which executes the run loop.
/*! Constructs a new worker.
* \param pool Pointer to its parent pool.
* \see function create_and_attach
*/
worker_thread(shared_ptr<pool_type> const & pool)
: m_pool(pool)
{
assert(pool);
}
/*! Notifies that an exception occurred in the run loop.
*/
void died_unexpectedly()
{
m_pool->worker_died_unexpectedly(this->shared_from_this());
}
public:
/*! Executes pool's tasks sequentially.
*/
void run()
{
scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));
while(m_pool->execute_task()) {}
notify_exception.disable();
m_pool->worker_destructed(this->shared_from_this());
}
/*! Joins the worker's thread.
*/
void join()
{
m_thread->join();
}
/*! Constructs a new worker thread and attaches it to the pool.
* \param pool Pointer to the pool.
*/
static void create_and_attach(shared_ptr<pool_type> const & pool)
{
shared_ptr<worker_thread> worker(new worker_thread(pool));
if(worker)
{
worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));
}
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_DETAIL_WORKER_THREAD_HPP_INCLUDED

View File

@ -1,232 +0,0 @@
/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_HPP_INCLUDED
#define THREADPOOL_POOL_HPP_INCLUDED
#include <boost/ref.hpp>
#include "detail/pool_core.hpp"
#include "task_adaptors.hpp"
#include "detail/locking_ptr.hpp"
#include "scheduling_policies.hpp"
#include "size_policies.hpp"
#include "shutdown_policies.hpp"
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{
/*! \brief Thread pool.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
* A task must not throw an exception.
*
* A pool is DefaultConstructible, CopyConstructible and Assignable.
* It has reference semantics; all copies of the same pool are equivalent and interchangeable.
* All operations on a pool except assignment are strongly thread safe or sequentially consistent;
* that is, the behavior of concurrent calls is as if the calls have been issued sequentially in an unspecified order.
*
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
* \param SchedulingPolicy A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
*
* \remarks The pool class is thread-safe.
*
* \see Tasks: task_func, prio_task_func
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
*/
template <
typename Task = task_func,
template <typename> class SchedulingPolicy = fifo_scheduler,
template <typename> class SizePolicy = static_size,
template <typename> class SizePolicyController = resize_controller,
template <typename> class ShutdownPolicy = wait_for_all_tasks
>
class thread_pool
{
typedef detail::pool_core<Task,
SchedulingPolicy,
SizePolicy,
SizePolicyController,
ShutdownPolicy> pool_core_type;
shared_ptr<pool_core_type> m_core; // pimpl idiom
shared_ptr<void> m_shutdown_controller; // If the last pool holding a pointer to the core is deleted the controller shuts the pool down.
public: // Type definitions
typedef Task task_type; //!< Indicates the task's type.
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
/* typedef thread_pool<Task,
SchedulingPolicy,
SizePolicy,
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
*/
typedef SizePolicy<pool_core_type> size_policy_type;
typedef SizePolicyController<pool_core_type> size_controller_type;
public:
/*! Constructor.
* \param initial_threads The pool is immediately resized to set the specified number of threads. The pool's actual number of threads depends on the SizePolicy.
*/
thread_pool(size_t initial_threads = 0)
: m_core(new pool_core_type)
, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
{
size_policy_type::init(*m_core, initial_threads);
}
/*! Gets the size controller which manages the number of threads in the pool.
* \return The size controller.
* \see SizePolicy
*/
size_controller_type size_controller()
{
return m_core->size_controller();
}
/*! Gets the number of threads in the pool.
* \return The number of threads.
*/
size_t size() const
{
return m_core->size();
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object. It should not throw exceptions.
* \return true, if the task could be scheduled and false otherwise.
*/
bool schedule(task_type const & task)
{
return m_core->schedule(task);
}
/*! Returns the number of tasks which are currently executed.
* \return The number of active tasks.
*/
size_t active() const
{
return m_core->active();
}
/*! Returns the number of tasks which are ready for execution.
* \return The number of pending tasks.
*/
size_t pending() const
{
return m_core->pending();
}
/*! Removes all pending tasks from the pool's scheduler.
*/
void clear()
{
m_core->clear();
}
/*! Indicates that there are no tasks pending.
* \return true if there are no tasks ready for execution.
* \remarks This function is more efficient than the check 'pending() == 0'.
*/
bool empty() const
{
return m_core->empty();
}
/*! The current thread of execution is blocked until the sum of all active
* and pending tasks is equal or less than a given threshold.
* \param task_threshold The maximum number of tasks in pool and scheduler.
*/
void wait(size_t task_threshold = 0) const
{
m_core->wait(task_threshold);
}
/*! The current thread of execution is blocked until the timestamp is met
* or the sum of all active and pending tasks is equal or less
* than a given threshold.
* \param timestamp The time when function returns at the latest.
* \param task_threshold The maximum number of tasks in pool and scheduler.
* \return true if the task sum is equal or less than the threshold, false otherwise.
*/
bool wait(xtime const & timestamp, size_t task_threshold = 0) const
{
return m_core->wait(timestamp, task_threshold);
}
};
/*! \brief Fifo pool.
*
* The pool's tasks are fifo scheduled task_func functors.
*
*/
typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;
/*! \brief Lifo pool.
*
* The pool's tasks are lifo scheduled task_func functors.
*
*/
typedef thread_pool<task_func, lifo_scheduler, static_size, resize_controller, wait_for_all_tasks> lifo_pool;
/*! \brief Pool for prioritized task.
*
* The pool's tasks are prioritized prio_task_func functors.
*
*/
typedef thread_pool<prio_task_func, prio_scheduler, static_size, resize_controller, wait_for_all_tasks> prio_pool;
/*! \brief A standard pool.
*
* The pool's tasks are fifo scheduled task_func functors.
*
*/
typedef fifo_pool pool;
} } // namespace boost::threadpool
#endif // THREADPOOL_POOL_HPP_INCLUDED
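The call sites converted elsewhere in this commit used the library roughly as follows (a minimal sketch, not from the commit; the task functions are illustrative):

#include <threadpool.hpp>
#include <iostream>

void first_task()  { std::cout << "first\n"; }
void second_task() { std::cout << "second\n"; }

int main()
{
    // boost::threadpool::pool is the fifo_pool typedef above: task_func tasks,
    // FIFO scheduling, static size, resize_controller, wait_for_all_tasks shutdown.
    boost::threadpool::pool tp(2);       // two worker threads

    tp.schedule(&first_task);            // task_func is boost::function0<void>
    tp.schedule(&second_task);

    tp.wait();                           // blocks until there are no active or pending tasks
    return 0;
}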

View File

@ -1,70 +0,0 @@
/*! \file
* \brief Pool adaptors.
*
* This file contains an easy-to-use adaptor similar to a smart
* pointer for the pool class.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
#define THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
#include <boost/smart_ptr.hpp>
namespace boost { namespace threadpool
{
// TODO convenience scheduling function
/*! Schedules a Runnable for asynchronous execution. A Runnable is an arbitrary class with a run()
* member function. This is a convenience shorthand for pool->schedule(bind(&Runnable::run, task_object)).
* \param
* \param obj The Runnable object. The member function run() will be executed and should not throw exceptions.
* \return true, if the task could be scheduled and false otherwise.
*/
template<typename Pool, typename Runnable>
bool schedule(Pool& pool, shared_ptr<Runnable> const & obj)
{
return pool->schedule(bind(&Runnable::run, obj));
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object.
*/
template<typename Pool>
typename enable_if <
is_void< typename result_of< typename Pool::task_type() >::type >,
bool
>::type
schedule(Pool& pool, typename Pool::task_type const & task)
{
return pool.schedule(task);
}
template<typename Pool>
typename enable_if <
is_void< typename result_of< typename Pool::task_type() >::type >,
bool
>::type
schedule(shared_ptr<Pool> const pool, typename Pool::task_type const & task)
{
return pool->schedule(task);
}
} } // namespace boost::threadpool
#endif // THREADPOOL_POOL_ADAPTORS_HPP_INCLUDED
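A minimal sketch (not from this commit; the Job class is illustrative) of what the Runnable adaptor expands to according to its own comment, plus the enable_if overload that takes the pool by reference:

#include <threadpool.hpp>
#include <boost/bind.hpp>
#include <boost/smart_ptr.hpp>
#include <iostream>

struct Job                                   // any class with a nullary run() member
{
    void run() { std::cout << "running\n"; }
};

int main()
{
    boost::threadpool::pool tp(1);
    boost::shared_ptr<Job> job(new Job);

    // What schedule(pool, job) expands to, per the adaptor's comment:
    tp.schedule(boost::bind(&Job::run, job));

    // The overload taking the pool by reference and a ready-made task object:
    boost::threadpool::schedule(tp, boost::threadpool::task_func(boost::bind(&Job::run, job)));

    tp.wait();
    return 0;
}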

View File

@ -1,262 +0,0 @@
/*! \file
* \brief Task scheduling policies.
*
* This file contains some fundamental scheduling policies for the pool class.
* A scheduling policy is realized by a task container which controls the access to
* the tasks. Fundamentally the container determines the order the tasks are processed
* by the thread pool.
* The task containers need not be thread-safe because they are used by the pool
* in a thread-safe way.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_SCHEDULING_POLICIES_HPP_INCLUDED
#define THREADPOOL_SCHEDULING_POLICIES_HPP_INCLUDED
#include <queue>
#include <deque>
#include "task_adaptors.hpp"
namespace boost { namespace threadpool
{
/*! \brief SchedulingPolicy which implements FIFO ordering.
*
* This container implements a FIFO scheduling policy.
* The first task to be added to the scheduler will be the first to be removed.
* The processing proceeds sequentially in the same order.
* FIFO stands for "first in, first out".
*
* \param Task A function object which implements the operator()(void).
*
*/
template <typename Task = task_func>
class fifo_scheduler
{
public:
typedef Task task_type; //!< Indicates the scheduler's task type.
protected:
std::deque<task_type> m_container; //!< Internal task container.
public:
/*! Adds a new task to the scheduler.
* \param task The task object.
* \return true, if the task could be scheduled and false otherwise.
*/
bool push(task_type const & task)
{
m_container.push_back(task);
return true;
}
/*! Removes the task which should be executed next.
*/
void pop()
{
m_container.pop_front();
}
/*! Gets the task which should be executed next.
* \return The task object to be executed.
*/
task_type const & top() const
{
return m_container.front();
}
/*! Gets the current number of tasks in the scheduler.
* \return The number of tasks.
* \remarks Prefer empty() to size() == 0 to check if the scheduler is empty.
*/
size_t size() const
{
return m_container.size();
}
/*! Checks if the scheduler is empty.
* \return true if the scheduler contains no tasks, false otherwise.
* \remarks Is more efficient than size() == 0.
*/
bool empty() const
{
return m_container.empty();
}
/*! Removes all tasks from the scheduler.
*/
void clear()
{
m_container.clear();
}
};
/*! \brief SchedulingPolicy which implements LIFO ordering.
*
* This container implements a LIFO scheduling policy.
* The last task to be added to the scheduler will be the first to be removed.
* LIFO stands for "last in, first out".
*
* \param Task A function object which implements the operator()(void).
*
*/
template <typename Task = task_func>
class lifo_scheduler
{
public:
typedef Task task_type; //!< Indicates the scheduler's task type.
protected:
std::deque<task_type> m_container; //!< Internal task container.
public:
/*! Adds a new task to the scheduler.
* \param task The task object.
* \return true, if the task could be scheduled and false otherwise.
*/
bool push(task_type const & task)
{
m_container.push_front(task);
return true;
}
/*! Removes the task which should be executed next.
*/
void pop()
{
m_container.pop_front();
}
/*! Gets the task which should be executed next.
* \return The task object to be executed.
*/
task_type const & top() const
{
return m_container.front();
}
/*! Gets the current number of tasks in the scheduler.
* \return The number of tasks.
* \remarks Prefer empty() to size() == 0 to check if the scheduler is empty.
*/
size_t size() const
{
return m_container.size();
}
/*! Checks if the scheduler is empty.
* \return true if the scheduler contains no tasks, false otherwise.
* \remarks Is more efficient than size() == 0.
*/
bool empty() const
{
return m_container.empty();
}
/*! Removes all tasks from the scheduler.
*/
void clear()
{
m_container.clear();
}
};
/*! \brief SchedulingPolicy which implements prioritized ordering.
*
* This container implements a scheduling policy based on task priorities.
* The task with highest priority will be the first to be removed.
* It must be possible to compare two tasks using operator<.
*
* \param Task A function object which implements the operator() and operator<. operator< must be a partial ordering.
*
* \see prio_thread_func
*
*/
template <typename Task = prio_task_func>
class prio_scheduler
{
public:
typedef Task task_type; //!< Indicates the scheduler's task type.
protected:
std::priority_queue<task_type> m_container; //!< Internal task container.
public:
/*! Adds a new task to the scheduler.
* \param task The task object.
* \return true, if the task could be scheduled and false otherwise.
*/
bool push(task_type const & task)
{
m_container.push(task);
return true;
}
/*! Removes the task which should be executed next.
*/
void pop()
{
m_container.pop();
}
/*! Gets the task which should be executed next.
* \return The task object to be executed.
*/
task_type const & top() const
{
return m_container.top();
}
/*! Gets the current number of tasks in the scheduler.
* \return The number of tasks.
* \remarks Prefer empty() to size() == 0 to check if the scheduler is empty.
*/
size_t size() const
{
return m_container.size();
}
/*! Checks if the scheduler is empty.
* \return true if the scheduler contains no tasks, false otherwise.
* \remarks Is more efficient than size() == 0.
*/
bool empty() const
{
return m_container.empty();
}
/*! Removes all tasks from the scheduler.
*/
void clear()
{
while(!m_container.empty())
{
m_container.pop();
}
}
};
} } // namespace boost::threadpool
#endif // THREADPOOL_SCHEDULING_POLICIES_HPP_INCLUDED
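A minimal sketch (not from this commit; the task bodies are illustrative) of the prioritized variant: prio_pool, defined in pool.hpp above, stores prio_task_func objects, and prio_scheduler hands out the highest priority first.

#include <threadpool.hpp>
#include <iostream>

void background() { std::cout << "low priority\n"; }
void urgent()     { std::cout << "high priority\n"; }

int main()
{
    boost::threadpool::prio_pool tp(1);

    tp.schedule(boost::threadpool::prio_task_func(1,  &background));
    tp.schedule(boost::threadpool::prio_task_func(10, &urgent));

    // With a single worker, whatever is still queued when the worker becomes free
    // is executed in descending priority order.
    tp.wait();
    return 0;
}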

View File

@ -1,83 +0,0 @@
/*! \file
* \brief Shutdown policies.
*
* This file contains shutdown policies for thread_pool.
* A shutdown policy controls the pool's behavior from the time
* when the pool is not referenced any longer.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_SHUTDOWN_POLICIES_HPP_INCLUDED
#define THREADPOOL_SHUTDOWN_POLICIES_HPP_INCLUDED
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{
/*! \brief ShutdownPolicy which waits for the completion of all tasks
* and the worker termination afterwards.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class wait_for_all_tasks
{
public:
static void shutdown(Pool& pool)
{
pool.wait();
pool.terminate_all_workers(true);
}
};
/*! \brief ShutdownPolicy which waits for the completion of all active tasks
* and the worker termination afterwards.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class wait_for_active_tasks
{
public:
static void shutdown(Pool& pool)
{
pool.clear();
pool.wait();
pool.terminate_all_workers(true);
}
};
/*! \brief ShutdownPolicy which does not wait for any tasks or worker termination.
*
* This policy does not wait for any tasks. Nevertheless all active tasks will be processed completely.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class immediately
{
public:
static void shutdown(Pool& pool)
{
pool.clear();
pool.terminate_all_workers(false);
}
};
} } // namespace boost::threadpool
#endif // THREADPOOL_SHUTDOWN_POLICIES_HPP_INCLUDED
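A minimal sketch (not from this commit; the typedef name is illustrative) of selecting a different shutdown policy through the thread_pool template parameters:

#include <threadpool.hpp>

// A pool that drops pending tasks on shutdown instead of draining the queue;
// per the comment above, tasks that are already running still finish.
typedef boost::threadpool::thread_pool<
    boost::threadpool::task_func,
    boost::threadpool::fifo_scheduler,
    boost::threadpool::static_size,
    boost::threadpool::resize_controller,
    boost::threadpool::immediately
> drop_pending_pool;

int main()
{
    drop_pending_pool tp(4);
    // ... schedule work; when the last copy of tp goes away, immediately::shutdown()
    // clears the scheduler and terminates the workers without waiting for pending tasks.
    return 0;
}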

View File

@ -1,99 +0,0 @@
/*! \file
* \brief Size policies.
*
* This file contains size policies for thread_pool. A size
* policy controls the number of worker threads in the pool.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_SIZE_POLICIES_HPP_INCLUDED
#define THREADPOOL_SIZE_POLICIES_HPP_INCLUDED
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{
/*! \brief SizePolicyController which provides no functionality.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
struct empty_controller
{
empty_controller(typename Pool::size_policy_type&, shared_ptr<Pool>) {}
};
/*! \brief SizePolicyController which allows resizing.
*
* \param Pool The pool's core type.
*/
template< typename Pool >
class resize_controller
{
typedef typename Pool::size_policy_type size_policy_type;
reference_wrapper<size_policy_type> m_policy;
shared_ptr<Pool> m_pool; //!< to make sure that the pool is alive (the policy pointer is valid) as long as the controller exists
public:
resize_controller(size_policy_type& policy, shared_ptr<Pool> pool)
: m_policy(policy)
, m_pool(pool)
{
}
bool resize(size_t worker_count)
{
return m_policy.get().resize(worker_count);
}
};
/*! \brief SizePolicy which preserves the thread count.
*
* \param Pool The pool's core type.
*/
template<typename Pool>
class static_size
{
reference_wrapper<Pool volatile> m_pool;
public:
static void init(Pool& pool, size_t const worker_count)
{
pool.resize(worker_count);
}
static_size(Pool volatile & pool)
: m_pool(pool)
{}
bool resize(size_t const worker_count)
{
return m_pool.get().resize(worker_count);
}
void worker_died_unexpectedly(size_t const new_worker_count)
{
m_pool.get().resize(new_worker_count + 1);
}
// TODO these functions are not called yet
void task_scheduled() {}
void task_finished() {}
};
} } // namespace boost::threadpool
#endif // THREADPOOL_SIZE_POLICIES_HPP_INCLUDED
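A minimal sketch (not from this commit) of how these policies are reached from the public interface: the pool hands out a size controller, and resize_controller forwards resize() to the static_size policy shown above.

#include <threadpool.hpp>

int main()
{
    boost::threadpool::pool tp(2);

    // The controller also holds a shared_ptr to the pool core, keeping it alive.
    boost::threadpool::pool::size_controller_type controller = tp.size_controller();
    controller.resize(8);   // grow to eight worker threads

    tp.wait();
    return 0;
}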

View File

@ -1,176 +0,0 @@
/*! \file
* \brief Task adaptors.
*
* This file contains adaptors for task function objects.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_TASK_ADAPTERS_HPP_INCLUDED
#define THREADPOOL_TASK_ADAPTERS_HPP_INCLUDED
#include <boost/version.hpp>
#if BOOST_VERSION >= 105000
#ifndef TIME_UTC
#define TIME_UTC TIME_UTC_
#endif
#endif
#include <boost/smart_ptr.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
namespace boost { namespace threadpool
{
/*! \brief Standard task function object.
*
* This function object wraps a nullary function which returns void.
* The wrapped function is invoked by calling the operator ().
*
* \see boost function library
*
*/
typedef function0<void> task_func;
/*! \brief Prioritized task function object.
*
* This function object wraps a task_func object and binds a priority to it.
* prio_task_funcs can be compared using the operator < which realises a partial ordering.
* The wrapped task function is invoked by calling the operator ().
*
* \see prio_scheduler
*
*/
class prio_task_func
{
private:
unsigned int m_priority; //!< The priority of the task's function.
task_func m_function; //!< The task's function.
public:
typedef void result_type; //!< Indicates the functor's result type.
public:
/*! Constructor.
* \param priority The priority of the task.
* \param function The task's function object.
*/
prio_task_func(unsigned int const priority, task_func const & function)
: m_priority(priority)
, m_function(function)
{
}
/*! Executes the task function.
*/
void operator() (void) const
{
if(m_function)
{
m_function();
}
}
/*! Comparison operator which realises a partial ordering based on priorities.
* \param rhs The object to compare with.
* \return true if the priority of *this is less than right hand side's priority, false otherwise.
*/
bool operator< (const prio_task_func& rhs) const
{
return m_priority < rhs.m_priority;
}
}; // prio_task_func
/*! \brief Looped task function object.
*
* This function object wraps a boolean thread function object.
* The wrapped task function is invoked by calling the operator () and it is executed in regular
* time intervals until false is returned. The interval length may be zero.
* Please note that a pool's thread is engaged as long as the task is looped.
*
*/
class looped_task_func
{
private:
function0<bool> m_function; //!< The task's function.
unsigned int m_break_s; //!< Duration of breaks in seconds.
unsigned int m_break_ns; //!< Duration of breaks in nanoseconds.
public:
typedef void result_type; //!< Indicates the functor's result type.
public:
/*! Constructor.
* \param function The task's function object which is looped until false is returned.
* \param interval The minimum break time in milliseconds before the first execution of the task function and between the following ones.
*/
looped_task_func(function0<bool> const & function, unsigned int const interval = 0)
: m_function(function)
{
m_break_s = interval / 1000;
m_break_ns = (interval - m_break_s * 1000) * 1000 * 1000;
}
/*! Executes the task function.
*/
void operator() (void) const
{
if(m_function)
{
if(m_break_s > 0 || m_break_ns > 0)
{ // Sleep some time before first execution
xtime xt;
xtime_get(&xt, TIME_UTC);
xt.nsec += m_break_ns;
xt.sec += m_break_s;
thread::sleep(xt);
}
while(m_function())
{
if(m_break_s > 0 || m_break_ns > 0)
{
xtime xt;
xtime_get(&xt, TIME_UTC);
xt.nsec += m_break_ns;
xt.sec += m_break_s;
thread::sleep(xt);
}
else
{
thread::yield(); // Be fair to other threads
}
}
}
}
}; // looped_task_func
} } // namespace boost::threadpool
#endif // THREADPOOL_TASK_ADAPTERS_HPP_INCLUDED
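A minimal sketch (not from this commit; the polling function is illustrative) of the looped task adaptor: the worker keeps re-invoking the boolean function, sleeping between calls, until it returns false.

#include <threadpool.hpp>
#include <iostream>

bool poll_once()
{
    static int remaining = 3;
    std::cout << "polling\n";
    return --remaining > 0;   // stop after three iterations
}

int main()
{
    boost::threadpool::pool tp(1);

    // Re-invoked roughly every 500 ms until poll_once() returns false; note that
    // the worker thread stays occupied for the whole lifetime of the looped task.
    tp.schedule(boost::threadpool::looped_task_func(&poll_once, 500));

    tp.wait();
    return 0;
}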

View File

@ -12,7 +12,7 @@ endif()
if (${DISABLE_MONGODB})
add_definitions(-D DISABLE_MONGODB)
else()
set (LINK_MONGOCLIENT libmongoclient.a libssl.a libcrypto.a)
set (LINK_MONGOCLIENT libmongoclient.a libssl.a libcrypto.a libboost_thread.a)
endif()
add_library (dbms
@ -430,6 +430,7 @@ add_library (dbms
include/DB/Common/StringUtils.h
include/DB/Common/randomSeed.h
include/DB/Common/unaligned.h
include/DB/Common/ThreadPool.h
include/DB/IO/CompressedStream.h
include/DB/IO/ReadBufferFromFileDescriptor.h
include/DB/IO/CompressedWriteBuffer.h
@ -958,7 +959,6 @@ target_link_libraries(dbms
${LINK_LIBRARIES_ONLY_ON_X86_64}
re2 re2_st
libcrypto.a
libboost_thread.a
libboost_system.a
${LINK_MONGOCLIENT}
libboost_regex.a

View File

@ -0,0 +1,137 @@
#pragma once
#include <cstdint>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <vector>
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
*/
class ThreadPool
{
private:
using Job = std::function<void()>;
public:
ThreadPool(size_t m_size)
: m_size(m_size)
{
threads.reserve(m_size);
for (size_t i = 0; i < m_size; ++i)
threads.emplace_back([this] { worker(); });
}
void schedule(Job job)
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs < m_size; });
if (shutdown)
return;
jobs.push(std::move(job));
++active_jobs;
}
has_new_job_or_shutdown.notify_one();
}
void wait()
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs == 0; });
if (!exceptions.empty())
std::rethrow_exception(exceptions.front());
}
}
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(mutex);
shutdown = true;
}
has_new_job_or_shutdown.notify_all();
for (auto & thread : threads)
thread.join();
}
size_t size() const { return m_size; }
size_t active() const
{
std::unique_lock<std::mutex> lock(mutex);
return active_jobs;
}
private:
mutable std::mutex mutex;
std::condition_variable has_free_thread;
std::condition_variable has_new_job_or_shutdown;
const size_t m_size;
size_t active_jobs = 0;
bool shutdown = false;
std::queue<Job> jobs;
std::vector<std::thread> threads;
std::vector<std::exception_ptr> exceptions; /// NOTE Many exceptions may be saved, but only the first one is rethrown.
void worker()
{
while (true)
{
Job job;
{
std::unique_lock<std::mutex> lock(mutex);
has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
if (!shutdown)
{
job = std::move(jobs.front());
jobs.pop();
}
}
if (!job)
return; /// shutdown
try
{
job();
}
catch (...)
{
{
std::unique_lock<std::mutex> lock(mutex);
exceptions.push_back(std::current_exception());
shutdown = true;
--active_jobs;
}
has_free_thread.notify_one();
has_new_job_or_shutdown.notify_all();
return;
}
{
std::unique_lock<std::mutex> lock(mutex);
--active_jobs;
}
has_free_thread.notify_one();
}
}
};
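A minimal usage sketch (not part of the commit) of the replacement class, matching the call sites updated below; wait() rethrows the first exception captured from a job.

#include <DB/Common/ThreadPool.h>
#include <atomic>
#include <iostream>

int main()
{
    ThreadPool pool(4);
    std::atomic<int> sum(0);

    for (int i = 1; i <= 100; ++i)
        pool.schedule([&sum, i] { sum += i; });   // blocks while all four workers are busy

    pool.wait();   // blocks until every job has finished; rethrows the first exception, if any
    std::cout << sum << "\n";                     // 5050
    return 0;
}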

View File

@ -1,12 +1,11 @@
#pragma once
#include <threadpool.hpp>
#include <Poco/Event.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Common/ThreadPool.h>
namespace DB
@ -81,7 +80,7 @@ public:
}
protected:
boost::threadpool::pool pool{1};
ThreadPool pool{1};
Poco::Event ready;
bool started = false;
bool first = true;

View File

@ -1,9 +1,9 @@
#pragma once
#include <threadpool.hpp>
#include <DB/Interpreters/Aggregator.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/ConcurrentBoundedQueue.h>
#include <DB/Common/ThreadPool.h>
#include <condition_variable>
@ -118,13 +118,13 @@ private:
/// Get the blocks that can be merged. This allows merging them in parallel in separate threads.
BlocksToMerge getNextBlocksToMerge();
std::unique_ptr<boost::threadpool::pool> reading_pool;
std::unique_ptr<ThreadPool> reading_pool;
/// For parallel merging.
struct ParallelMergeData
{
boost::threadpool::pool pool;
ThreadPool pool;
/// Right now one of the merging threads is fetching the next blocks to merge. This operation must be done sequentially.
std::mutex get_next_blocks_mutex;
bool exhausted = false; /// There is no more data.

View File

@ -83,7 +83,7 @@ public:
String getEngineName() const override { return "Cloud"; }
void loadTables(Context & context, boost::threadpool::pool * thread_pool) override;
void loadTables(Context & context, ThreadPool * thread_pool) override;
bool isTableExist(const String & table_name) const override;
StoragePtr tryGetTable(const String & table_name) override;

View File

@ -1,6 +1,6 @@
#pragma once
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/Databases/IDatabase.h>

View File

@ -25,7 +25,7 @@ public:
String getEngineName() const override { return "Ordinary"; }
void loadTables(Context & context, boost::threadpool::pool * thread_pool) override;
void loadTables(Context & context, ThreadPool * thread_pool) override;
bool isTableExist(const String & table_name) const override;
StoragePtr tryGetTable(const String & table_name) override;

View File

@ -1,6 +1,6 @@
#pragma once
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/Core/Types.h>
#include <DB/Parsers/IAST.h>
#include <DB/Storages/IStorage.h>
@ -43,7 +43,7 @@ public:
/// Load the set of existing tables. If thread_pool is specified, use it.
/// May be called only once, immediately after the object is created.
virtual void loadTables(Context & context, boost::threadpool::pool * thread_pool) = 0;
virtual void loadTables(Context & context, ThreadPool * thread_pool) = 0;
/// Check whether the table exists.
virtual bool isTableExist(const String & name) const = 0;

View File

@ -4,10 +4,7 @@
#include <vector>
#include <Poco/Net/NetException.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/IO/WriteBuffer.h>
@ -22,7 +19,7 @@ class AsynchronousWriteBuffer : public WriteBuffer
private:
WriteBuffer & out; /// The main buffer, responsible for writing the data.
std::vector<char> memory; /// A piece of memory for duplicating the buffer.
boost::threadpool::pool pool; /// For asynchronous writing of data.
ThreadPool pool; /// For asynchronous writing of data.
bool started; /// Whether asynchronous writing has been started.
/// Swap the main and duplicate buffers.
@ -42,9 +39,6 @@ private:
else
started = true;
if (exception)
std::rethrow_exception(exception);
swapBuffers();
/// The data will be written in a separate thread.
@ -74,19 +68,10 @@ public:
}
}
std::exception_ptr exception;
/// What is executed in the separate thread
void thread()
{
try
{
out.next();
}
catch (...)
{
exception = std::current_exception();
}
out.next();
}
};

View File

@ -7,12 +7,12 @@
#include <Poco/TemporaryFile.h>
#include <common/logger_useful.h>
#include <threadpool.hpp>
#include <DB/Core/StringRef.h>
#include <DB/Common/Arena.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Common/HashTable/TwoLevelHashMap.h>
#include <DB/Common/ThreadPool.h>
#include <DB/DataStreams/IBlockInputStream.h>
@ -27,7 +27,6 @@
#include <DB/Columns/ColumnVector.h>
namespace DB
{
@ -1136,14 +1135,14 @@ protected:
BlocksList prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
BlocksList prepareBlocksAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, boost::threadpool::pool * thread_pool) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
template <typename Method>
BlocksList prepareBlocksAndFillTwoLevelImpl(
AggregatedDataVariants & data_variants,
Method & method,
bool final,
boost::threadpool::pool * thread_pool) const;
ThreadPool * thread_pool) const;
template <bool no_more_keys, typename Method, typename Table>
void mergeStreamsImplCase(

View File

@ -9,11 +9,11 @@
#include <unordered_map>
#include <common/logger_useful.h>
#include <threadpool.hpp>
#include <DB/Core/Types.h>
#include <DB/Common/Exception.h>
#include <DB/Common/UInt128.h>
#include <DB/Common/ThreadPool.h>
namespace DB
@ -99,7 +99,7 @@ private:
using Files = std::unordered_set<std::string>;
const std::string path;
boost::threadpool::pool pool;
ThreadPool pool;
/// The number of calls to the getOrCount function.
Counts counts;

View File

@ -1,11 +1,10 @@
#pragma once
#include <threadpool.hpp>
#include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Storages/ColumnDefault.h>
#include <DB/Common/ThreadPool.h>
namespace DB
@ -31,7 +30,7 @@ public:
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults);
void setDatabaseLoadingThreadpool(boost::threadpool::pool & thread_pool_)
void setDatabaseLoadingThreadpool(ThreadPool & thread_pool_)
{
thread_pool = &thread_pool_;
}
@ -59,7 +58,7 @@ private:
Context context;
/// Used when loading the database.
boost::threadpool::pool * thread_pool = nullptr;
ThreadPool * thread_pool = nullptr;
};

View File

@ -13,7 +13,7 @@
#include <Poco/Util/Application.h>
#include <DB/Common/Stopwatch.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/AggregateFunctions/ReservoirSampler.h>
#include <boost/program_options.hpp>
@ -138,7 +138,7 @@ private:
std::mutex mutex;
boost::threadpool::pool pool;
ThreadPool pool;
void readQueries()

View File

@ -44,4 +44,7 @@ add_executable (arena_with_free_lists arena_with_free_lists.cpp)
target_link_libraries (arena_with_free_lists dbms)
add_executable (pod_array pod_array.cpp)
target_link_libraries (pod_array dbms)
add_executable (thread_creation_latency thread_creation_latency.cpp)
target_link_libraries (thread_creation_latency dbms)

View File

@ -16,7 +16,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Common/Stopwatch.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
using Key = UInt64;
@ -253,7 +253,7 @@ int main(int argc, char ** argv)
std::cerr << std::fixed << std::setprecision(2);
boost::threadpool::pool pool(num_threads);
ThreadPool pool(num_threads);
Source data(n);

View File

@ -16,7 +16,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/Common/Stopwatch.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
using Key = UInt64;
@ -30,7 +30,7 @@ struct AggregateIndependent
template <typename Creator, typename Updater>
static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
Creator && creator, Updater && updater,
boost::threadpool::pool & pool)
ThreadPool & pool)
{
results.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
@ -73,7 +73,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
template <typename Creator, typename Updater>
static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
Creator && creator, Updater && updater,
boost::threadpool::pool & pool)
ThreadPool & pool)
{
results.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
@ -124,7 +124,7 @@ struct MergeSequential
template <typename Merger>
static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
Merger && merger,
boost::threadpool::pool & pool)
ThreadPool & pool)
{
for (size_t i = 1; i < num_maps; ++i)
{
@ -144,7 +144,7 @@ struct MergeSequentialTransposed /// In practice, not better than the ordinary one
template <typename Merger>
static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
Merger && merger,
boost::threadpool::pool & pool)
ThreadPool & pool)
{
std::vector<typename Map::iterator> iterators(num_maps);
for (size_t i = 1; i < num_maps; ++i)
@ -177,7 +177,7 @@ struct MergeParallelForTwoLevelTable
template <typename Merger>
static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
Merger && merger,
boost::threadpool::pool & pool)
ThreadPool & pool)
{
for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
pool.schedule([&, bucket, num_maps]
@ -202,7 +202,7 @@ struct Work
template <typename Creator, typename Updater, typename Merger>
static void NO_INLINE execute(const Source & data, size_t num_threads,
Creator && creator, Updater && updater, Merger && merger,
boost::threadpool::pool & pool)
ThreadPool & pool)
{
std::vector<std::unique_ptr<Map>> intermediate_results;
@ -292,7 +292,7 @@ int main(int argc, char ** argv)
std::cerr << std::fixed << std::setprecision(2);
boost::threadpool::pool pool(num_threads);
ThreadPool pool(num_threads);
Source data(n);

View File

@ -0,0 +1,137 @@
#include <iostream>
#include <iomanip>
#include <DB/IO/ReadHelpers.h>
//#include <DB/Common/ThreadPool.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Common/Exception.h>
#include <DB/Common/ThreadPool.h>
int x = 0;
void f() { ++x; }
/*void f()
{
std::vector<std::string> vec;
for (size_t i = 0; i < 100; ++i)
vec.push_back(std::string(rand() % 10, ' '));
}*/
void * g(void *) { f(); return {}; }
template <typename F>
void test(size_t n, const char * name, F && kernel)
{
x = 0;
Stopwatch watch;
Stopwatch watch_one;
double max_seconds = 0;
std::cerr << name << ":\n";
for (size_t i = 0; i < n; ++i)
{
watch_one.restart();
kernel();
watch_one.stop();
if (watch_one.elapsedSeconds() > max_seconds)
max_seconds = watch_one.elapsedSeconds();
}
watch.stop();
std::cerr
<< std::fixed << std::setprecision(2)
<< n << " ops in "
<< watch.elapsedSeconds() << " sec., "
<< n / watch.elapsedSeconds() << " ops/sec., "
<< "avg latency: " << watch.elapsedSeconds() / n * 1000000 << " μs, "
<< "max latency: " << max_seconds * 1000000 << " μs "
<< "(res = " << x << ")"
<< std::endl;
}
int main(int argc, char ** argv)
{
size_t n = argc == 2 ? DB::parse<UInt64>(argv[1]) : 100000;
/* test(n, "Create and destroy boost::threadpool each iteration", []
{
boost::threadpool::pool tp(1);
tp.schedule(f);
tp.wait();
});*/
test(n, "Create and destroy ThreadPool each iteration", []
{
ThreadPool tp(1);
tp.schedule(f);
tp.wait();
});
test(n, "pthread_create, pthread_join each iteration", []
{
pthread_t thread;
if (pthread_create(&thread, nullptr, g, nullptr))
DB::throwFromErrno("Cannot create thread.");
if (pthread_join(thread, nullptr))
DB::throwFromErrno("Cannot join thread.");
});
test(n, "Create and destroy std::thread each iteration", []
{
std::thread thread(f);
thread.join();
});
/* {
boost::threadpool::pool tp(1);
test(n, "Schedule job for boost::threadpool each iteration", [&tp]
{
tp.schedule(f);
tp.wait();
});
}*/
{
ThreadPool tp(1);
test(n, "Schedule job for Threadpool each iteration", [&tp]
{
tp.schedule(f);
tp.wait();
});
}
/* {
boost::threadpool::pool tp(128);
test(n, "Schedule job for boost::threadpool with 128 threads each iteration", [&tp]
{
tp.schedule(f);
tp.wait();
});
}*/
{
ThreadPool tp(128);
test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
{
tp.schedule(f);
tp.wait();
});
}
return 0;
}

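The benchmark above exercises only three operations of the replacement pool: construction with a thread count, schedule(), and wait(). DB/Common/ThreadPool.h itself is not part of this diff, so the following is only a minimal sketch of a pool with that interface under those assumptions, not the actual implementation; jobs are assumed not to throw here, whereas the real pool presumably also deals with exceptions.

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class SimpleThreadPool
{
public:
    using Job = std::function<void()>;

    explicit SimpleThreadPool(size_t num_threads)
    {
        for (size_t i = 0; i < num_threads; ++i)
            workers.emplace_back([this] { work(); });
    }

    /// Enqueue a job; it will run on one of the worker threads.
    void schedule(Job job)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            jobs.push(std::move(job));
            ++active;
        }
        has_job.notify_one();
    }

    /// Block until every job scheduled so far has finished.
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex);
        job_done.wait(lock, [this] { return active == 0; });
    }

    ~SimpleThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            shutdown = true;
        }
        has_job.notify_all();
        for (auto & worker : workers)
            worker.join();
    }

private:
    void work()
    {
        while (true)
        {
            Job job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                has_job.wait(lock, [this] { return shutdown || !jobs.empty(); });
                if (shutdown && jobs.empty())
                    return;
                job = std::move(jobs.front());
                jobs.pop();
            }
            job();   /// In this sketch an escaping exception would terminate the process.
            {
                std::lock_guard<std::mutex> lock(mutex);
                --active;
            }
            job_done.notify_all();
        }
    }

    std::vector<std::thread> workers;
    std::queue<Job> jobs;
    std::mutex mutex;
    std::condition_variable has_job;
    std::condition_variable job_done;
    size_t active = 0;
    bool shutdown = false;
};

Used the same way as in the test above: SimpleThreadPool tp(1); tp.schedule(f); tp.wait();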
View File

@ -98,29 +98,24 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
}
else
{
reading_pool.reset(new boost::threadpool::pool(reading_threads));
reading_pool = std::make_unique<ThreadPool>(reading_threads);
size_t num_children = children.size();
std::vector<std::packaged_task<void()>> tasks(num_children);
for (size_t i = 0; i < num_children; ++i)
{
auto & child = children[i];
auto & task = tasks[i];
auto memory_tracker = current_memory_tracker;
task = std::packaged_task<void()>([&child, memory_tracker]
reading_pool->schedule([&child, memory_tracker]
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
reading_pool->schedule([&task] { task(); });
}
reading_pool->wait();
for (auto & task : tasks)
task.get_future().get();
}
if (merging_threads > 1)
@ -387,30 +382,22 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
}
else
{
size_t num_inputs = inputs.size();
std::vector<std::packaged_task<void()>> tasks;
tasks.reserve(num_inputs);
for (auto & input : inputs)
{
if (need_that_input(input))
{
auto memory_tracker = current_memory_tracker;
tasks.emplace_back([&input, &read_from_input, memory_tracker]
reading_pool->schedule([&input, &read_from_input, memory_tracker]
{
current_memory_tracker = memory_tracker;
setThreadName("MergeAggReadThr");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
auto & task = tasks.back();
reading_pool->schedule([&task] { task(); });
}
}
reading_pool->wait();
for (auto & task : tasks)
task.get_future().get();
}
while (true)

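In the removed lines above, each reading job was wrapped in a std::packaged_task and the futures were drained after reading_pool->wait(), so an exception thrown on a reading thread resurfaced in the calling thread. The new code schedules the lambda directly, which is equivalent only if ThreadPool::wait() rethrows worker exceptions itself; that is an assumption about DB/Common/ThreadPool.h, not something visible in this diff. A self-contained illustration of the mechanism being dropped, with plain std::thread standing in for the pool:

#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>

int main()
{
    std::vector<std::packaged_task<void()>> tasks;
    tasks.emplace_back([] { /* first child: succeeds */ });
    tasks.emplace_back([] { throw std::runtime_error("readPrefix failed"); });

    std::vector<std::thread> threads;
    for (auto & task : tasks)
        threads.emplace_back([&task] { task(); });   /// any exception is stored in the shared state
    for (auto & thread : threads)
        thread.join();

    try
    {
        for (auto & task : tasks)
            task.get_future().get();                 /// rethrows the stored exception here
    }
    catch (const std::exception & e)
    {
        std::cerr << "caught in the calling thread: " << e.what() << "\n";
    }
    return 0;
}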
View File

@ -1,7 +1,7 @@
#include <iostream>
#include <iomanip>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
@ -79,7 +79,7 @@ try
std::mutex mutex;
boost::threadpool::pool pool(inputs.size() + forks.size());
ThreadPool pool(inputs.size() + forks.size());
pool.schedule(std::bind(inputThread, inputs[0], out1, std::ref(wb), std::ref(mutex)));
pool.schedule(std::bind(inputThread, inputs[1], out2, std::ref(wb), std::ref(mutex)));

View File

@ -94,7 +94,7 @@ DatabaseCloud::DatabaseCloud(
}
void loadTables(Context & context, boost::threadpool::pool * thread_pool)
void loadTables(Context & context, ThreadPool * thread_pool)
{
/// Do nothing - all tables are loaded lazily.
}

View File

@ -89,7 +89,7 @@ DatabaseOrdinary::DatabaseOrdinary(
}
void DatabaseOrdinary::loadTables(Context & context, boost::threadpool::pool * thread_pool)
void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool)
{
log = &Logger::get("DatabaseOrdinary (" + name + ")");
@ -161,7 +161,6 @@ void DatabaseOrdinary::loadTables(Context & context, boost::threadpool::pool * t
const size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE;
size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size;
std::vector<std::packaged_task<void()>> tasks(num_bunches);
for (size_t i = 0; i < num_bunches; ++i)
{
@ -170,19 +169,16 @@ void DatabaseOrdinary::loadTables(Context & context, boost::threadpool::pool * t
? file_names.end()
: (file_names.begin() + (i + 1) * bunch_size);
tasks[i] = std::packaged_task<void()>(std::bind(task_function, begin, end));
auto task = std::bind(task_function, begin, end);
if (thread_pool)
thread_pool->schedule([i, &tasks]{ tasks[i](); });
thread_pool->schedule(task);
else
tasks[i]();
task();
}
if (thread_pool)
thread_pool->wait();
for (auto & task : tasks)
task.get_future().get();
}

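This hunk (like the analogous one in Aggregator::mergeStream below) replaces the per-task std::packaged_task bookkeeping with a plain bound callable that is either handed to the thread pool or, when no pool was created, executed inline. A generic sketch of that optional-pool shape; Pool is a stand-in for ThreadPool and process_bunch is a hypothetical work function, both introduced only for illustration:

#include <cstddef>
#include <functional>

/// Pool only needs schedule() and wait(); pass nullptr to run everything in the calling thread.
template <typename Pool>
void runBunches(std::size_t num_bunches, Pool * thread_pool,
                const std::function<void(std::size_t)> & process_bunch)
{
    for (std::size_t i = 0; i < num_bunches; ++i)
    {
        auto task = std::bind(process_bunch, i);

        if (thread_pool)
            thread_pool->schedule(task);   /// the callable is copied into the pool's queue
        else
            task();                        /// single-threaded fallback
    }

    if (thread_pool)
        thread_pool->wait();
}

As in the diff, nothing rethrows per-task exceptions any more; the caller depends on the pool (or the inline call) surfacing them.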
View File

@ -1176,7 +1176,7 @@ BlocksList Aggregator::prepareBlocksAndFillSingleLevel(AggregatedDataVariants &
}
BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, boost::threadpool::pool * thread_pool) const
BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const
{
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
@ -1195,7 +1195,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
AggregatedDataVariants & data_variants,
Method & method,
bool final,
boost::threadpool::pool * thread_pool) const
ThreadPool * thread_pool) const
{
auto converter = [&](size_t bucket, MemoryTracker * memory_tracker)
{
@ -1263,10 +1263,10 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
if (data_variants.empty())
return blocks;
std::unique_ptr<boost::threadpool::pool> thread_pool;
std::unique_ptr<ThreadPool> thread_pool;
if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000 /// TODO Make the threshold configurable.
&& data_variants.isTwoLevel()) /// TODO Use a common thread pool with the merge function.
thread_pool.reset(new boost::threadpool::pool(max_threads));
thread_pool.reset(new ThreadPool(max_threads));
if (isCancelled())
return BlocksList();
@ -1602,7 +1602,7 @@ private:
struct ParallelMergeData
{
boost::threadpool::pool pool;
ThreadPool pool;
std::map<Int32, Block> ready_blocks;
std::exception_ptr exception;
std::mutex mutex;
@ -1971,14 +1971,10 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
}
};
/// packaged_task is used so that exceptions are automatically propagated to the main thread.
std::vector<std::packaged_task<void()>> tasks(max_bucket + 1);
std::unique_ptr<boost::threadpool::pool> thread_pool;
std::unique_ptr<ThreadPool> thread_pool;
if (max_threads > 1 && total_input_rows > 100000 /// TODO Make the threshold configurable.
&& has_two_level)
thread_pool.reset(new boost::threadpool::pool(max_threads));
thread_pool.reset(new ThreadPool(max_threads));
for (const auto & bucket_blocks : bucket_to_blocks)
{
@ -1990,21 +1986,17 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
result.aggregates_pools.push_back(std::make_shared<Arena>());
Arena * aggregates_pool = result.aggregates_pools.back().get();
tasks[bucket] = std::packaged_task<void()>(std::bind(merge_bucket, bucket, aggregates_pool, current_memory_tracker));
auto task = std::bind(merge_bucket, bucket, aggregates_pool, current_memory_tracker);
if (thread_pool)
thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
thread_pool->schedule(task);
else
tasks[bucket]();
task();
}
if (thread_pool)
thread_pool->wait();
for (auto & task : tasks)
if (task.valid())
task.get_future().get();
LOG_TRACE(log, "Merged partially aggregated two-level data.");
}

View File

@ -2,7 +2,7 @@
#include <thread>
#include <future>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/FileStream.h>
@ -30,7 +30,7 @@ static void executeCreateQuery(
Context & context,
const String & database,
const String & file_name,
boost::threadpool::pool & pool)
ThreadPool & pool)
{
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "in file " + file_name);
@ -50,7 +50,7 @@ void loadMetadata(Context & context)
String path = context.getPath() + "metadata";
/// Used for loading tables in parallel.
boost::threadpool::pool thread_pool(SettingMaxThreads().getAutoValue());
ThreadPool thread_pool(SettingMaxThreads().getAutoValue());
/// Loop over databases
Poco::DirectoryIterator dir_end;

View File

@ -22,7 +22,7 @@
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/Cluster.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <zkutil/ZooKeeper.h>
@ -954,7 +954,7 @@ void ReshardingWorker::publishShardedPartitions()
size_t remote_count = task_info_list.size() - local_count;
boost::threadpool::pool pool(remote_count);
ThreadPool pool(remote_count);
using Tasks = std::vector<std::packaged_task<bool()> >;
Tasks tasks(remote_count);
@ -1109,7 +1109,7 @@ void ReshardingWorker::commit()
/// Execute all the remaining log records.
size_t pool_size = operation_count;
boost::threadpool::pool pool(pool_size);
ThreadPool pool(pool_size);
using Tasks = std::vector<std::packaged_task<void()> >;
Tasks tasks(pool_size);
@ -1294,7 +1294,7 @@ bool ReshardingWorker::checkAttachLogRecord(LogRecord & log_record)
}
}
boost::threadpool::pool pool(task_info_list.size());
ThreadPool pool(task_info_list.size());
using Tasks = std::vector<std::packaged_task<RemotePartChecker::Status()> >;
Tasks tasks(task_info_list.size());

View File

@ -36,7 +36,7 @@
#include <Poco/DirectoryIterator.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <ext/range.hpp>
#include <cfenv>
@ -3602,7 +3602,7 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
}
}
boost::threadpool::pool pool(task_info_list.size());
ThreadPool pool(task_info_list.size());
using Tasks = std::vector<std::packaged_task<size_t()> >;
Tasks tasks(task_info_list.size());

View File

@ -131,22 +131,9 @@ BlockInputStreams StorageSystemDictionaries::read(
{
std::rethrow_exception(dict_info.second.exception);
}
catch (const Exception & e)
{
col_last_exception.column->insert("DB::Exception. Code " + toString(e.code()) + ". " +
std::string{e.displayText()});
}
catch (const Poco::Exception & e)
{
col_last_exception.column->insert("Poco::Exception. " + std::string{e.displayText()});
}
catch (const std::exception & e)
{
col_last_exception.column->insert("std::exception. " + std::string{e.what()});
}
catch (...)
{
col_last_exception.column->insert(std::string{"<unknown exception type>"});
col_last_exception.column->insert(getCurrentExceptionMessage(false));
}
}
else

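The four type-specific catch blocks removed above all did the same thing, turning the in-flight exception into a string for col_last_exception, which is why a single catch (...) calling getCurrentExceptionMessage(false) can replace them. The helper below is only an illustrative stand-in for that function (a simplified, assumed version that handles fewer types than ClickHouse does), showing the rethrow-and-dispatch trick such a helper can be built on:

#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

std::string currentExceptionMessage()
{
    try
    {
        throw;   /// rethrow the exception that is currently being handled
    }
    catch (const std::exception & e)
    {
        return std::string("std::exception. ") + e.what();
    }
    catch (...)
    {
        return "<unknown exception type>";
    }
}

int main()
{
    try
    {
        throw std::runtime_error("external dictionary failed to load");
    }
    catch (...)
    {
        std::cout << currentExceptionMessage() << "\n";   /// one handler covers every exception type
    }
    return 0;
}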
View File

@ -11,7 +11,7 @@ target_link_libraries (date_lut_init common libicui18n.a libicuuc.a libicudata.a
target_link_libraries (date_lut2 common libicui18n.a libicuuc.a libicudata.a dl)
target_link_libraries (date_lut3 common libicui18n.a libicuuc.a libicudata.a dl)
target_link_libraries (date_lut4 common libicui18n.a libicuuc.a libicudata.a dl)
target_link_libraries (multi_version common libboost_thread.a libboost_system.a rt)
target_link_libraries (json_test dbms libboost_thread.a libboost_system.a rt)
target_link_libraries (multi_version dbms libboost_system.a rt)
target_link_libraries (json_test dbms libboost_system.a rt)
add_check (json_test)

View File

@ -1,6 +1,6 @@
#include <string.h>
#include <iostream>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <functional>
#include <common/MultiVersion.h>
#include <Poco/Exception.h>
@ -34,7 +34,7 @@ int main(int argc, char ** argv)
MV x(std::make_shared<T>(s1));
Results results(n);
boost::threadpool::pool tp(8);
ThreadPool tp(8);
for (size_t i = 0; i < n; ++i)
{
tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));

View File

@ -13,7 +13,7 @@
#include <DB/Common/Exception.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/Common/Stopwatch.h>
#include <stdlib.h>
@ -54,56 +54,49 @@ struct AlignedBuffer
}
};
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t count, std::exception_ptr & exception)
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t count)
{
try
{
AlignedBuffer direct_buf(block_size);
std::vector<char> simple_buf(block_size);
AlignedBuffer direct_buf(block_size);
std::vector<char> simple_buf(block_size);
char * buf;
if ((mode & MODE_DIRECT))
buf = direct_buf.data;
char * buf;
if ((mode & MODE_DIRECT))
buf = direct_buf.data;
else
buf = &simple_buf[0];
drand48_data rand_data;
timespec times;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
srand48_r(times.tv_nsec, &rand_data);
for (size_t i = 0; i < count; ++i)
{
long rand_result1 = 0;
long rand_result2 = 0;
long rand_result3 = 0;
lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3);
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
size_t offset;
if ((mode & MODE_DIRECT) || (mode & MODE_ALIGNED))
offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
else
buf = &simple_buf[0];
offset = min_offset + rand_result % (max_offset - min_offset - block_size + 1);
drand48_data rand_data;
timespec times;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
srand48_r(times.tv_nsec, &rand_data);
for (size_t i = 0; i < count; ++i)
if (mode & MODE_READ)
{
long rand_result1 = 0;
long rand_result2 = 0;
long rand_result3 = 0;
lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3);
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
size_t offset;
if ((mode & MODE_DIRECT) || (mode & MODE_ALIGNED))
offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
else
offset = min_offset + rand_result % (max_offset - min_offset - block_size + 1);
if (mode & MODE_READ)
{
if (static_cast<int>(block_size) != pread(fd, buf, block_size, offset))
throwFromErrno("Cannot read");
}
else
{
if (static_cast<int>(block_size) != pwrite(fd, buf, block_size, offset))
throwFromErrno("Cannot write");
}
if (static_cast<int>(block_size) != pread(fd, buf, block_size, offset))
throwFromErrno("Cannot read");
}
else
{
if (static_cast<int>(block_size) != pwrite(fd, buf, block_size, offset))
throwFromErrno("Cannot write");
}
}
catch (...)
{
exception = std::current_exception();
}
}
@ -157,27 +150,20 @@ int mainImpl(int argc, char ** argv)
}
}
boost::threadpool::pool pool(threads);
ThreadPool pool(threads);
int fd = open(file_name, ((mode & MODE_READ) ? O_RDONLY : O_WRONLY) | ((mode & MODE_DIRECT) ? O_DIRECT : 0) | ((mode & MODE_SYNC) ? O_SYNC : 0));
if (-1 == fd)
throwFromErrno("Cannot open file");
using Exceptions = std::vector<std::exception_ptr>;
Exceptions exceptions(threads);
Stopwatch watch;
for (size_t i = 0; i < threads; ++i)
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count, std::ref(exceptions[i])));
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count));
pool.wait();
fsync(fd);
for (size_t i = 0; i < threads; ++i)
if (exceptions[i])
std::rethrow_exception(exceptions[i]);
watch.stop();
if (0 != close(fd))

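The removed lines here implemented exception transport by hand: every worker received its own std::exception_ptr slot, and after pool.wait() the main thread rethrew whatever was stored. The commit deletes that plumbing, presumably because ThreadPool::wait() now propagates worker exceptions on its own; since the pool header is not shown in this diff, that is an assumption. The dropped pattern in isolation, with std::thread in place of the pool and a placeholder for the pread/pwrite work:

#include <cstddef>
#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>

void worker(std::size_t id, std::exception_ptr & exception)
{
    try
    {
        if (id == 2)
            throw std::runtime_error("Cannot read");   /// stand-in for a failed pread/pwrite
    }
    catch (...)
    {
        exception = std::current_exception();          /// remember the error for the main thread
    }
}

int main()
{
    const std::size_t threads_count = 4;
    std::vector<std::exception_ptr> exceptions(threads_count);
    std::vector<std::thread> threads;

    for (std::size_t i = 0; i < threads_count; ++i)
        threads.emplace_back(worker, i, std::ref(exceptions[i]));
    for (auto & thread : threads)
        thread.join();

    try
    {
        for (const auto & exception : exceptions)
            if (exception)
                std::rethrow_exception(exception);     /// the step this commit removes
    }
    catch (const std::exception & e)
    {
        std::cerr << "rethrown in main: " << e.what() << "\n";
    }
    return 0;
}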
View File

@ -13,7 +13,7 @@
#include <DB/Common/Exception.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/Common/Stopwatch.h>
#include <stdlib.h>
@ -111,102 +111,95 @@ struct AioContext
};
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count, std::exception_ptr & exception)
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count)
{
try
{
AioContext ctx;
AioContext ctx;
std::vector<AlignedBuffer> buffers(buffers_count);
std::vector<AlignedBuffer> buffers(buffers_count);
for (size_t i = 0; i < buffers_count; ++i)
{
buffers[i].init(block_size);
}
drand48_data rand_data;
timespec times;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
srand48_r(times.tv_nsec, &rand_data);
size_t in_progress = 0;
size_t blocks_sent = 0;
std::vector<bool> buffer_used(buffers_count, false);
std::vector<iocb> iocbs(buffers_count);
std::vector<iocb*> query_cbs;
std::vector<io_event> events(buffers_count);
while (blocks_sent < count || in_progress > 0)
{
/// Compose the requests.
query_cbs.clear();
for (size_t i = 0; i < buffers_count; ++i)
{
buffers[i].init(block_size);
if (blocks_sent >= count || in_progress >= buffers_count)
break;
if (buffer_used[i])
continue;
buffer_used[i] = true;
++blocks_sent;
++in_progress;
char * buf = buffers[i].data;
long rand_result1 = 0;
long rand_result2 = 0;
long rand_result3 = 0;
lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3);
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
iocb & cb = iocbs[i];
memset(&cb, 0, sizeof(cb));
cb.aio_buf = reinterpret_cast<uint64_t>(buf);
cb.aio_fildes = fd;
cb.aio_nbytes = block_size;
cb.aio_offset = offset;
cb.aio_data = static_cast<uint64_t>(i);
if (mode == MODE_READ)
{
cb.aio_lio_opcode = IOCB_CMD_PREAD;
}
else
{
cb.aio_lio_opcode = IOCB_CMD_PWRITE;
}
query_cbs.push_back(&cb);
}
drand48_data rand_data;
timespec times;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
srand48_r(times.tv_nsec, &rand_data);
/// Send the requests.
if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0)
throwFromErrno("io_submit failed");
size_t in_progress = 0;
size_t blocks_sent = 0;
std::vector<bool> buffer_used(buffers_count, false);
std::vector<iocb> iocbs(buffers_count);
std::vector<iocb*> query_cbs;
std::vector<io_event> events(buffers_count);
/// Receive the responses. If there is still something left to send, get at least one response (then go back to sending); otherwise wait for all responses.
memset(&events[0], 0, buffers_count * sizeof(events[0]));
int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr);
if (evs < 0)
throwFromErrno("io_getevents failed");
while (blocks_sent < count || in_progress > 0)
for (int i = 0; i < evs; ++i)
{
/// Compose the requests.
query_cbs.clear();
for (size_t i = 0; i < buffers_count; ++i)
{
if (blocks_sent >= count || in_progress >= buffers_count)
break;
if (buffer_used[i])
continue;
buffer_used[i] = true;
++blocks_sent;
++in_progress;
char * buf = buffers[i].data;
long rand_result1 = 0;
long rand_result2 = 0;
long rand_result3 = 0;
lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3);
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
iocb & cb = iocbs[i];
memset(&cb, 0, sizeof(cb));
cb.aio_buf = reinterpret_cast<uint64_t>(buf);
cb.aio_fildes = fd;
cb.aio_nbytes = block_size;
cb.aio_offset = offset;
cb.aio_data = static_cast<uint64_t>(i);
if (mode == MODE_READ)
{
cb.aio_lio_opcode = IOCB_CMD_PREAD;
}
else
{
cb.aio_lio_opcode = IOCB_CMD_PWRITE;
}
query_cbs.push_back(&cb);
}
/// Send the requests.
if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0)
throwFromErrno("io_submit failed");
/// Receive the responses. If there is still something left to send, get at least one response (then go back to sending); otherwise wait for all responses.
memset(&events[0], 0, buffers_count * sizeof(events[0]));
int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr);
if (evs < 0)
throwFromErrno("io_getevents failed");
for (int i = 0; i < evs; ++i)
{
int b = static_cast<int>(events[i].data);
if (events[i].res != static_cast<int>(block_size))
throw Poco::Exception("read/write error");
--in_progress;
buffer_used[b] = false;
}
int b = static_cast<int>(events[i].data);
if (events[i].res != static_cast<int>(block_size))
throw Poco::Exception("read/write error");
--in_progress;
buffer_used[b] = false;
}
}
catch (...)
{
exception = std::current_exception();
}
}
@ -243,21 +236,16 @@ int mainImpl(int argc, char ** argv)
using Exceptions = std::vector<std::exception_ptr>;
boost::threadpool::pool pool(threads_count);
Exceptions exceptions(threads_count);
ThreadPool pool(threads_count);
Stopwatch watch;
for (size_t i = 0; i < threads_count; ++i)
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, std::ref(exceptions[i])));
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
pool.wait();
watch.stop();
for (size_t i = 0; i < threads_count; ++i)
if (exceptions[i])
std::rethrow_exception(exceptions[i]);
if (0 != close(fd))
throwFromErrno("Cannot close file");

View File

@ -16,7 +16,7 @@
#include <DB/Common/Exception.h>
#include <threadpool.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/Common/Stopwatch.h>