diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp
index f4ef89e20f9..16bb5b3c4f0 100644
--- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp
+++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp
@@ -5,48 +5,31 @@
 #include
 #include
 
-namespace CurrentMetrics
-{
-    extern const Metric BackgroundPoolTask;
-}
-
 namespace DB
 {
 
-BackgroundJobsExecutor::BackgroundJobsExecutor(
-    MergeTreeData & data_,
-    Context & global_context_)
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+IBackgroundJobExecutor::IBackgroundJobExecutor(
+    MergeTreeData & data_,
+    Context & global_context_,
+    const String & task_name_,
+    const TaskSleepSettings & sleep_settings_,
+    const std::vector<PoolConfig> & pools_configs_)
     : data(data_)
     , global_context(global_context_)
-    , max_pool_size(global_context.getSettingsRef().background_pool_size)
-    , data_processing_pool(max_pool_size, 0, max_pool_size, false)
-    , move_pool(global_context.getSettingsRef().background_move_pool_size, 0, max_pool_size, false)
+    , task_name(task_name_)
+    , sleep_settings(sleep_settings_)
    , rng(randomSeed())
 {
-    data_processing_task = global_context.getSchedulePool().createTask(
-        data.getStorageID().getFullTableName() + " (dataProcessingTask)", [this]{ dataProcessingTask(); });
-    const auto & config = global_context.getConfigRef();
-    settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10);
-    settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0);
-    settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
-    settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10);
-    settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600);
-    settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
-    settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
-}
-
-void BackgroundJobsExecutor::dataMovingTask()
-try
-{
-    auto job = data.getDataMovingJob();
-    if (job)
-        move_pool.scheduleOrThrowOnError(job);
-
-    data_moving_task->schedule();
-}
-catch(...)
-{
-    tryLogCurrentException(__PRETTY_FUNCTION__);
+    for (const auto & pool_config : pools_configs_)
+    {
+        pools.try_emplace(pool_config.pool_type, pool_config.max_pool_size, 0, pool_config.max_pool_size, false);
+        pools_configs.emplace(pool_config.pool_type, pool_config);
+    }
 }
 
 namespace
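Note (illustration, not part of the patch): the rewritten constructor above keeps one worker pool per pool type in a map, constructing each pool in place with try_emplace and remembering its PoolConfig for later lookup. The standalone sketch below shows that pattern with stand-in types; Pool, PoolKind, Config and Executor are hypothetical names, not identifiers from the patch, and Pool only mimics what appear to be the (max threads, max free threads, queue size, shutdown-on-exception) constructor arguments passed to ThreadPool here.

    #include <cstddef>
    #include <unordered_map>
    #include <vector>

    enum class PoolKind { MergeMutate, Move };   /// stand-in for PoolType

    struct Pool                                  /// stand-in for ThreadPool
    {
        Pool(size_t max_threads_, size_t /*max_free_threads*/, size_t /*queue_size*/, bool /*shutdown_on_exception*/)
            : max_threads(max_threads_) {}
        size_t max_threads;
    };

    struct Config                                /// stand-in for PoolConfig
    {
        PoolKind kind;
        size_t max_pool_size;
    };

    struct Executor
    {
        std::unordered_map<PoolKind, Pool> pools;
        std::unordered_map<PoolKind, Config> configs;

        explicit Executor(const std::vector<Config> & pool_configs)
        {
            for (const auto & cfg : pool_configs)
            {
                /// Construct the pool in place; mirrors pools.try_emplace(pool_type, max, 0, max, false).
                pools.try_emplace(cfg.kind, cfg.max_pool_size, 0, cfg.max_pool_size, false);
                configs.emplace(cfg.kind, cfg);
            }
        }
    };

Because try_emplace only inserts when the key is absent, a duplicated pool type in the config list is ignored rather than overwriting an existing pool.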
@@ -63,95 +46,270 @@ bool incrementIfLess(std::atomic<long> & atomic_value, long max_value)
 }
 
-
-void BackgroundJobsExecutor::dataProcessingTask()
+
+void IBackgroundJobExecutor::scheduleTask(bool nothing_to_do)
 {
-    if (incrementIfLess(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], max_pool_size))
+    auto errors = errors_count.load(std::memory_order_relaxed);
+    size_t next_time_to_execute = 0;
+    if (errors != 0)
+        next_time_to_execute += 1000 * (std::min(
+                sleep_settings.task_sleep_seconds_when_no_work_max,
+                sleep_settings.task_sleep_seconds_when_no_work_min * std::pow(sleep_settings.task_sleep_seconds_when_no_work_multiplier, errors))
+            + std::uniform_real_distribution<double>(0, sleep_settings.task_sleep_seconds_when_no_work_random_part)(rng));
+    else if (nothing_to_do)
+        next_time_to_execute += 1000 * (sleep_settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, sleep_settings.task_sleep_seconds_when_no_work_random_part)(rng));
+    else
+        next_time_to_execute = 1000 * std::uniform_real_distribution<double>(0, sleep_settings.thread_sleep_seconds_random_part)(rng);
+
+    scheduling_task->scheduleAfter(next_time_to_execute);
+}
+
+void IBackgroundJobExecutor::jobExecutingTask()
+try
+{
+    auto job_and_pool = getBackgroundJob();
+    if (job_and_pool)
     {
-        try
+        auto & pool_config = pools_configs[job_and_pool->pool_type];
+        /// If corresponding pool is not full, otherwise try next time
+        if (incrementIfLess(CurrentMetrics::values[pool_config.tasks_metric], pool_config.max_pool_size))
         {
-            auto job = data.getDataProcessingJob();
-            if (job)
+            try /// this try required because we have to manually decrement metric
             {
-                data_processing_pool.scheduleOrThrowOnError([this, job{std::move(job)}] ()
+                pools[job_and_pool->pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool->job)}] ()
                 {
-                    try
+                    try /// We don't want exceptions in background pool
                     {
                         job();
-                        CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
+                        CurrentMetrics::values[pool_config.tasks_metric]--;
                         errors_count = 0;
                     }
                     catch (...)
                     {
                         errors_count++;
                         tryLogCurrentException(__PRETTY_FUNCTION__);
-                        CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
+                        CurrentMetrics::values[pool_config.tasks_metric]--;
                     }
                 });
-                auto errors = errors_count.load(std::memory_order_relaxed);
-                if (errors != 0)
-                {
-                    auto next_time_to_execute = 1000 * (std::min(
-                        settings.task_sleep_seconds_when_no_work_max,
-                        settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, errors))
-                        + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng));
-                    data_processing_task->scheduleAfter(next_time_to_execute);
-                }
-                else
-                    data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
             }
-            else
+            catch (...)
             {
-                data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
-                CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
+                tryLogCurrentException(__PRETTY_FUNCTION__);
+                CurrentMetrics::values[pool_config.tasks_metric]--;
             }
         }
-        catch(...)
-        {
-            CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
-            data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-        }
+        scheduleTask(false);
     }
-    else
+    else /// Nothing to do, no jobs
     {
-        /// Pool overloaded
-        data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
+        scheduleTask(true);
     }
 }
-
-
-void BackgroundJobsExecutor::startMovingTaskIfNeeded()
+catch (...) /// Exception while we looking for task
 {
-    if (data.areBackgroundMovesNeeded() && !data_moving_task)
+    tryLogCurrentException(__PRETTY_FUNCTION__);
+    scheduleTask(true);
+}
+
+void IBackgroundJobExecutor::start()
+{
+    if (!scheduling_task)
     {
-        data_moving_task = global_context.getSchedulePool().createTask(
-            data.getStorageID().getFullTableName() + " (dataMovingTask)", [this]{ dataMovingTask(); });
-        data_moving_task->activateAndSchedule();
+        scheduling_task = global_context.getSchedulePool().createTask(
+            data.getStorageID().getFullTableName() + task_name, [this]{ jobExecutingTask(); });
     }
+
+    scheduling_task->activateAndSchedule();
 }
 
-void BackgroundJobsExecutor::start()
+void IBackgroundJobExecutor::finish()
 {
-    if (data_processing_task)
-        data_processing_task->activateAndSchedule();
-    startMovingTaskIfNeeded();
-}
-
-void BackgroundJobsExecutor::triggerDataProcessing()
-{
-    if (data_processing_task)
-        data_processing_task->schedule();
-}
-
-void BackgroundJobsExecutor::finish()
-{
-    data_processing_task->deactivate();
-    data_processing_pool.wait();
-    if (data_moving_task)
+    if (scheduling_task)
     {
-        data_moving_task->deactivate();
-        move_pool.wait();
+        scheduling_task->deactivate();
+        for (auto & [pool_type, pool] : pools)
+            pool.wait();
     }
 }
 
+void IBackgroundJobExecutor::triggerDataProcessing()
+{
+    if (scheduling_task)
+        scheduling_task->schedule();
+}
+
+IBackgroundJobExecutor::~IBackgroundJobExecutor()
+{
+    finish();
+}
+
+BackgroundJobsExecutor::BackgroundJobsExecutor(
+    MergeTreeData & data_,
+    Context & global_context_)
+    : IBackgroundJobExecutor(
+        data_,
+        global_context_,
+        "(dataProcessingTask)",
+        TaskSleepSettings{},
+        {PoolConfig{PoolType::MERGE_MUTATE, global_context_.getSettingsRef().background_pool_size, CurrentMetrics::BackgroundPoolTask}})
+{
+}
+
+std::optional<JobAndPool> BackgroundJobsExecutor::getBackgroundJob()
+{
+    auto job = data.getDataProcessingJob();
+    if (job)
+        return JobAndPool{job, PoolType::MERGE_MUTATE};
+    return {};
+}
+
+BackgroundMovesExecutor::BackgroundMovesExecutor(
+    MergeTreeData & data_,
+    Context & global_context_)
+    : IBackgroundJobExecutor(
+        data_,
+        global_context_,
+        "(dataMovingTask)",
+        TaskSleepSettings{},
+        {PoolConfig{PoolType::MOVE, global_context_.getSettingsRef().background_move_pool_size, CurrentMetrics::BackgroundMovePoolTask}})
+{
+}
+
+std::optional<JobAndPool> BackgroundMovesExecutor::getBackgroundJob()
+{
+    auto job = data.getDataMovingJob();
+    if (job)
+        return JobAndPool{job, PoolType::MOVE};
+    return {};
+}
+
+//BackgroundJobsExecutor::BackgroundJobsExecutor(
+//    MergeTreeData & data_,
+//    Context & global_context_)
+//    : data(data_)
+//    , global_context(global_context_)
+//    , max_pool_size(global_context.getSettingsRef().background_pool_size)
+//    , data_processing_pool(max_pool_size, 0, max_pool_size, false)
+//    , move_pool(global_context.getSettingsRef().background_move_pool_size, 0, max_pool_size, false)
+//    , rng(randomSeed())
+//{
+//    data_processing_task = global_context.getSchedulePool().createTask(
+//        data.getStorageID().getFullTableName() + " (dataProcessingTask)", [this]{ dataProcessingTask(); });
+//    const auto & config = global_context.getConfigRef();
+//    settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10);
+//    settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0);
+//    settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
+//    settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10);
+//    settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600);
+//    settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
+//    settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
+//}
+//
+//void BackgroundJobsExecutor::dataMovingTask()
+//try
+//{
+//    auto job = data.getDataMovingJob();
+//    if (job)
+//        move_pool.scheduleOrThrowOnError(job);
+//
+//    data_moving_task->schedule();
+//}
+//catch(...)
+//{
+//    tryLogCurrentException(__PRETTY_FUNCTION__);
+//}
+//
+//
+//
+//void BackgroundJobsExecutor::dataProcessingTask()
+//{
+//    if (incrementIfLess(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], max_pool_size))
+//    {
+//        try
+//        {
+//            auto job = data.getDataProcessingJob();
+//            if (job)
+//            {
+//                data_processing_pool.scheduleOrThrowOnError([this, job{std::move(job)}] ()
+//                {
+//                    try
+//                    {
+//                        job();
+//                        CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
+//                        errors_count = 0;
+//                    }
+//                    catch (...)
+//                    {
+//                        errors_count++;
+//                        tryLogCurrentException(__PRETTY_FUNCTION__);
+//                        CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
+//                    }
+//                });
+//                auto errors = errors_count.load(std::memory_order_relaxed);
+//                if (errors != 0)
+//                {
+//                    auto next_time_to_execute = 1000 * (std::min(
+//                        settings.task_sleep_seconds_when_no_work_max,
+//                        settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, errors))
+//                        + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng));
+//                    data_processing_task->scheduleAfter(next_time_to_execute);
+//                }
+//                else
+//                    data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
+//            }
+//            else
+//            {
+//                data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
+//                CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
+//            }
+//        }
+//        catch(...)
+//        {
+//            CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
+//            data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
+//            tryLogCurrentException(__PRETTY_FUNCTION__);
+//        }
+//    }
+//    else
+//    {
+//        /// Pool overloaded
+//        data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
+//    }
+//}
+//
+//
+//void BackgroundJobsExecutor::startMovingTaskIfNeeded()
+//{
+//    if (data.areBackgroundMovesNeeded() && !data_moving_task)
+//    {
+//        data_moving_task = global_context.getSchedulePool().createTask(
+//            data.getStorageID().getFullTableName() + " (dataMovingTask)", [this]{ dataMovingTask(); });
+//        data_moving_task->activateAndSchedule();
+//    }
+//}
+//
+//void BackgroundJobsExecutor::start()
+//{
+//    if (data_processing_task)
+//        data_processing_task->activateAndSchedule();
+//    startMovingTaskIfNeeded();
+//}
+//
+//void BackgroundJobsExecutor::triggerDataProcessing()
+//{
+//    if (data_processing_task)
+//        data_processing_task->schedule();
+//}
+//
+//void BackgroundJobsExecutor::finish()
+//{
+//    data_processing_task->deactivate();
+//    data_processing_pool.wait();
+//    if (data_moving_task)
+//    {
+//        data_moving_task->deactivate();
+//        move_pool.wait();
+//    }
+//}
+
 }
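Note (illustration, not part of the patch): scheduleTask() in the file above re-arms the single scheduling task with an exponential backoff once jobs start failing. The delay is min(cap, base multiplied by the multiplier raised to the number of consecutive errors) plus random jitter, converted to milliseconds. Below is a self-contained sketch of just that computation using the TaskSleepSettings defaults (10 s base, 1.1 multiplier, 600 s cap, up to 1 s jitter); the function name is made up, and std::mt19937_64 stands in for the pcg64 generator used in the patch.

    #include <algorithm>
    #include <cmath>
    #include <cstddef>
    #include <random>

    /// Delay in milliseconds before the next attempt after `errors` consecutive failures.
    size_t backoffDelayMs(size_t errors, std::mt19937_64 & rng,
                          double min_seconds = 10, double max_seconds = 600,
                          double multiplier = 1.1, double random_part_seconds = 1.0)
    {
        double backoff = std::min(max_seconds, min_seconds * std::pow(multiplier, errors));
        double jitter = std::uniform_real_distribution<double>(0, random_part_seconds)(rng);
        return static_cast<size_t>(1000 * (backoff + jitter));
    }

With those defaults a single failure delays the next attempt by roughly 11 seconds, and repeated failures saturate at about ten minutes.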
diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.h b/src/Storages/MergeTree/BackgroundJobsExecutor.h
index b7bd63f7169..4d01198f681 100644
--- a/src/Storages/MergeTree/BackgroundJobsExecutor.h
+++ b/src/Storages/MergeTree/BackgroundJobsExecutor.h
@@ -8,18 +8,14 @@
 namespace CurrentMetrics
 {
     extern const Metric BackgroundPoolTask;
+    extern const Metric BackgroundMovePoolTask;
 }
 
 namespace DB
 {
 
-enum PoolType
-{
-    MERGE_MUTATING,
-    MOVING,
-}
-struct PoolSettings
+struct TaskSleepSettings
 {
     double thread_sleep_seconds = 10;
     double thread_sleep_seconds_random_part = 1.0;
@@ -29,41 +25,90 @@ struct PoolSettings
     double task_sleep_seconds_when_no_work_min = 10;
     double task_sleep_seconds_when_no_work_max = 600;
     double task_sleep_seconds_when_no_work_multiplier = 1.1;
+    double task_sleep_seconds_when_no_work_random_part = 1.0;
-
-    CurrentMetrics::Metric tasks_metric = CurrentMetrics::BackgroundPoolTask;
-
-    PoolSettings() noexcept {}
 };
 
-class BackgroundJobsExecutor
+enum PoolType
 {
-private:
+    MERGE_MUTATE,
+    FETCH,
+    MOVE,
+    LOW_PRIORITY,
+};
+
+struct PoolConfig
+{
+    PoolType pool_type;
+    size_t max_pool_size;
+    CurrentMetrics::Metric tasks_metric;
+};
+
+struct JobAndPool
+{
+    ThreadPool::Job job;
+    PoolType pool_type;
+};
+
+class IBackgroundJobExecutor
+{
+protected:
     MergeTreeData & data;
     Context & global_context;
-    size_t max_pool_size;
-    ThreadPool data_processing_pool;
-    ThreadPool move_pool;
-    std::atomic<size_t> errors_count{0};
+private:
+    String task_name;
+    TaskSleepSettings sleep_settings;
     pcg64 rng;
-    PoolSettings settings;
-    BackgroundSchedulePool::TaskHolder data_processing_task;
-    BackgroundSchedulePool::TaskHolder data_moving_task;
+    std::atomic<size_t> errors_count{0};
 
-    void dataProcessingTask();
-    void dataMovingTask();
+    std::unordered_map<PoolType, ThreadPool> pools;
+    std::unordered_map<PoolType, PoolConfig> pools_configs;
+    BackgroundSchedulePool::TaskHolder scheduling_task;
+
+public:
+    IBackgroundJobExecutor(
+        MergeTreeData & data_,
+        Context & global_context_,
+        const String & task_name_,
+        const TaskSleepSettings & sleep_settings_,
+        const std::vector<PoolConfig> & pools_configs_);
+
+    void start();
+    void triggerDataProcessing();
+    void finish();
+
+    virtual ~IBackgroundJobExecutor();
+
+protected:
+    virtual std::optional<JobAndPool> getBackgroundJob() = 0;
+private:
+    void jobExecutingTask();
+    void scheduleTask(bool nothing_to_do);
+};
+
+
+class BackgroundJobsExecutor final : public IBackgroundJobExecutor
+{
 public:
     BackgroundJobsExecutor(
         MergeTreeData & data_,
         Context & global_context_);
 
-    void startMovingTaskIfNeeded();
-    void triggerDataProcessing();
-    void triggerMovesProcessing();
-    void start();
-    void finish();
+protected:
+    std::optional<JobAndPool> getBackgroundJob() override;
 };
-
+
+class BackgroundMovesExecutor final : public IBackgroundJobExecutor
+{
+public:
+    BackgroundMovesExecutor(
+        MergeTreeData & data_,
+        Context & global_context_);
+
+protected:
+    std::optional<JobAndPool> getBackgroundJob() override;
+};
+
 }
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 87dfbd4d879..c4f8436678d 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -79,6 +79,8 @@ StorageMergeTree::StorageMergeTree(
     , writer(*this)
     , merger_mutator(*this, global_context.getSettingsRef().background_pool_size)
     , background_executor(*this, global_context)
+    , background_moves_executor(*this, global_context)
+
 {
     loadDataParts(has_force_restore_data_flag);
 
@@ -106,6 +108,8 @@ void StorageMergeTree::startup()
     try
     {
         background_executor.start();
+        if (areBackgroundMovesNeeded())
+            background_moves_executor.start();
     }
     catch (...)
     {
@@ -143,6 +147,7 @@ void StorageMergeTree::shutdown()
     parts_mover.moves_blocker.cancelForever();
 
     background_executor.finish();
+    background_moves_executor.finish();
 
     try
     {
@@ -1449,7 +1454,7 @@ MutationCommands StorageMergeTree::getFirtsAlterMutationCommandsForPart(const Da
 
 void StorageMergeTree::startBackgroundMovesIfNeeded()
 {
-    background_executor.startMovingTaskIfNeeded();
+    background_moves_executor.start();
 }
 
 }
diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h
index b1946e48d4f..f7473872189 100644
--- a/src/Storages/StorageMergeTree.h
+++ b/src/Storages/StorageMergeTree.h
@@ -98,6 +98,7 @@ private:
     MergeTreeDataWriter writer;
     MergeTreeDataMergerMutator merger_mutator;
    BackgroundJobsExecutor background_executor;
+    BackgroundMovesExecutor background_moves_executor;
 
     /// For block numbers.
     SimpleIncrement increment{0};
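Note (illustration, not part of the patch): both storages wire the two executors the same way. The merge/mutate executor always starts, the moves executor starts only when areBackgroundMovesNeeded() is true (presumably when the storage policy actually has somewhere to move parts to), and both are finished unconditionally on shutdown, which is safe because finish() only acts if start() ever created the scheduling task. A minimal self-contained sketch of that lifecycle with dummy types; all names below are hypothetical:

    struct DummyExecutor                      /// stand-in for the executors from the patch
    {
        bool started = false;
        void start()  { started = true; }
        void finish() { if (started) { /* deactivate task, wait for pools */ started = false; } }
    };

    struct DummyStorage
    {
        DummyExecutor background_executor;
        DummyExecutor background_moves_executor;
        bool needs_moves = false;             /// e.g. a storage policy with several volumes

        void startup()
        {
            background_executor.start();
            if (needs_moves)
                background_moves_executor.start();
        }

        void shutdown()
        {
            background_executor.finish();
            background_moves_executor.finish();   /// harmless even if it was never started
        }
    };

    int main()
    {
        DummyStorage storage;
        storage.startup();    /// only merges/mutations are scheduled here
        storage.shutdown();
    }

Keeping finish() safe on a never-started executor is what lets shutdown() stay unconditional.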
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 453956ffa8b..bc55b87fee7 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -198,6 +198,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     , queue(*this)
     , fetcher(*this)
     , background_executor(*this, global_context)
+    , background_moves_executor(*this, global_context)
     , cleanup_thread(*this)
     , part_check_thread(*this)
     , restarting_thread(*this)
@@ -3485,6 +3486,8 @@ void StorageReplicatedMergeTree::startup()
             auto lock = queue.lockQueue();
             background_executor.start();
         }
+        if (areBackgroundMovesNeeded())
+            background_moves_executor.start();
 
     }
     catch (...)
@@ -3527,6 +3530,7 @@ void StorageReplicatedMergeTree::shutdown()
         /// MUTATE, etc. query.
         queue.pull_log_blocker.cancelForever();
     }
+    background_moves_executor.finish();
 
     if (data_parts_exchange_endpoint)
     {
@@ -5974,7 +5978,7 @@ MutationCommands StorageReplicatedMergeTree::getFirtsAlterMutationCommandsForPar
 
 void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
 {
-    background_executor.startMovingTaskIfNeeded();
+    background_moves_executor.start();
 }
 
 }
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index 4ce3c0ad3c1..ba01ca9d3af 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -276,6 +276,7 @@ private:
 
     /// Threads.
     BackgroundJobsExecutor background_executor;
+    BackgroundMovesExecutor background_moves_executor;
 
     /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
     bool queue_update_in_progress = false;
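Note (illustration, not part of the patch): jobExecutingTask() in BackgroundJobsExecutor.cpp only hands a job to its pool after reserving a slot in that pool's CurrentMetrics counter through incrementIfLess(), and every path (job success, job failure, or a failure to schedule) decrements the counter again. The helper's body is not shown in the hunks, so the sketch below is one plausible compare-and-swap implementation of the same reservation pattern, with the metric replaced by a plain std::atomic; runGuarded() is a hypothetical caller, not code from the patch.

    #include <atomic>
    #include <cstdint>
    #include <functional>

    /// Reserve a slot: increment only while the current value stays below max_value.
    bool incrementIfLess(std::atomic<int64_t> & value, int64_t max_value)
    {
        auto current = value.load(std::memory_order_relaxed);
        while (current < max_value)
            if (value.compare_exchange_weak(current, current + 1))
                return true;
        return false;
    }

    void runGuarded(std::atomic<int64_t> & in_flight, int64_t max_tasks, const std::function<void()> & job)
    {
        if (!incrementIfLess(in_flight, max_tasks))
            return;              /// pool is full; the caller simply retries on the next tick
        try
        {
            job();
            --in_flight;         /// release the slot on success...
        }
        catch (...)
        {
            --in_flight;         /// ...and on failure as well, so the counter never leaks
        }
    }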