mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Better [#METR-22608].
This commit is contained in:
parent
5dc3b28774
commit
0db84044e5
@ -37,7 +37,7 @@ public:
|
||||
public:
|
||||
void incrementCounter(const String & name, int value = 1)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(pool.mutex);
|
||||
std::unique_lock<std::mutex> lock(pool.counters_mutex);
|
||||
local_counters[name] += value;
|
||||
pool.counters[name] += value;
|
||||
}
|
||||
@ -103,8 +103,10 @@ private:
|
||||
static constexpr double sleep_seconds_random_part = 1.0;
|
||||
|
||||
Tasks tasks; /// Задачи в порядке, в котором мы планируем их выполнять.
|
||||
std::mutex tasks_mutex;
|
||||
|
||||
Counters counters;
|
||||
std::mutex mutex; /// Для работы со списком tasks, а также с counters (когда threads не пустой).
|
||||
std::mutex counters_mutex;
|
||||
|
||||
Threads threads;
|
||||
|
||||
|
@ -20,13 +20,16 @@ void BackgroundProcessingPool::TaskInfo::wake()
|
||||
if (removed)
|
||||
return;
|
||||
|
||||
std::unique_lock<std::mutex> lock(pool.mutex);
|
||||
pool.tasks.splice(pool.tasks.begin(), pool.tasks, iterator);
|
||||
|
||||
/// Если эта задача в прошлый раз ничего не сделала, и ей было назначено спать, то отменим время сна.
|
||||
time_t current_time = time(0);
|
||||
if (next_time_to_execute > current_time)
|
||||
next_time_to_execute = current_time;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(pool.tasks_mutex);
|
||||
pool.tasks.splice(pool.tasks.begin(), pool.tasks, iterator);
|
||||
|
||||
/// Если эта задача в прошлый раз ничего не сделала, и ей было назначено спать, то отменим время сна.
|
||||
if (next_time_to_execute > current_time)
|
||||
next_time_to_execute = current_time;
|
||||
}
|
||||
|
||||
/// Если все потоки сейчас выполняют работу, этот вызов никого не разбудит.
|
||||
pool.wake_event.notify_one();
|
||||
@ -39,13 +42,13 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
|
||||
|
||||
threads.resize(size);
|
||||
for (auto & thread : threads)
|
||||
thread = std::thread([this] { threadFunction(); });
|
||||
thread = std::thread([this] { threadFunction(); });
|
||||
}
|
||||
|
||||
|
||||
int BackgroundProcessingPool::getCounter(const String & name)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(counters_mutex);
|
||||
return counters[name];
|
||||
}
|
||||
|
||||
@ -54,7 +57,7 @@ BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Tas
|
||||
TaskHandle res(new TaskInfo(*this, task));
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex);
|
||||
res->iterator = tasks.insert(tasks.begin(), res);
|
||||
}
|
||||
|
||||
@ -65,7 +68,8 @@ BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Tas
|
||||
|
||||
void BackgroundProcessingPool::removeTask(const TaskHandle & task)
|
||||
{
|
||||
task->removed = true;
|
||||
if (task->removed.exchange(true))
|
||||
return;
|
||||
|
||||
/// Дождёмся завершения всех выполнений этой задачи.
|
||||
{
|
||||
@ -73,7 +77,7 @@ void BackgroundProcessingPool::removeTask(const TaskHandle & task)
|
||||
}
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex);
|
||||
tasks.erase(task->iterator);
|
||||
}
|
||||
}
|
||||
@ -112,7 +116,7 @@ void BackgroundProcessingPool::threadFunction()
|
||||
time_t min_time = std::numeric_limits<time_t>::max();
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex);
|
||||
|
||||
if (!tasks.empty())
|
||||
{
|
||||
@ -141,7 +145,7 @@ void BackgroundProcessingPool::threadFunction()
|
||||
|
||||
if (!task)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex);
|
||||
wake_event.wait_for(lock,
|
||||
std::chrono::duration<double>(sleep_seconds
|
||||
+ std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
|
||||
@ -152,7 +156,7 @@ void BackgroundProcessingPool::threadFunction()
|
||||
time_t current_time = time(0);
|
||||
if (min_time > current_time)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex);
|
||||
wake_event.wait_for(lock, std::chrono::duration<double>(
|
||||
min_time - current_time + std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
|
||||
}
|
||||
@ -182,7 +186,7 @@ void BackgroundProcessingPool::threadFunction()
|
||||
/// Вычтем все счётчики обратно.
|
||||
if (!counters_diff.empty())
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(counters_mutex);
|
||||
for (const auto & it : counters_diff)
|
||||
counters[it.first] -= it.second;
|
||||
}
|
||||
@ -192,7 +196,7 @@ void BackgroundProcessingPool::threadFunction()
|
||||
|
||||
if (has_exception)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
std::unique_lock<std::mutex> lock(tasks_mutex);
|
||||
wake_event.wait_for(lock, std::chrono::duration<double>(sleep_seconds));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user