Merge pull request #38028 from kitaisreal/background-schedule-pool-refactoring

BackgroundSchedulePool remove Poco::NotificationQueue
This commit is contained in:
Maksim Kita 2022-06-15 23:10:49 +02:00 committed by GitHub
commit 3eea38f078
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 43 deletions

View File

@ -10,18 +10,6 @@
namespace DB
{
class TaskNotification final : public Poco::Notification
{
public:
explicit TaskNotification(const BackgroundSchedulePoolTaskInfoPtr & task_) : task(task_) {}
void execute() { task->execute(); }
private:
BackgroundSchedulePoolTaskInfoPtr task;
};
BackgroundSchedulePoolTaskInfo::BackgroundSchedulePoolTaskInfo(
BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_)
: pool(pool_), log_name(log_name_), function(function_)
@ -39,7 +27,7 @@ bool BackgroundSchedulePoolTaskInfo::schedule()
return true;
}
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms, bool overwrite)
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t milliseconds, bool overwrite)
{
std::lock_guard lock(schedule_mutex);
@ -48,7 +36,7 @@ bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms, bool overwrite)
if (delayed && !overwrite)
return false;
pool.scheduleDelayedTask(shared_from_this(), ms, lock);
pool.scheduleDelayedTask(shared_from_this(), milliseconds, lock);
return true;
}
@ -106,7 +94,7 @@ void BackgroundSchedulePoolTaskInfo::execute()
UInt64 milliseconds = watch.elapsedMilliseconds();
/// If the task is executed longer than specified time, it will be logged.
static const int32_t slow_execution_threshold_ms = 200;
static constexpr UInt64 slow_execution_threshold_ms = 200;
if (milliseconds >= slow_execution_threshold_ms)
LOG_TRACE(&Poco::Logger::get(log_name), "Execution took {} ms.", milliseconds);
@ -121,7 +109,7 @@ void BackgroundSchedulePoolTaskInfo::execute()
/// will have their chance to execute
if (scheduled)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
pool.scheduleTask(shared_from_this());
}
}
@ -136,14 +124,14 @@ void BackgroundSchedulePoolTaskInfo::scheduleImpl(std::lock_guard<std::mutex> &
/// But if it is currently executing, do nothing because it will be enqueued
/// at the end of the execute() method.
if (!executing)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
pool.scheduleTask(shared_from_this());
}
Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
{
return [t = shared_from_this()](const Coordination::WatchResponse &)
return [task = shared_from_this()](const Coordination::WatchResponse &)
{
t->schedule();
task->schedule();
};
}
@ -184,15 +172,18 @@ BackgroundSchedulePool::~BackgroundSchedulePool()
try
{
{
std::unique_lock lock(delayed_tasks_mutex);
std::lock_guard lock_tasks(tasks_mutex);
std::lock_guard lock_delayed_tasks(delayed_tasks_mutex);
shutdown = true;
wakeup_cond.notify_all();
}
queue.wakeUpAll();
delayed_thread.join();
tasks_cond_var.notify_all();
delayed_tasks_cond_var.notify_all();
LOG_TRACE(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Waiting for threads to finish.");
delayed_thread.join();
for (auto & thread : threads)
thread.join();
}
@ -208,6 +199,15 @@ BackgroundSchedulePool::TaskHolder BackgroundSchedulePool::createTask(const std:
return TaskHolder(std::make_shared<TaskInfo>(*this, name, function));
}
void BackgroundSchedulePool::scheduleTask(TaskInfoPtr task_info)
{
{
std::lock_guard tasks_lock(tasks_mutex);
tasks.push_back(std::move(task_info));
}
tasks_cond_var.notify_one();
}
void BackgroundSchedulePool::scheduleDelayedTask(const TaskInfoPtr & task, size_t ms, std::lock_guard<std::mutex> & /* task_schedule_mutex_lock */)
{
@ -223,7 +223,7 @@ void BackgroundSchedulePool::scheduleDelayedTask(const TaskInfoPtr & task, size_
task->delayed = true;
}
wakeup_cond.notify_all();
delayed_tasks_cond_var.notify_all();
}
@ -235,7 +235,7 @@ void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lo
task->delayed = false;
}
wakeup_cond.notify_all();
delayed_tasks_cond_var.notify_all();
}
@ -264,20 +264,25 @@ void BackgroundSchedulePool::threadFunction()
while (!shutdown)
{
/// We have to wait with timeout to prevent very rare deadlock, caused by the following race condition:
/// 1. Background thread N: threadFunction(): checks for shutdown (it's false)
/// 2. Main thread: ~BackgroundSchedulePool(): sets shutdown to true, calls queue.wakeUpAll(), it triggers
/// all existing Poco::Events inside Poco::NotificationQueue which background threads are waiting on.
/// 3. Background thread N: threadFunction(): calls queue.waitDequeueNotification(), it creates
/// new Poco::Event inside Poco::NotificationQueue and starts to wait on it
/// Background thread N will never be woken up.
/// TODO Do we really need Poco::NotificationQueue? Why not to use std::queue + mutex + condvar or maybe even DB::ThreadPool?
constexpr size_t wait_timeout_ms = 500;
if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification(wait_timeout_ms))
TaskInfoPtr task;
{
TaskNotification & task_notification = static_cast<TaskNotification &>(*notification);
task_notification.execute();
std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
tasks_cond_var.wait(tasks_lock, [&]()
{
return shutdown || !tasks.empty();
});
if (!tasks.empty())
{
task = tasks.front();
tasks.pop_front();
}
}
if (task)
task->execute();
}
}
@ -309,7 +314,7 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
if (!task)
{
wakeup_cond.wait(lock);
delayed_tasks_cond_var.wait(lock);
continue;
}
@ -317,7 +322,7 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
if (min_time > current_time)
{
wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time));
delayed_tasks_cond_var.wait_for(lock, std::chrono::microseconds(min_time - current_time));
continue;
}
else

View File

@ -62,6 +62,8 @@ private:
void threadFunction();
void delayExecutionThreadFunction();
void scheduleTask(TaskInfoPtr task_info);
/// Schedule task for execution after specified delay from now.
void scheduleDelayedTask(const TaskInfoPtr & task_info, size_t ms, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
@ -69,12 +71,16 @@ private:
void cancelDelayedTask(const TaskInfoPtr & task_info, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
std::atomic<bool> shutdown {false};
/// Tasks.
std::condition_variable tasks_cond_var;
std::mutex tasks_mutex;
std::deque<TaskInfoPtr> tasks;
Threads threads;
Poco::NotificationQueue queue;
/// Delayed notifications.
/// Delayed tasks.
std::condition_variable wakeup_cond;
std::condition_variable delayed_tasks_cond_var;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
ThreadFromGlobalPool delayed_thread;
@ -102,7 +108,7 @@ public:
/// Schedule for execution after specified delay.
/// If overwrite is set then the task will be re-scheduled (if it was already scheduled, i.e. delayed == true).
bool scheduleAfter(size_t ms, bool overwrite = true);
bool scheduleAfter(size_t milliseconds, bool overwrite = true);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();