diff --git a/dbms/src/Common/ThreadPool.cpp b/dbms/src/Common/ThreadPool.cpp
index cb08fa944a9..ce004ed7674 100644
--- a/dbms/src/Common/ThreadPool.cpp
+++ b/dbms/src/Common/ThreadPool.cpp
@@ -1,7 +1,6 @@
 #include <Common/ThreadPool.h>
 #include <Common/Exception.h>
 
-#include
 #include
 
 
@@ -34,6 +33,28 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threa
 {
 }
 
+template <typename Thread>
+void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
+{
+    std::lock_guard lock(mutex);
+    max_threads = value;
+}
+
+template <typename Thread>
+void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
+{
+    std::lock_guard lock(mutex);
+    max_free_threads = value;
+}
+
+template <typename Thread>
+void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
+{
+    std::lock_guard lock(mutex);
+    queue_size = value;
+}
+
+
 template <typename Thread>
 template <typename ReturnType>
 ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
@@ -59,7 +80,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
 
         auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
 
-        if (wait_microseconds)
+        if (wait_microseconds)  /// Check whether the optional is set. The condition is true even if it is set and the value is zero.
        {
             if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
                 return on_error();
@@ -83,6 +104,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
             catch (...)
             {
                 threads.pop_front();
+
+                /// Remove the job and return an error to the caller.
+                /// Note that if we have allocated at least one thread, we may continue
+                /// (one thread is enough to process all jobs).
+                /// But this condition indicates an error nevertheless, and it is better to refuse.
+
+                jobs.pop();
+                --scheduled_jobs;
+                return on_error();
             }
         }
     }
diff --git a/dbms/src/Common/ThreadPool.h b/dbms/src/Common/ThreadPool.h
index a0dae3f810c..23c0848e931 100644
--- a/dbms/src/Common/ThreadPool.h
+++ b/dbms/src/Common/ThreadPool.h
@@ -60,14 +60,18 @@ public:
     /// Returns number of running and scheduled jobs.
     size_t active() const;
 
+    void setMaxThreads(size_t value);
+    void setMaxFreeThreads(size_t value);
+    void setQueueSize(size_t value);
+
 private:
     mutable std::mutex mutex;
     std::condition_variable job_finished;
     std::condition_variable new_job_or_shutdown;
 
-    const size_t max_threads;
-    const size_t max_free_threads;
-    const size_t queue_size;
+    size_t max_threads;
+    size_t max_free_threads;
+    size_t queue_size;
 
     size_t scheduled_jobs = 0;
     bool shutdown = false;
diff --git a/dbms/src/Common/tests/gtest_thread_pool_global_full.cpp b/dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
new file mode 100644
index 00000000000..48858fa40ef
--- /dev/null
+++ b/dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
@@ -0,0 +1,81 @@
+#include <atomic>
+
+#include <Common/ThreadPool.h>
+
+#include <gtest/gtest.h>
+
+
+/// Test what happens if local ThreadPool cannot create a ThreadFromGlobalPool.
+/// There was a bug: if local ThreadPool cannot allocate even a single thread,
+///  the job will be scheduled but never get executed.
+
+
+TEST(ThreadPool, GlobalFull1)
+{
+    GlobalThreadPool & global_pool = GlobalThreadPool::instance();
+
+    static constexpr size_t capacity = 5;
+
+    global_pool.setMaxThreads(capacity);
+    global_pool.setMaxFreeThreads(1);
+    global_pool.setQueueSize(capacity);
+    global_pool.wait();
+
+    std::atomic counter = 0;
+    static constexpr size_t num_jobs = capacity + 1;
+
+    auto func = [&] { ++counter; while (counter != num_jobs) {} };
+
+    ThreadPool pool(num_jobs);
+
+    for (size_t i = 0; i < capacity; ++i)
+        pool.schedule(func);
+
+    for (size_t i = capacity; i < num_jobs; ++i)
+    {
+        EXPECT_THROW(pool.schedule(func), DB::Exception);
+        ++counter;
+    }
+
+    pool.wait();
+    EXPECT_EQ(counter, num_jobs);
+}
+
+
+TEST(ThreadPool, GlobalFull2)
+{
+    GlobalThreadPool & global_pool = GlobalThreadPool::instance();
+
+    static constexpr size_t capacity = 5;
+
+    global_pool.setMaxThreads(capacity);
+    global_pool.setMaxFreeThreads(1);
+    global_pool.setQueueSize(capacity);
+
+    /// ThreadFromGlobalPool objects from the local thread pools of the previous test case have exited,
+    /// but their threads in global_pool may not have finished yet (they still have to exit).
+    /// If we do not wait here, we can get a "Cannot schedule a task" exception earlier than we expect in this test.
+    global_pool.wait();
+
+    std::atomic counter = 0;
+    auto func = [&] { ++counter; while (counter != capacity + 1) {} };
+
+    ThreadPool pool(capacity, 0, capacity);
+    for (size_t i = 0; i < capacity; ++i)
+        pool.schedule(func);
+
+    ThreadPool another_pool(1);
+    EXPECT_THROW(another_pool.schedule(func), DB::Exception);
+
+    ++counter;
+
+    pool.wait();
+
+    global_pool.wait();
+
+    for (size_t i = 0; i < capacity; ++i)
+        another_pool.schedule([&] { ++counter; });
+
+    another_pool.wait();
+    EXPECT_EQ(counter, capacity * 2 + 1);
+}
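
Not part of the patch: a minimal caller-side sketch of the behaviour this fix guarantees. ThreadPool, GlobalThreadPool::instance(), schedule(), wait() and the new setters are taken from the diff above; main(), the pool sizes and the iostream logging are illustrative assumptions. After the fix, a schedule() call that cannot obtain a thread from the exhausted global pool throws instead of silently leaving a job queued forever, so the caller can catch the error and back off.

#include <Common/ThreadPool.h>   /// ThreadPool, GlobalThreadPool (path as in this diff)

#include <cstddef>
#include <exception>
#include <iostream>

int main()
{
    /// Deliberately small global pool, configured with the same setters as in the tests above (sizes are arbitrary).
    GlobalThreadPool & global_pool = GlobalThreadPool::instance();
    global_pool.setMaxThreads(4);
    global_pool.setMaxFreeThreads(1);
    global_pool.setQueueSize(4);

    ThreadPool local_pool(8);

    for (size_t i = 0; i < 8; ++i)
    {
        try
        {
            local_pool.schedule([i] { std::cout << "job " << i << '\n'; });
        }
        catch (const std::exception & e)
        {
            /// Before the fix the job could stay in the queue with no thread to run it;
            /// now scheduleImpl() removes it and reports the error, so we can react here.
            std::cerr << "cannot schedule job " << i << ": " << e.what() << '\n';
            break;
        }
    }

    local_pool.wait();
    return 0;
}

Catching std::exception is sufficient in this sketch because DB::Exception derives from it; the tests above check for DB::Exception directly.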