mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Fixed the possibility of hanging queries when server is overloaded
This commit is contained in:
parent
e3f7dbd7d4
commit
67d91c4b88
@ -1,7 +1,6 @@
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <type_traits>
|
||||
|
||||
|
||||
@ -34,6 +33,28 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threa
|
||||
{
|
||||
}
|
||||
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
max_threads = value;
|
||||
}
|
||||
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
max_free_threads = value;
|
||||
}
|
||||
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
queue_size = value;
|
||||
}
|
||||
|
||||
|
||||
template <typename Thread>
|
||||
template <typename ReturnType>
|
||||
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
|
||||
@ -59,7 +80,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
|
||||
|
||||
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
|
||||
|
||||
if (wait_microseconds)
|
||||
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
|
||||
{
|
||||
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
|
||||
return on_error();
|
||||
@ -83,6 +104,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
|
||||
catch (...)
|
||||
{
|
||||
threads.pop_front();
|
||||
|
||||
/// Remove the job and return error to caller.
|
||||
/// Note that if we have allocated at least one thread, we may continue
|
||||
/// (one thread is enough to process all jobs).
|
||||
/// But this condition indicate an error nevertheless and better to refuse.
|
||||
|
||||
jobs.pop();
|
||||
--scheduled_jobs;
|
||||
return on_error();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -60,14 +60,18 @@ public:
|
||||
/// Returns number of running and scheduled jobs.
|
||||
size_t active() const;
|
||||
|
||||
void setMaxThreads(size_t value);
|
||||
void setMaxFreeThreads(size_t value);
|
||||
void setQueueSize(size_t value);
|
||||
|
||||
private:
|
||||
mutable std::mutex mutex;
|
||||
std::condition_variable job_finished;
|
||||
std::condition_variable new_job_or_shutdown;
|
||||
|
||||
const size_t max_threads;
|
||||
const size_t max_free_threads;
|
||||
const size_t queue_size;
|
||||
size_t max_threads;
|
||||
size_t max_free_threads;
|
||||
size_t queue_size;
|
||||
|
||||
size_t scheduled_jobs = 0;
|
||||
bool shutdown = false;
|
||||
|
81
dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
Normal file
81
dbms/src/Common/tests/gtest_thread_pool_global_full.cpp
Normal file
@ -0,0 +1,81 @@
|
||||
#include <atomic>
|
||||
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
|
||||
/// Test what happens if local ThreadPool cannot create a ThreadFromGlobalPool.
|
||||
/// There was a bug: if local ThreadPool cannot allocate even a single thread,
|
||||
/// the job will be scheduled but never get executed.
|
||||
|
||||
|
||||
TEST(ThreadPool, GlobalFull1)
|
||||
{
|
||||
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
|
||||
|
||||
static constexpr size_t capacity = 5;
|
||||
|
||||
global_pool.setMaxThreads(capacity);
|
||||
global_pool.setMaxFreeThreads(1);
|
||||
global_pool.setQueueSize(capacity);
|
||||
global_pool.wait();
|
||||
|
||||
std::atomic<size_t> counter = 0;
|
||||
static constexpr size_t num_jobs = capacity + 1;
|
||||
|
||||
auto func = [&] { ++counter; while (counter != num_jobs) {} };
|
||||
|
||||
ThreadPool pool(num_jobs);
|
||||
|
||||
for (size_t i = 0; i < capacity; ++i)
|
||||
pool.schedule(func);
|
||||
|
||||
for (size_t i = capacity; i < num_jobs; ++i)
|
||||
{
|
||||
EXPECT_THROW(pool.schedule(func), DB::Exception);
|
||||
++counter;
|
||||
}
|
||||
|
||||
pool.wait();
|
||||
EXPECT_EQ(counter, num_jobs);
|
||||
}
|
||||
|
||||
|
||||
TEST(ThreadPool, GlobalFull2)
|
||||
{
|
||||
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
|
||||
|
||||
static constexpr size_t capacity = 5;
|
||||
|
||||
global_pool.setMaxThreads(capacity);
|
||||
global_pool.setMaxFreeThreads(1);
|
||||
global_pool.setQueueSize(capacity);
|
||||
|
||||
/// ThreadFromGlobalPool from local thread pools from previous test case have exited
|
||||
/// but their threads from global_pool may not have finished (they still have to exit).
|
||||
/// If we will not wait here, we can get "Cannot schedule a task exception" earlier than we expect in this test.
|
||||
global_pool.wait();
|
||||
|
||||
std::atomic<size_t> counter = 0;
|
||||
auto func = [&] { ++counter; while (counter != capacity + 1) {} };
|
||||
|
||||
ThreadPool pool(capacity, 0, capacity);
|
||||
for (size_t i = 0; i < capacity; ++i)
|
||||
pool.schedule(func);
|
||||
|
||||
ThreadPool another_pool(1);
|
||||
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
|
||||
|
||||
++counter;
|
||||
|
||||
pool.wait();
|
||||
|
||||
global_pool.wait();
|
||||
|
||||
for (size_t i = 0; i < capacity; ++i)
|
||||
another_pool.schedule([&] { ++counter; });
|
||||
|
||||
another_pool.wait();
|
||||
EXPECT_EQ(counter, capacity * 2 + 1);
|
||||
}
|
Loading…
Reference in New Issue
Block a user