diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp index c3b351af520..12c3ff3e418 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp @@ -9,14 +9,10 @@ namespace DB { IBackgroundJobExecutor::IBackgroundJobExecutor( - MergeTreeData & data_, Context & global_context_, - const String & task_name_, const TaskSleepSettings & sleep_settings_, const std::vector & pools_configs_) - : data(data_) - , global_context(global_context_) - , task_name(task_name_) + : global_context(global_context_) , sleep_settings(sleep_settings_) , rng(randomSeed()) { @@ -30,7 +26,7 @@ IBackgroundJobExecutor::IBackgroundJobExecutor( namespace { -bool incrementIfLess(std::atomic & atomic_value, long max_value) +bool incrementIfLess(std::atomic & atomic_value, Int64 max_value) { auto value = atomic_value.load(std::memory_order_relaxed); while (value < max_value) @@ -120,7 +116,7 @@ void IBackgroundJobExecutor::start() if (!scheduling_task) { scheduling_task = global_context.getSchedulePool().createTask( - data.getStorageID().getFullTableName() + task_name, [this]{ jobExecutingTask(); }); + getBackgroundJobName(), [this]{ jobExecutingTask(); }); } scheduling_task->activateAndSchedule(); @@ -151,14 +147,18 @@ BackgroundJobsExecutor::BackgroundJobsExecutor( MergeTreeData & data_, Context & global_context_) : IBackgroundJobExecutor( - data_, global_context_, - "(dataProcessingTask)", global_context_.getBackgroundProcessingTaskSleepSettings(), {PoolConfig{PoolType::MERGE_MUTATE, global_context_.getSettingsRef().background_pool_size, CurrentMetrics::BackgroundPoolTask}}) + , data(data_) { } +String BackgroundJobsExecutor::getBackgroundJobName() const +{ + return data.getStorageID().getFullTableName() + " (dataProcessingTask)"; +} + std::optional BackgroundJobsExecutor::getBackgroundJob() { auto job = data.getDataProcessingJob(); @@ -171,14 +171,18 @@ BackgroundMovesExecutor::BackgroundMovesExecutor( MergeTreeData & data_, Context & global_context_) : IBackgroundJobExecutor( - data_, global_context_, - "(dataMovingTask)", global_context_.getBackgroundMoveTaskSleepSettings(), - {PoolConfig{PoolType::MOVE, global_context_.getSettingsRef().background_move_pool_size, CurrentMetrics::BackgroundMovePoolTask}}) + {PoolConfig{PoolType::MOVE, global_context_.getSettingsRef().background_move_pool_size, CurrentMetrics::BackgroundMovePoolTask}}) + , data(data_) { } +String BackgroundMovesExecutor::getBackgroundJobName() const +{ + return data.getStorageID().getFullTableName() + " (dataMovingTask)"; +} + std::optional BackgroundMovesExecutor::getBackgroundJob() { auto job = data.getDataMovingJob(); diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.h b/src/Storages/MergeTree/BackgroundJobsExecutor.h index 0abb5bce011..1710cccc54c 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.h +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.h @@ -51,8 +51,6 @@ struct JobAndPool class IBackgroundJobExecutor { -protected: - MergeTreeData & data; Context & global_context; private: String task_name; @@ -67,13 +65,6 @@ private: BackgroundSchedulePool::TaskHolder scheduling_task; public: - IBackgroundJobExecutor( - MergeTreeData & data_, - Context & global_context_, - const String & task_name_, - const TaskSleepSettings & sleep_settings_, - const std::vector & pools_configs_); - void start(); void triggerTask(); void finish(); @@ -81,7 +72,14 @@ public: virtual ~IBackgroundJobExecutor(); protected: + IBackgroundJobExecutor( + Context & global_context_, + const TaskSleepSettings & sleep_settings_, + const std::vector & pools_configs_); + + virtual String getBackgroundJobName() const = 0; virtual std::optional getBackgroundJob() = 0; + private: void jobExecutingTask(); void scheduleTask(bool nothing_to_do); @@ -89,23 +87,29 @@ private: class BackgroundJobsExecutor final : public IBackgroundJobExecutor { +private: + MergeTreeData & data; public: BackgroundJobsExecutor( MergeTreeData & data_, Context & global_context_); protected: + String getBackgroundJobName() const override; std::optional getBackgroundJob() override; }; class BackgroundMovesExecutor final : public IBackgroundJobExecutor { +private: + MergeTreeData & data; public: BackgroundMovesExecutor( MergeTreeData & data_, Context & global_context_); protected: + String getBackgroundJobName() const override; std::optional getBackgroundJob() override; }; diff --git a/src/Storages/tests/gtest_background_executor.cpp b/src/Storages/tests/gtest_background_executor.cpp new file mode 100644 index 00000000000..1bc96ed0c42 --- /dev/null +++ b/src/Storages/tests/gtest_background_executor.cpp @@ -0,0 +1,62 @@ +#include +#include +#include +#include +#include +#include +using namespace std::chrono_literals; +namespace CurrentMetrics +{ + extern const Metric BackgroundPoolTask; +} + +using namespace DB; + +static std::atomic counter{0}; + +class TestJobExecutor : public IBackgroundJobExecutor +{ +public: + TestJobExecutor(Context & context) + :IBackgroundJobExecutor( + context, + TaskSleepSettings{}, + {PoolConfig{PoolType::MERGE_MUTATE, 4, CurrentMetrics::BackgroundPoolTask}}) + {} + +protected: + String getBackgroundJobName() const override + { + return "TestJob"; + } + + std::optional getBackgroundJob() override + { + return JobAndPool{[] { std::this_thread::sleep_for(1s); counter++; }, PoolType::MERGE_MUTATE}; + } +}; + +using TestExecutorPtr = std::unique_ptr; + +TEST(BackgroundExecutor, TestMetric) +{ + auto & context_holder = getContext(); + std::vector executors; + for (size_t i = 0; i < 100; ++i) + executors.emplace_back(std::make_unique(const_cast(context_holder.context))); + + for (size_t i = 0; i < 100; ++i) + executors[i]->start(); + + for (size_t i = 0; i < 100; ++i) + { + EXPECT_TRUE(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load() <= 4); + std::this_thread::sleep_for(200ms); + } + + for (size_t i = 0; i < 100; ++i) + executors[i]->finish(); + + /// Sanity check + EXPECT_TRUE(counter > 50); +}