2019-01-14 10:59:58 +00:00
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include <cstdint>
|
|
|
|
#include <thread>
|
|
|
|
#include <mutex>
|
|
|
|
#include <condition_variable>
|
|
|
|
#include <functional>
|
|
|
|
#include <queue>
|
2019-01-14 19:22:09 +00:00
|
|
|
#include <list>
|
|
|
|
#include <optional>
|
2019-01-14 10:59:58 +00:00
|
|
|
|
2019-03-06 17:54:20 +00:00
|
|
|
#include <Poco/Event.h>
|
2019-01-14 19:22:09 +00:00
|
|
|
#include <Common/ThreadStatus.h>
|
|
|
|
|
2019-01-14 10:59:58 +00:00
|
|
|
|
|
|
|
/** Very simple thread pool similar to boost::threadpool.
|
|
|
|
* Advantages:
|
|
|
|
* - catches exceptions and rethrows on wait.
|
2019-01-14 19:22:09 +00:00
|
|
|
*
|
|
|
|
* This thread pool can be used as a task queue.
|
|
|
|
* For example, you can create a thread pool with 10 threads (and queue of size 10) and schedule 1000 tasks
|
|
|
|
* - in this case you will be blocked to keep 10 tasks in fly.
|
|
|
|
*
|
|
|
|
* Thread: std::thread or something with identical interface.
|
2019-01-14 10:59:58 +00:00
|
|
|
*/
|
|
|
|
template <typename Thread>
|
|
|
|
class ThreadPoolImpl
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
using Job = std::function<void()>;
|
|
|
|
|
2020-07-16 23:12:47 +00:00
|
|
|
/// Maximum number of threads is based on the number of physical cores.
|
|
|
|
ThreadPoolImpl();
|
|
|
|
|
2019-01-14 10:59:58 +00:00
|
|
|
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
|
2019-08-03 11:02:40 +00:00
|
|
|
explicit ThreadPoolImpl(size_t max_threads_);
|
2019-01-14 10:59:58 +00:00
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
|
Do not shutdown global thread pool on exception
Otherwise GlobalThreadPool can be terminated (for example due to an
exception from the ParallelInputsHandler::onFinish/onFinishThread, from
ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread,
since writeToTemporaryFile() can definitelly throw) and the server will
not accept new connections (or/and execute queries) anymore.
Here is possible stacktrace (it is a bit inaccurate, due to
optimizations I guess, and it had been obtained with the
DB::tryLogCurrentException() in the catch block of the
ThreadPoolImpl::worker()):
2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below):
1. Common/Exception.cpp:35: DB::Exception::Exception(...)
...
6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0)
7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...)
8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...)
9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...)
10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...)
Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732
(Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
|
|
|
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_, bool shutdown_on_exception_ = true);
|
2019-01-14 10:59:58 +00:00
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
|
2019-10-17 14:41:27 +00:00
|
|
|
/// If any thread was throw an exception, first exception will be rethrown from this method,
|
|
|
|
/// and exception will be cleared.
|
|
|
|
/// Also throws an exception if cannot create thread.
|
2019-01-14 10:59:58 +00:00
|
|
|
/// Priority: greater is higher.
|
2019-10-17 14:41:27 +00:00
|
|
|
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
|
|
|
|
/// located on stack of current thread, the stack must not be unwinded until all jobs finished. However,
|
|
|
|
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
|
|
|
|
void scheduleOrThrowOnError(Job job, int priority = 0);
|
2019-01-14 10:59:58 +00:00
|
|
|
|
2019-10-17 14:41:27 +00:00
|
|
|
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
|
|
|
|
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;
|
2019-01-14 19:22:09 +00:00
|
|
|
|
2019-10-17 14:41:27 +00:00
|
|
|
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
|
2019-01-14 19:22:09 +00:00
|
|
|
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
|
|
|
|
|
2019-01-14 10:59:58 +00:00
|
|
|
/// Wait for all currently active jobs to be done.
|
2019-10-17 14:41:27 +00:00
|
|
|
/// You may call schedule and wait many times in arbitrary order.
|
2019-01-14 10:59:58 +00:00
|
|
|
/// If any thread was throw an exception, first exception will be rethrown from this method,
|
|
|
|
/// and exception will be cleared.
|
|
|
|
void wait();
|
|
|
|
|
|
|
|
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
|
|
|
|
/// You should not destroy object while calling schedule or wait methods from another threads.
|
|
|
|
~ThreadPoolImpl();
|
|
|
|
|
|
|
|
/// Returns number of running and scheduled jobs.
|
|
|
|
size_t active() const;
|
|
|
|
|
2019-08-02 17:14:04 +00:00
|
|
|
void setMaxThreads(size_t value);
|
|
|
|
void setMaxFreeThreads(size_t value);
|
|
|
|
void setQueueSize(size_t value);
|
|
|
|
|
2019-01-14 10:59:58 +00:00
|
|
|
private:
|
|
|
|
mutable std::mutex mutex;
|
|
|
|
std::condition_variable job_finished;
|
|
|
|
std::condition_variable new_job_or_shutdown;
|
|
|
|
|
2019-08-02 17:14:04 +00:00
|
|
|
size_t max_threads;
|
|
|
|
size_t max_free_threads;
|
|
|
|
size_t queue_size;
|
2019-01-14 10:59:58 +00:00
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
size_t scheduled_jobs = 0;
|
2019-01-14 10:59:58 +00:00
|
|
|
bool shutdown = false;
|
Do not shutdown global thread pool on exception
Otherwise GlobalThreadPool can be terminated (for example due to an
exception from the ParallelInputsHandler::onFinish/onFinishThread, from
ParallelAggregatingBlockInputStream::Handler::onFinish/onFinishThread,
since writeToTemporaryFile() can definitelly throw) and the server will
not accept new connections (or/and execute queries) anymore.
Here is possible stacktrace (it is a bit inaccurate, due to
optimizations I guess, and it had been obtained with the
DB::tryLogCurrentException() in the catch block of the
ThreadPoolImpl::worker()):
2020.02.16 22:30:40.415246 [ 45909 ] {} <Error> ThreadPool: Unhandled exception in the ThreadPool(10000,1000,10000) the loop will be shutted down: Code: 241, e.displayText() = DB::Exception: Memory limit (total) exceeded: would use 279.40 GiB (attempt to allocate chunk of 4205536 bytes), maximum: 279.40 GiB, Stack trace (when copying this message, always include the lines below):
1. Common/Exception.cpp:35: DB::Exception::Exception(...)
...
6. Common/Allocator.h:102: void DB::PODArrayBase<8ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::reserve<>(unsigned long) (.part.0)
7. Interpreters/Aggregator.cpp:1040: void DB::Aggregator::writeToTemporaryFileImpl<...>(...)
8. Interpreters/Aggregator.cpp:719: DB::Aggregator::writeToTemporaryFile(...)
9. include/memory:4206: DB::Aggregator::writeToTemporaryFile(...)
10. DataStreams/ParallelInputsProcessor.h:223: DB::ParallelInputsProcessor<DB::ParallelAggregatingBlockInputStream::Handler>::thread(...)
Refs: https://github.com/ClickHouse/ClickHouse/issues/6833#issuecomment-579221732
(Reference to particular comment, since I'm not sure about the initial issue)
2020-02-17 20:15:29 +00:00
|
|
|
const bool shutdown_on_exception = true;
|
2019-01-14 10:59:58 +00:00
|
|
|
|
|
|
|
struct JobWithPriority
|
|
|
|
{
|
|
|
|
Job job;
|
|
|
|
int priority;
|
|
|
|
|
2019-08-03 11:02:40 +00:00
|
|
|
JobWithPriority(Job job_, int priority_)
|
|
|
|
: job(job_), priority(priority_) {}
|
2019-01-14 10:59:58 +00:00
|
|
|
|
|
|
|
bool operator< (const JobWithPriority & rhs) const
|
|
|
|
{
|
|
|
|
return priority < rhs.priority;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
std::priority_queue<JobWithPriority> jobs;
|
2019-01-14 19:22:09 +00:00
|
|
|
std::list<Thread> threads;
|
2019-01-14 10:59:58 +00:00
|
|
|
std::exception_ptr first_exception;
|
|
|
|
|
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
template <typename ReturnType>
|
|
|
|
ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds);
|
|
|
|
|
|
|
|
void worker(typename std::list<Thread>::iterator thread_it);
|
2019-01-14 10:59:58 +00:00
|
|
|
|
|
|
|
void finalize();
|
|
|
|
};
|
|
|
|
|
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
/// ThreadPool with std::thread for threads.
|
2019-01-14 10:59:58 +00:00
|
|
|
using FreeThreadPool = ThreadPoolImpl<std::thread>;
|
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
|
|
|
|
/** Global ThreadPool that can be used as a singleton.
|
|
|
|
* Why it is needed?
|
|
|
|
*
|
|
|
|
* Linux can create and destroy about 100 000 threads per second (quite good).
|
|
|
|
* With simple ThreadPool (based on mutex and condvar) you can assign about 200 000 tasks per second
|
|
|
|
* - not much difference comparing to not using a thread pool at all.
|
|
|
|
*
|
|
|
|
* But if you reuse OS threads instead of creating and destroying them, several benefits exist:
|
|
|
|
* - allocator performance will usually be better due to reuse of thread local caches, especially for jemalloc:
|
|
|
|
* https://github.com/jemalloc/jemalloc/issues/1347
|
|
|
|
* - address sanitizer and thread sanitizer will not fail due to global limit on number of created threads.
|
|
|
|
* - program will work faster in gdb;
|
|
|
|
*/
|
2019-08-22 03:24:05 +00:00
|
|
|
class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
|
2019-01-14 10:59:58 +00:00
|
|
|
{
|
2020-06-22 19:04:12 +00:00
|
|
|
static std::unique_ptr<GlobalThreadPool> the_instance;
|
|
|
|
|
|
|
|
GlobalThreadPool(size_t max_threads_, size_t max_free_threads_,
|
|
|
|
size_t queue_size_, const bool shutdown_on_exception_)
|
|
|
|
: FreeThreadPool(max_threads_, max_free_threads_, queue_size_,
|
|
|
|
shutdown_on_exception_)
|
|
|
|
{}
|
|
|
|
|
2019-01-14 10:59:58 +00:00
|
|
|
public:
|
2020-06-22 19:04:12 +00:00
|
|
|
static void initialize(size_t max_threads = 10000);
|
2019-08-22 03:24:05 +00:00
|
|
|
static GlobalThreadPool & instance();
|
2019-01-14 10:59:58 +00:00
|
|
|
};
|
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
|
|
|
|
/** Looks like std::thread but allocates threads in GlobalThreadPool.
|
|
|
|
* Also holds ThreadStatus for ClickHouse.
|
|
|
|
*/
|
2019-01-14 10:59:58 +00:00
|
|
|
class ThreadFromGlobalPool
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
ThreadFromGlobalPool() {}
|
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
template <typename Function, typename... Args>
|
|
|
|
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
|
2019-03-06 17:54:20 +00:00
|
|
|
: state(std::make_shared<Poco::Event>())
|
2019-01-14 10:59:58 +00:00
|
|
|
{
|
2019-10-17 14:41:27 +00:00
|
|
|
/// NOTE: If this will throw an exception, the destructor won't be called.
|
2019-01-14 19:22:09 +00:00
|
|
|
GlobalThreadPool::instance().scheduleOrThrow([
|
2019-03-06 16:46:05 +00:00
|
|
|
state = state,
|
2019-01-14 19:22:09 +00:00
|
|
|
func = std::forward<Function>(func),
|
|
|
|
args = std::make_tuple(std::forward<Args>(args)...)]
|
|
|
|
{
|
2020-01-28 13:47:47 +00:00
|
|
|
try
|
2019-03-06 19:57:54 +00:00
|
|
|
{
|
2020-01-28 13:47:47 +00:00
|
|
|
/// Thread status holds raw pointer on query context, thus it always must be destroyed
|
|
|
|
/// before sending signal that permits to join this thread.
|
2019-03-06 19:57:54 +00:00
|
|
|
DB::ThreadStatus thread_status;
|
|
|
|
std::apply(func, args);
|
|
|
|
}
|
2020-01-28 13:47:47 +00:00
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
state->set();
|
|
|
|
throw;
|
|
|
|
}
|
2019-03-06 17:54:20 +00:00
|
|
|
state->set();
|
2019-01-14 19:22:09 +00:00
|
|
|
});
|
2019-01-14 10:59:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
ThreadFromGlobalPool(ThreadFromGlobalPool && rhs)
|
|
|
|
{
|
|
|
|
*this = std::move(rhs);
|
|
|
|
}
|
|
|
|
|
|
|
|
ThreadFromGlobalPool & operator=(ThreadFromGlobalPool && rhs)
|
|
|
|
{
|
2019-02-25 15:45:07 +00:00
|
|
|
if (joinable())
|
2019-01-14 10:59:58 +00:00
|
|
|
std::terminate();
|
2019-03-06 16:46:05 +00:00
|
|
|
state = std::move(rhs.state);
|
2019-01-14 10:59:58 +00:00
|
|
|
return *this;
|
|
|
|
}
|
|
|
|
|
|
|
|
~ThreadFromGlobalPool()
|
|
|
|
{
|
2019-02-25 15:45:07 +00:00
|
|
|
if (joinable())
|
2019-01-14 10:59:58 +00:00
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
|
|
|
|
void join()
|
|
|
|
{
|
2019-02-25 15:45:07 +00:00
|
|
|
if (!joinable())
|
|
|
|
std::terminate();
|
2019-03-06 16:46:05 +00:00
|
|
|
|
|
|
|
state->wait();
|
|
|
|
state.reset();
|
2019-01-14 10:59:58 +00:00
|
|
|
}
|
2019-01-14 19:22:09 +00:00
|
|
|
|
2019-02-25 15:45:07 +00:00
|
|
|
void detach()
|
|
|
|
{
|
|
|
|
if (!joinable())
|
|
|
|
std::terminate();
|
2019-03-06 16:46:05 +00:00
|
|
|
state.reset();
|
2019-02-25 15:45:07 +00:00
|
|
|
}
|
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
bool joinable() const
|
|
|
|
{
|
2019-03-06 16:46:05 +00:00
|
|
|
return state != nullptr;
|
2019-01-14 19:22:09 +00:00
|
|
|
}
|
|
|
|
|
2019-01-14 10:59:58 +00:00
|
|
|
private:
|
2019-03-06 16:46:05 +00:00
|
|
|
/// The state used in this object and inside the thread job.
|
2019-03-06 17:54:20 +00:00
|
|
|
std::shared_ptr<Poco::Event> state;
|
2019-01-14 10:59:58 +00:00
|
|
|
};
|
|
|
|
|
2019-01-14 19:22:09 +00:00
|
|
|
|
|
|
|
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
|
2019-01-14 10:59:58 +00:00
|
|
|
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
|