This commit is contained in:
Mikhail Filimonov 2024-09-18 07:32:42 +02:00
parent 139fe0214a
commit 0880cdca2f
2 changed files with 32 additions and 33 deletions

View File

@ -142,14 +142,8 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
// can go to zero, meaning that max_threads threads were already started
remaining_pool_capacity.store(max_threads, std::memory_order_relaxed);
// if positive - it means that threre are more threads than jobs (and some are doing nothing)
// if zero - means that every thread have some job
// if negative - it means that we have more jobs than threads
available_threads.store(0, std::memory_order_relaxed);
}
template <typename Thread>
@ -233,6 +227,9 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
return false;
};
// Decrement available_threads, scoped to the job lifecycle.
// This ensures that available_threads decreases when a new job starts
// and automatically increments when the job completes or goes out of scope.
auto available_threads_decrement = std::make_unique<ScopedDecrement>(available_threads);
std::unique_ptr<ThreadFromThreadPool> new_thread;
@ -241,16 +238,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
size_t capacity = remaining_pool_capacity.load(std::memory_order_relaxed);
int currently_available_threads = available_threads.load(std::memory_order_relaxed);
// Try to create a new thread if all existing threads are busy and there is capacity.
while (currently_available_threads <= 0 && capacity > 0)
{
// Try to decrement remaining_pool_capacity atomically
if (remaining_pool_capacity.compare_exchange_weak(capacity, capacity - 1, std::memory_order_relaxed))
{
try
{
// Successfully decremented, attempt to create a new thread
new_thread = std::make_unique<ThreadFromThreadPool>(*this);
break; // Exit loop if thread creation succeeds
break; // Exit the loop once a thread is successfully created.
}
catch (...)
{

View File

@ -32,7 +32,7 @@ class JobWithPriority;
*
* This thread pool can be used as a task queue.
* For example, you can create a thread pool with 10 threads (and queue of size 10) and schedule 1000 tasks
* - in this case you will be blocked to keep 10 tasks in fly.
* - in this case you will be blocked to keep 10 tasks in flight.
*
* Thread: std::thread or something with identical interface.
*/
@ -51,10 +51,10 @@ public:
/// Constructor to initialize and start the thread (but not associate it with the pool)
explicit ThreadFromThreadPool(ThreadPoolImpl& parent_pool);
// shift the thread state from preparing to running to allow worker to start to work
// Shift the thread state from Preparing to Running to allow the worker to start.
void start(typename std::list<std::unique_ptr<ThreadFromThreadPool>>::iterator& it);
// Destructor to join the thread if needed (shift the state to failed if was not running)
// Destructor to join the thread if needed (shift the state to Destructing if it was not running)
~ThreadFromThreadPool();
private:
@ -71,15 +71,15 @@ public:
// Atomic state to track the thread's state
std::atomic<ThreadState> thread_state;
// stores the position of the thread in the parent thread pool list
// Stores the position of the thread in the parent thread pool list
typename std::list<std::unique_ptr<ThreadFromThreadPool>>::iterator thread_it;
// remove itself from the parent pool
// Remove itself from the parent pool
void removeSelfFromPoolNoPoolLock();
// worker does a busy loop (with yeld) while state is prapareing
// after that immediately return if the state changed to failed, or start the
// main working loop if the status is running
// Worker does a busy loop (with yield) while the state is Preparing.
// After that, immediately returns if the state changed to Destructing,
// or starts the main working loop if the state is Running.
void worker();
};
@ -103,14 +103,14 @@ public:
size_t queue_size_,
bool shutdown_on_exception_ = true);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// Add new job. Locks until the number of scheduled jobs is less than the maximum or an exception in one of the threads was thrown.
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
/// and the exception will be cleared.
/// Also throws an exception if cannot create thread.
/// Priority: lower is higher.
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
/// located on stack of current thread, the stack must not be unwinded until all jobs finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
/// NOTE: Probably you should call wait() if an exception was thrown. If some previously scheduled jobs are using some objects,
/// located on the stack of the current thread, the stack must not be unwound until all jobs are finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in its own destructor.
void scheduleOrThrowOnError(Job job, Priority priority = {});
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
@ -121,12 +121,12 @@ public:
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitrary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
/// and the exception will be cleared.
void wait();
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
/// You should not destroy object while calling schedule or wait methods from another threads.
/// You should not destroy the object while calling schedule or wait methods from other threads.
~ThreadPoolImpl();
/// Returns number of running and scheduled jobs.
@ -168,13 +168,17 @@ private:
size_t scheduled_jobs = 0;
// originally equeals to max_threads, but changes dynamically
// decrement with every new thread started, decrements when finishes
// if positive then more threads can be started
// Originally equals to max_threads, but changes dynamically.
// Decrements with every new thread started, increments when it finishes.
// If positive, then more threads can be started.
// When it comes to zero, it means that max_threads threads have already been started.
std::atomic<size_t> remaining_pool_capacity;
// increments every time new thread joins the thread pool, or job finishes
// decrements every time when task schedule starts
// Increments every time a new thread joins the thread pool or a job finishes.
// Decrements every time a task is scheduled.
// If positive, it means that there are more threads than jobs (and some are idle).
// If zero, it means that every thread has a job.
// If negative, it means that we have more jobs than threads.
std::atomic<int> available_threads;
bool shutdown = false;
@ -189,14 +193,13 @@ private:
template <typename ReturnType>
ReturnType scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context = true);
/// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with `mutex` locked.
/// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with the mutex locked.
void startNewThreadsNoLock();
void finalize();
void onDestroy();
};
/// ThreadPool with std::thread for threads.
using FreeThreadPool = ThreadPoolImpl<std::thread>;