Added exponential backoff for BackgroundProcessingPool #2281

This commit is contained in:
Alexey Milovidov 2018-07-08 05:57:41 +03:00
parent 7707b5a209
commit 3445c78190
2 changed files with 29 additions and 12 deletions

View File

@ -21,9 +21,14 @@ namespace CurrentMetrics
namespace DB
{
static constexpr double thread_sleep_seconds = 10;
static constexpr double thread_sleep_seconds_random_part = 1.0;
constexpr double BackgroundProcessingPool::sleep_seconds;
constexpr double BackgroundProcessingPool::sleep_seconds_random_part;
/// For exponential backoff.
static constexpr double task_sleep_seconds_when_no_work_min = 10;
static constexpr double task_sleep_seconds_when_no_work_max = 600;
static constexpr double task_sleep_seconds_when_no_work_multiplier = 1.1;
static constexpr double task_sleep_seconds_when_no_work_random_part = 1.0;
void BackgroundProcessingPoolTaskInfo::wake()
@ -119,7 +124,7 @@ void BackgroundProcessingPool::threadFunction()
current_memory_tracker = &memory_tracker;
pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
while (!shutdown)
{
@ -154,8 +159,8 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock<std::mutex> lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(sleep_seconds
+ std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
std::chrono::duration<double>(thread_sleep_seconds
+ std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
continue;
}
@ -165,7 +170,7 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock<std::mutex> lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, thread_sleep_seconds_random_part * 1000000)(rng)));
}
std::shared_lock<std::shared_mutex> rlock(task->rwlock);
@ -187,16 +192,27 @@ void BackgroundProcessingPool::threadFunction()
if (shutdown)
break;
/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);
{
std::unique_lock<std::mutex> lock(tasks_mutex);
if (task->removed)
continue;
if (done_work)
task->count_no_work_done = 0;
else
++task->count_no_work_done;
/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute; /// current time
if (!done_work)
next_time_to_execute += 1000000 * (std::min(
task_sleep_seconds_when_no_work_max,
task_sleep_seconds_when_no_work_min * std::pow(task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
+ std::uniform_real_distribution<double>(0, task_sleep_seconds_when_no_work_random_part)(rng));
tasks.erase(task->iterator);
task->iterator = tasks.emplace(next_time_to_execute, task);
}

View File

@ -53,8 +53,6 @@ protected:
using Threads = std::vector<std::thread>;
const size_t size;
static constexpr double sleep_seconds = 10;
static constexpr double sleep_seconds_random_part = 1.0;
Tasks tasks; /// Ordered in priority.
std::mutex tasks_mutex;
@ -91,6 +89,9 @@ protected:
std::atomic<bool> removed {false};
std::multimap<Poco::Timestamp, std::shared_ptr<BackgroundProcessingPoolTaskInfo>>::iterator iterator;
/// For exponential backoff.
size_t count_no_work_done = 0;
};
}