Add test and slightly refactored

This commit is contained in:
alesapin 2020-10-15 13:22:02 +03:00
parent 583d7042e9
commit e84eda176a
3 changed files with 91 additions and 21 deletions

View File

@ -9,14 +9,10 @@ namespace DB
{
IBackgroundJobExecutor::IBackgroundJobExecutor(
MergeTreeData & data_,
Context & global_context_,
const String & task_name_,
const TaskSleepSettings & sleep_settings_,
const std::vector<PoolConfig> & pools_configs_)
: data(data_)
, global_context(global_context_)
, task_name(task_name_)
: global_context(global_context_)
, sleep_settings(sleep_settings_)
, rng(randomSeed())
{
@ -30,7 +26,7 @@ IBackgroundJobExecutor::IBackgroundJobExecutor(
namespace
{
bool incrementIfLess(std::atomic<long> & atomic_value, long max_value)
bool incrementIfLess(std::atomic<Int64> & atomic_value, Int64 max_value)
{
auto value = atomic_value.load(std::memory_order_relaxed);
while (value < max_value)
@ -120,7 +116,7 @@ void IBackgroundJobExecutor::start()
if (!scheduling_task)
{
scheduling_task = global_context.getSchedulePool().createTask(
data.getStorageID().getFullTableName() + task_name, [this]{ jobExecutingTask(); });
getBackgroundJobName(), [this]{ jobExecutingTask(); });
}
scheduling_task->activateAndSchedule();
@ -151,14 +147,18 @@ BackgroundJobsExecutor::BackgroundJobsExecutor(
MergeTreeData & data_,
Context & global_context_)
: IBackgroundJobExecutor(
data_,
global_context_,
"(dataProcessingTask)",
global_context_.getBackgroundProcessingTaskSleepSettings(),
{PoolConfig{PoolType::MERGE_MUTATE, global_context_.getSettingsRef().background_pool_size, CurrentMetrics::BackgroundPoolTask}})
, data(data_)
{
}
String BackgroundJobsExecutor::getBackgroundJobName() const
{
return data.getStorageID().getFullTableName() + " (dataProcessingTask)";
}
std::optional<JobAndPool> BackgroundJobsExecutor::getBackgroundJob()
{
auto job = data.getDataProcessingJob();
@ -171,14 +171,18 @@ BackgroundMovesExecutor::BackgroundMovesExecutor(
MergeTreeData & data_,
Context & global_context_)
: IBackgroundJobExecutor(
data_,
global_context_,
"(dataMovingTask)",
global_context_.getBackgroundMoveTaskSleepSettings(),
{PoolConfig{PoolType::MOVE, global_context_.getSettingsRef().background_move_pool_size, CurrentMetrics::BackgroundMovePoolTask}})
{PoolConfig{PoolType::MOVE, global_context_.getSettingsRef().background_move_pool_size, CurrentMetrics::BackgroundMovePoolTask}})
, data(data_)
{
}
String BackgroundMovesExecutor::getBackgroundJobName() const
{
return data.getStorageID().getFullTableName() + " (dataMovingTask)";
}
std::optional<JobAndPool> BackgroundMovesExecutor::getBackgroundJob()
{
auto job = data.getDataMovingJob();

View File

@ -51,8 +51,6 @@ struct JobAndPool
class IBackgroundJobExecutor
{
protected:
MergeTreeData & data;
Context & global_context;
private:
String task_name;
@ -67,13 +65,6 @@ private:
BackgroundSchedulePool::TaskHolder scheduling_task;
public:
IBackgroundJobExecutor(
MergeTreeData & data_,
Context & global_context_,
const String & task_name_,
const TaskSleepSettings & sleep_settings_,
const std::vector<PoolConfig> & pools_configs_);
void start();
void triggerTask();
void finish();
@ -81,7 +72,14 @@ public:
virtual ~IBackgroundJobExecutor();
protected:
IBackgroundJobExecutor(
Context & global_context_,
const TaskSleepSettings & sleep_settings_,
const std::vector<PoolConfig> & pools_configs_);
virtual String getBackgroundJobName() const = 0;
virtual std::optional<JobAndPool> getBackgroundJob() = 0;
private:
void jobExecutingTask();
void scheduleTask(bool nothing_to_do);
@ -89,23 +87,29 @@ private:
class BackgroundJobsExecutor final : public IBackgroundJobExecutor
{
private:
MergeTreeData & data;
public:
BackgroundJobsExecutor(
MergeTreeData & data_,
Context & global_context_);
protected:
String getBackgroundJobName() const override;
std::optional<JobAndPool> getBackgroundJob() override;
};
class BackgroundMovesExecutor final : public IBackgroundJobExecutor
{
private:
MergeTreeData & data;
public:
BackgroundMovesExecutor(
MergeTreeData & data_,
Context & global_context_);
protected:
String getBackgroundJobName() const override;
std::optional<JobAndPool> getBackgroundJob() override;
};

View File

@ -0,0 +1,62 @@
#include <gtest/gtest.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Common/CurrentMetrics.h>
#include <Common/tests/gtest_global_context.h>
#include <memory>
#include <chrono>
using namespace std::chrono_literals;
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
using namespace DB;
static std::atomic<Int64> counter{0};
class TestJobExecutor : public IBackgroundJobExecutor
{
public:
TestJobExecutor(Context & context)
:IBackgroundJobExecutor(
context,
TaskSleepSettings{},
{PoolConfig{PoolType::MERGE_MUTATE, 4, CurrentMetrics::BackgroundPoolTask}})
{}
protected:
String getBackgroundJobName() const override
{
return "TestJob";
}
std::optional<JobAndPool> getBackgroundJob() override
{
return JobAndPool{[] { std::this_thread::sleep_for(1s); counter++; }, PoolType::MERGE_MUTATE};
}
};
using TestExecutorPtr = std::unique_ptr<TestJobExecutor>;
TEST(BackgroundExecutor, TestMetric)
{
auto & context_holder = getContext();
std::vector<TestExecutorPtr> executors;
for (size_t i = 0; i < 100; ++i)
executors.emplace_back(std::make_unique<TestJobExecutor>(const_cast<Context &>(context_holder.context)));
for (size_t i = 0; i < 100; ++i)
executors[i]->start();
for (size_t i = 0; i < 100; ++i)
{
EXPECT_TRUE(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load() <= 4);
std::this_thread::sleep_for(200ms);
}
for (size_t i = 0; i < 100; ++i)
executors[i]->finish();
/// Sanity check
EXPECT_TRUE(counter > 50);
}