ClickHouse/src/Storages/MergeTree/BackgroundJobsAssignee.cpp

156 lines
4.0 KiB
C++
Raw Normal View History

2021-09-06 12:01:16 +00:00
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
2020-10-14 07:22:48 +00:00
#include <Storages/MergeTree/MergeTreeData.h>
2020-10-14 12:32:35 +00:00
#include <Common/CurrentMetrics.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <random>
2020-10-14 14:56:42 +00:00
namespace DB
2020-10-14 12:32:35 +00:00
{
2020-10-13 14:25:42 +00:00
2021-09-06 12:01:16 +00:00
/// Binds the assignee to the table `data_` and remembers which kind of jobs (`type_`) it assigns.
/// Only members are initialized here; the scheduling task (`holder`) is created later, in start().
BackgroundJobsAssignee::BackgroundJobsAssignee(
    MergeTreeData & data_,
    Type type_,
    ContextPtr global_context_)
    : WithContext(global_context_)
    , data(data_)
    /// NOTE(review): the *move* task scheduling settings are read regardless of `type_` - confirm this is intended.
    , sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings())
    , rng(randomSeed())
    , type(type_)
{
}
2021-09-06 12:01:16 +00:00
void BackgroundJobsAssignee::trigger()
2020-10-16 13:48:12 +00:00
{
2021-08-30 19:37:03 +00:00
std::lock_guard lock(holder_mutex);
if (!holder)
return;
2020-10-14 12:32:35 +00:00
2021-11-09 12:26:51 +00:00
/// Do not reset backoff factor if some task has appeared,
/// but decrease it exponentially on every new task.
no_work_done_count /= 2;
2020-11-26 07:25:57 +00:00
/// We have background jobs, schedule task as soon as possible
2021-08-30 19:37:03 +00:00
holder->schedule();
2020-11-26 07:25:57 +00:00
}
2021-09-06 12:01:16 +00:00
void BackgroundJobsAssignee::postpone()
2020-11-26 07:25:57 +00:00
{
2021-08-30 19:37:03 +00:00
std::lock_guard lock(holder_mutex);
2021-08-30 19:37:03 +00:00
if (!holder)
return;
2021-11-09 12:26:51 +00:00
no_work_done_count += 1;
2021-08-30 19:37:03 +00:00
double random_addition = std::uniform_real_distribution<double>(0, sleep_settings.task_sleep_seconds_when_no_work_random_part)(rng);
size_t next_time_to_execute = 1000 * (std::min(
sleep_settings.task_sleep_seconds_when_no_work_max,
2021-11-09 12:26:51 +00:00
sleep_settings.thread_sleep_seconds_if_nothing_to_do * std::pow(sleep_settings.task_sleep_seconds_when_no_work_multiplier, no_work_done_count))
2021-08-30 19:37:03 +00:00
+ random_addition);
2020-11-26 07:25:57 +00:00
2021-08-30 19:37:03 +00:00
holder->scheduleAfter(next_time_to_execute, false);
2020-10-14 14:56:42 +00:00
}
2020-10-16 10:12:31 +00:00
2022-06-01 19:09:53 +00:00
/// Offers `merge_task` to the merge/mutate executor.
/// If trySchedule() succeeds the assignee is re-run right away (trigger), otherwise it backs off (postpone).
/// Returns the result of trySchedule().
bool BackgroundJobsAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_task)
{
    const bool scheduled = getContext()->getMergeMutateExecutor()->trySchedule(merge_task);

    if (scheduled)
        trigger();
    else
        postpone();

    return scheduled;
}
2021-09-06 12:01:16 +00:00
/// Offers `fetch_task` to the fetches executor.
/// If trySchedule() succeeds the assignee is re-run right away (trigger), otherwise it backs off (postpone).
void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
{
    const bool scheduled = getContext()->getFetchesExecutor()->trySchedule(fetch_task);

    if (scheduled)
        trigger();
    else
        postpone();
}
2021-09-06 12:01:16 +00:00
/// Offers `move_task` to the moves executor.
/// If trySchedule() succeeds the assignee is re-run right away (trigger), otherwise it backs off (postpone).
void BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
{
    const bool scheduled = getContext()->getMovesExecutor()->trySchedule(move_task);

    if (scheduled)
        trigger();
    else
        postpone();
}
2021-06-21 13:36:21 +00:00
2021-11-09 12:26:51 +00:00
/// Offers `common_task` to the common executor.
/// trigger() is called only when trySchedule() succeeded AND `need_trigger` is set;
/// in every other case (including a successful schedule with need_trigger == false) postpone() is called.
void BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger)
{
    /// The task is always offered to the executor, whatever `need_trigger` says.
    const bool scheduled = getContext()->getCommonExecutor()->trySchedule(common_task);

    if (scheduled && need_trigger)
        trigger();
    else
        postpone();
}
2021-09-06 12:01:16 +00:00
/// Returns the name of the assignee type; start() uses it as the suffix of the task name.
String BackgroundJobsAssignee::toString(Type type)
{
    /// No `default:` label: every enumerator handled here is listed explicitly.
    switch (type)
    {
        case Type::DataProcessing: return "DataProcessing";
        case Type::Moving: return "Moving";
    }

    /// Reached only if `type` holds a value that is not one of the cases above.
    __builtin_unreachable();
}
2021-08-30 19:37:03 +00:00
2021-09-06 12:01:16 +00:00
void BackgroundJobsAssignee::start()
2020-10-14 14:56:42 +00:00
{
2021-08-30 19:37:03 +00:00
std::lock_guard lock(holder_mutex);
if (!holder)
2021-09-08 00:21:21 +00:00
holder = getContext()->getSchedulePool().createTask("BackgroundJobsAssignee:" + toString(type), [this]{ threadFunc(); });
2021-08-11 03:52:28 +00:00
2021-08-30 19:37:03 +00:00
holder->activateAndSchedule();
2020-10-14 14:56:42 +00:00
}
2020-10-13 14:25:42 +00:00
2021-09-06 12:01:16 +00:00
void BackgroundJobsAssignee::finish()
2020-10-14 14:56:42 +00:00
{
2021-08-30 19:37:03 +00:00
/// No lock here, because scheduled tasks could call trigger method
if (holder)
2020-10-14 14:56:42 +00:00
{
2021-08-30 19:37:03 +00:00
holder->deactivate();
2020-10-14 14:56:42 +00:00
2021-08-31 11:02:39 +00:00
auto storage_id = data.getStorageID();
2020-10-14 12:32:35 +00:00
2021-09-02 10:39:27 +00:00
getContext()->getMovesExecutor()->removeTasksCorrespondingToStorage(storage_id);
getContext()->getFetchesExecutor()->removeTasksCorrespondingToStorage(storage_id);
getContext()->getMergeMutateExecutor()->removeTasksCorrespondingToStorage(storage_id);
getContext()->getCommonExecutor()->removeTasksCorrespondingToStorage(storage_id);
2020-10-14 07:22:48 +00:00
}
}
2020-10-13 14:25:42 +00:00
2021-09-08 00:21:21 +00:00
void BackgroundJobsAssignee::threadFunc()
try
2021-06-21 13:36:21 +00:00
{
2021-08-30 19:37:03 +00:00
bool succeed = false;
switch (type)
{
case Type::DataProcessing:
succeed = data.scheduleDataProcessingJob(*this);
break;
case Type::Moving:
succeed = data.scheduleDataMovingJob(*this);
break;
}
if (!succeed)
postpone();
2021-06-21 13:36:21 +00:00
}
catch (...) /// Catch any exception to avoid thread termination.
{
tryLogCurrentException(__PRETTY_FUNCTION__);
2021-08-30 19:37:03 +00:00
postpone();
}
2021-06-21 13:36:21 +00:00
2021-09-06 12:01:16 +00:00
/// Stops the assignee via finish(). Anything thrown by finish() is logged and swallowed,
/// so no exception leaves the destructor.
BackgroundJobsAssignee::~BackgroundJobsAssignee()
{
    try
    {
        /// finish() does nothing if the task was never created.
        finish();
    }
    catch (...)
    {
        /// Log only; the handler must end normally so nothing propagates out of the destructor.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}