Fix BackgroundSchedulePool

This commit is contained in:
Silviu Caragea 2018-01-01 22:35:29 +02:00
parent b24a4b2111
commit f2479673d6
3 changed files with 45 additions and 39 deletions

View File

@ -42,7 +42,7 @@ BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool, const
bool BackgroundSchedulePool::TaskInfo::schedule()
{
std::lock_guard lock(mutex);
std::lock_guard lock(schedule_mutex);
if (deactivated || scheduled)
return false;
@ -59,40 +59,52 @@ bool BackgroundSchedulePool::TaskInfo::schedule()
bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
{
pool.scheduleDelayedTask(shared_from_this(), ms);
std::lock_guard lock(schedule_mutex);
if (deactivated || scheduled)
return false;
pool.scheduleDelayedTask(shared_from_this(), ms, lock);
return true;
}
void BackgroundSchedulePool::TaskInfo::deactivate()
{
std::lock_guard lock_exec(exec_mutex);
std::lock_guard lock_schedule(schedule_mutex);
if (deactivated)
return;
std::lock_guard lock(mutex);
deactivated = true;
scheduled = false;
if (delayed)
pool.cancelDelayedTask(shared_from_this(), lock);
pool.cancelDelayedTask(shared_from_this(), lock_schedule);
}
void BackgroundSchedulePool::TaskInfo::activate()
{
std::lock_guard lock(mutex);
std::lock_guard lock(schedule_mutex);
deactivated = false;
}
void BackgroundSchedulePool::TaskInfo::execute()
{
std::lock_guard lock(mutex);
std::lock_guard lock_exec(exec_mutex);
if (deactivated)
return;
{
std::lock_guard lock_schedule(schedule_mutex);
if (deactivated)
return;
scheduled = false;
}
scheduled = false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
Stopwatch watch;
@ -155,7 +167,7 @@ void BackgroundSchedulePool::removeTask(const TaskHandle & task)
}
void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms)
void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex>&)
{
Poco::Timestamp current_time;
@ -173,7 +185,7 @@ void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t
}
void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::recursive_mutex> &)
void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> &)
{
{
std::lock_guard lock(delayed_tasks_lock);
@ -212,11 +224,11 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
while (!shutdown)
{
Poco::Timestamp min_time;
TaskHandle task;
{
std::lock_guard lock(delayed_tasks_lock);
std::unique_lock lock(delayed_tasks_lock);
Poco::Timestamp min_time;
if (!delayed_tasks.empty())
{
@ -224,28 +236,23 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
min_time = t->first;
task = t->second;
}
if (!task)
{
wakeup_event.wait(lock);
continue;
}
Poco::Timestamp current_time;
if (min_time > current_time)
{
wakeup_event.wait_for(lock, std::chrono::microseconds(min_time - current_time));
continue;
}
}
if (shutdown)
break;
if (!task)
{
std::unique_lock lock(delayed_tasks_lock);
wakeup_event.wait(lock);
continue;
}
Poco::Timestamp current_time;
if (min_time > current_time)
{
std::unique_lock lock(delayed_tasks_lock);
wakeup_event.wait_for(lock, std::chrono::microseconds(min_time - current_time));
}
else
{
task->schedule();
}
task->schedule();
}
}

View File

@ -61,9 +61,8 @@ public:
void execute();
/// This mutex is recursive, because it's locked during 'execute' method,
/// and the task can schedule itself again during execution.
std::recursive_mutex mutex;
std::mutex schedule_mutex;
std::mutex exec_mutex;
std::string name;
bool deactivated = false;
@ -90,10 +89,10 @@ private:
void delayExecutionThreadFunction();
/// Schedule task for execution after specified delay from now.
void scheduleDelayedTask(const TaskHandle & task, size_t ms);
void scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> &);
/// Remove task, that was scheduled with delay, from schedule.
void cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::recursive_mutex> &);
void cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> &);
/// Number for worker threads.
const size_t size;

View File

@ -213,7 +213,7 @@ WatchCallback ZooKeeper::callbackForTaskHandle(const TaskHandlePtr & task)
{
if (t)
{
t->scheduleAfter(0);
t->schedule();
t.reset(); /// The event is set only once, even if the callback can fire multiple times due to session events.
}
};