Merge pull request #25548 from nikitamikhaylov/background-processing

A little improvement in BackgroundJobsExecutor
This commit is contained in:
alesapin 2021-06-28 13:24:21 +03:00 committed by GitHub
commit 4c213b639f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 140 additions and 165 deletions

View File

@ -25,7 +25,8 @@ IBackgroundJobExecutor::IBackgroundJobExecutor(
{
for (const auto & pool_config : pools_configs_)
{
pools.try_emplace(pool_config.pool_type, pool_config.max_pool_size, 0, pool_config.max_pool_size, false);
const auto max_pool_size = pool_config.get_max_pool_size();
pools.try_emplace(pool_config.pool_type, max_pool_size, 0, max_pool_size, false);
pools_configs.emplace(pool_config.pool_type, pool_config);
}
}
@ -82,19 +83,22 @@ bool incrementMetricIfLessThanMax(std::atomic<Int64> & atomic_value, Int64 max_v
}
void IBackgroundJobExecutor::jobExecutingTask()
void IBackgroundJobExecutor::execute(JobAndPool job_and_pool)
try
{
auto job_and_pool = getBackgroundJob();
if (job_and_pool) /// If we have job, then try to assign into background pool
{
auto & pool_config = pools_configs[job_and_pool->pool_type];
auto & pool_config = pools_configs[job_and_pool.pool_type];
const auto max_pool_size = pool_config.get_max_pool_size();
/// If corresponding pool is not full increment metric and assign new job
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], pool_config.max_pool_size))
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size))
{
try /// this try required because we have to manually decrement metric
{
pools[job_and_pool->pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool->job)}] ()
/// Synchronize pool size, because config could be reloaded
pools[job_and_pool.pool_type].setMaxThreads(max_pool_size);
pools[job_and_pool.pool_type].setQueueSize(max_pool_size);
pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool.job)}] ()
{
try /// We don't want exceptions in background pool
{
@ -138,12 +142,6 @@ try
scheduleTask(/* with_backoff = */ false);
}
}
else /// Nothing to do, no jobs
{
scheduleTask(/* with_backoff = */ true);
}
}
catch (...) /// Exception while we looking for a task, reschedule
{
tryLogCurrentException(__PRETTY_FUNCTION__);
@ -156,7 +154,7 @@ void IBackgroundJobExecutor::start()
if (!scheduling_task)
{
scheduling_task = getContext()->getSchedulePool().createTask(
getBackgroundTaskName(), [this]{ jobExecutingTask(); });
getBackgroundTaskName(), [this]{ backgroundTaskFunction(); });
}
scheduling_task->activateAndSchedule();
@ -180,6 +178,12 @@ void IBackgroundJobExecutor::triggerTask()
scheduling_task->schedule();
}
/// Function registered as the task in the background schedule pool.
/// Asks the concrete executor to schedule a job; when no job was scheduled,
/// calls scheduleTask with backoff enabled.
void IBackgroundJobExecutor::backgroundTaskFunction()
{
    const bool job_scheduled = scheduleJob();
    if (job_scheduled)
        return;

    /// Nothing was scheduled on this iteration
    scheduleTask(/* with_backoff = */ true);
}
IBackgroundJobExecutor::~IBackgroundJobExecutor()
{
finish();
@ -191,8 +195,19 @@ BackgroundJobsExecutor::BackgroundJobsExecutor(
: IBackgroundJobExecutor(
global_context_,
global_context_->getBackgroundProcessingTaskSchedulingSettings(),
{PoolConfig{PoolType::MERGE_MUTATE, global_context_->getSettingsRef().background_pool_size, CurrentMetrics::BackgroundPoolTask},
PoolConfig{PoolType::FETCH, global_context_->getSettingsRef().background_fetches_pool_size, CurrentMetrics::BackgroundFetchesPoolTask}})
{PoolConfig
{
.pool_type = PoolType::MERGE_MUTATE,
.get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_pool_size; },
.tasks_metric = CurrentMetrics::BackgroundPoolTask
},
PoolConfig
{
.pool_type = PoolType::FETCH,
.get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_fetches_pool_size; },
.tasks_metric = CurrentMetrics::BackgroundFetchesPoolTask
}
})
, data(data_)
{
}
@ -202,9 +217,9 @@ String BackgroundJobsExecutor::getBackgroundTaskName() const
return data.getStorageID().getFullTableName() + " (dataProcessingTask)";
}
std::optional<JobAndPool> BackgroundJobsExecutor::getBackgroundJob()
bool BackgroundJobsExecutor::scheduleJob()
{
return data.getDataProcessingJob();
return data.scheduleDataProcessingJob(*this);
}
BackgroundMovesExecutor::BackgroundMovesExecutor(
@ -213,7 +228,13 @@ BackgroundMovesExecutor::BackgroundMovesExecutor(
: IBackgroundJobExecutor(
global_context_,
global_context_->getBackgroundMoveTaskSchedulingSettings(),
{PoolConfig{PoolType::MOVE, global_context_->getSettingsRef().background_move_pool_size, CurrentMetrics::BackgroundMovePoolTask}})
{PoolConfig
{
.pool_type = PoolType::MOVE,
.get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_move_pool_size; },
.tasks_metric = CurrentMetrics::BackgroundMovePoolTask
}
})
, data(data_)
{
}
@ -223,9 +244,9 @@ String BackgroundMovesExecutor::getBackgroundTaskName() const
return data.getStorageID().getFullTableName() + " (dataMovingTask)";
}
std::optional<JobAndPool> BackgroundMovesExecutor::getBackgroundJob()
bool BackgroundMovesExecutor::scheduleJob()
{
return data.getDataMovingJob();
return data.scheduleDataMovingJob(*this);
}
}

View File

@ -59,7 +59,7 @@ protected:
/// This pool type
PoolType pool_type;
/// Max pool size in threads
size_t max_pool_size;
const std::function<size_t()> get_max_pool_size;
/// Metric that we have to increment when we execute task in this pool
CurrentMetrics::Metric tasks_metric;
};
@ -99,6 +99,9 @@ public:
/// Finish execution: deactivate background task and wait already scheduled jobs
void finish();
/// Executes job in a nested pool
void execute(JobAndPool job_and_pool);
/// Just call finish
virtual ~IBackgroundJobExecutor();
@ -110,12 +113,13 @@ protected:
/// Name for task in background schedule pool
virtual String getBackgroundTaskName() const = 0;
/// Get job for background execution
virtual std::optional<JobAndPool> getBackgroundJob() = 0;
/// Schedules a job in a nested pool in this class.
virtual bool scheduleJob() = 0;
private:
/// Function that executes in background scheduling pool
void jobExecutingTask();
void backgroundTaskFunction();
/// Recalculate timeouts when we have to check for a new job
void scheduleTask(bool with_backoff);
/// Run background task as fast as possible and reset errors counter
@ -136,7 +140,7 @@ public:
protected:
String getBackgroundTaskName() const override;
std::optional<JobAndPool> getBackgroundJob() override;
bool scheduleJob() override;
};
/// Move jobs executor, move parts between disks in the background
@ -152,7 +156,7 @@ public:
protected:
String getBackgroundTaskName() const override;
std::optional<JobAndPool> getBackgroundJob() override;
bool scheduleJob() override;
};
}

View File

@ -4624,19 +4624,20 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
}
}
std::optional<JobAndPool> MergeTreeData::getDataMovingJob()
bool MergeTreeData::scheduleDataMovingJob(IBackgroundJobExecutor & executor)
{
if (parts_mover.moves_blocker.isCancelled())
return {};
return false;
auto moving_tagger = selectPartsForMove();
if (moving_tagger->parts_to_move.empty())
return {};
return false;
return JobAndPool{[this, moving_tagger] () mutable
executor.execute({[this, moving_tagger] () mutable
{
return moveParts(moving_tagger);
}, PoolType::MOVE};
}, PoolType::MOVE});
return true;
}
bool MergeTreeData::areBackgroundMovesNeeded() const

View File

@ -57,6 +57,7 @@ class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
class MergeTreeDeduplicationLog;
class IBackgroundJobExecutor;
namespace ErrorCodes
{
@ -807,10 +808,10 @@ public:
PinnedPartUUIDsPtr getPinnedPartUUIDs() const;
/// Return main processing background job, like merge/mutate/fetch and so on
virtual std::optional<JobAndPool> getDataProcessingJob() = 0;
/// Return job to move parts between disks/volumes and so on.
std::optional<JobAndPool> getDataMovingJob();
/// Schedules the main processing background job (merge/mutate/fetch and so on) in the given executor
virtual bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) = 0;
/// Schedules job to move parts between disks/volumes and so on.
bool scheduleDataMovingJob(IBackgroundJobExecutor & executor);
bool areBackgroundMovesNeeded() const;
/// Lock part in zookeeper for use common S3 data in several nodes

View File

@ -1001,13 +1001,13 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
return true;
}
std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob() //-V657
bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & executor) //-V657
{
if (shutdown_called)
return {};
return false;
if (merger_mutator.merges_blocker.isCancelled())
return {};
return false;
auto metadata_snapshot = getInMemoryMetadataPtr();
std::shared_ptr<MergeMutateSelectedEntry> merge_entry, mutate_entry;
@ -1017,21 +1017,25 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob() //-V657
if (!merge_entry)
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock);
if (merge_entry || mutate_entry)
{
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
if (merge_entry)
{
executor.execute({[this, metadata_snapshot, merge_entry, share_lock] () mutable
{
return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
else if (mutate_entry)
}, PoolType::MERGE_MUTATE});
return true;
}
if (mutate_entry)
{
executor.execute({[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
__builtin_unreachable();
}, PoolType::MERGE_MUTATE};
}, PoolType::MERGE_MUTATE});
return true;
}
else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
return JobAndPool{[this, share_lock] ()
executor.execute({[this, share_lock] ()
{
/// All use relative_data_path which changes during rename
/// so execute under share lock.
@ -1041,9 +1045,10 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob() //-V657
clearOldMutations();
clearEmptyParts();
return true;
}, PoolType::MERGE_MUTATE};
}, PoolType::MERGE_MUTATE});
return true;
}
return {};
return false;
}
Int64 StorageMergeTree::getCurrentMutationVersion(

View File

@ -94,7 +94,7 @@ public:
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
std::optional<JobAndPool> getDataProcessingJob() override;
bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
private:

View File

@ -3157,30 +3157,35 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
});
}
std::optional<JobAndPool> StorageReplicatedMergeTree::getDataProcessingJob()
bool StorageReplicatedMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & executor)
{
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
return {};
return false;
/// This object will mark the element of the queue as running.
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();
if (!selected_entry)
return {};
PoolType pool_type;
return false;
/// Depending on entry type execute in fetches (small) pool or big merge_mutate pool
if (selected_entry->log_entry->type == LogEntry::GET_PART)
pool_type = PoolType::FETCH;
else
pool_type = PoolType::MERGE_MUTATE;
return JobAndPool{[this, selected_entry] () mutable
{
executor.execute({[this, selected_entry] () mutable
{
return processQueueEntry(selected_entry);
}, pool_type};
}, PoolType::FETCH});
return true;
}
else
{
executor.execute({[this, selected_entry] () mutable
{
return processQueueEntry(selected_entry);
}, PoolType::MERGE_MUTATE});
return true;
}
}

View File

@ -215,8 +215,8 @@ public:
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
/// Get job to execute in background pool (merge, mutate, drop range and so on)
std::optional<JobAndPool> getDataProcessingJob() override;
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
/// Checks that fetches are not disabled with action blocker and pool for fetches
/// is not overloaded

View File

@ -1,62 +0,0 @@
#include <gtest/gtest.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Common/CurrentMetrics.h>
#include <Common/tests/gtest_global_context.h>
#include <memory>
#include <chrono>
using namespace std::chrono_literals;
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
using namespace DB;
static std::atomic<Int64> counter{0};
/// Executor used by the test below: a single MERGE_MUTATE pool with max size 4,
/// accounted in the BackgroundPoolTask metric.
class TestJobExecutor : public IBackgroundJobExecutor
{
public:
    explicit TestJobExecutor(ContextPtr local_context)
        : IBackgroundJobExecutor(
            local_context,
            BackgroundTaskSchedulingSettings{},
            {PoolConfig{PoolType::MERGE_MUTATE, 4, CurrentMetrics::BackgroundPoolTask}})
    {
    }

protected:
    /// Name of the task in the background schedule pool
    String getBackgroundTaskName() const override
    {
        return "TestTask";
    }

    /// Always provides a job: it sleeps for one second, increments the global
    /// `counter` and returns true.
    std::optional<JobAndPool> getBackgroundJob() override
    {
        auto sleep_and_count = []
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            ++counter;
            return true;
        };
        return JobAndPool{sleep_and_count, PoolType::MERGE_MUTATE};
    }
};
using TestExecutorPtr = std::unique_ptr<TestJobExecutor>;
/// Starts 100 executors that all account their jobs in the BackgroundPoolTask metric
/// and repeatedly checks that the metric value does not exceed 4 while they run.
TEST(BackgroundExecutor, TestMetric)
{
    constexpr size_t executors_count = 100;
    constexpr size_t checks_count = 100;

    const auto & context_holder = getContext();

    std::vector<TestExecutorPtr> executors;
    for (size_t i = 0; i < executors_count; ++i)
        executors.emplace_back(std::make_unique<TestJobExecutor>(context_holder.context));

    for (auto & executor : executors)
        executor->start();

    /// Sample the metric every 200 ms while the jobs are being executed
    for (size_t check = 0; check < checks_count; ++check)
    {
        EXPECT_TRUE(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load() <= 4);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }

    for (auto & executor : executors)
        executor->finish();

    /// Sanity check
    EXPECT_TRUE(counter > 50);
}