Something very similar to working code

This commit is contained in:
alesapin 2020-10-14 15:32:35 +03:00
parent efd3126b5d
commit 4014e0f08d
10 changed files with 112 additions and 137 deletions

View File

@ -1,5 +1,14 @@
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/CurrentMetrics.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <random>
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
namespace DB
{
@ -9,11 +18,21 @@ BackgroundJobsExecutor::BackgroundJobsExecutor(
Context & global_context_)
: data(data_)
, global_context(global_context_)
, data_processing_pool(global_context.getSettingsRef().background_pool_size, 0, 10000, false)
, move_pool(global_context.getSettingsRef().background_move_pool_size, 0, 10000, false)
, max_pool_size(global_context.getSettingsRef().background_pool_size)
, data_processing_pool(max_pool_size, 0, max_pool_size, false)
, move_pool(global_context.getSettingsRef().background_move_pool_size, 0, max_pool_size, false)
, rng(randomSeed())
{
data_processing_task = global_context.getSchedulePool().createTask(
data.getStorageID().getFullTableName() + " (dataProcessingTask)", [this]{ dataProcessingTask(); });
const auto & config = global_context.getConfigRef();
settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10);
settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0);
settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10);
settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600);
settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
}
void BackgroundJobsExecutor::dataMovingTask()
@ -21,7 +40,7 @@ try
{
auto job = data.getDataMovingJob();
if (job)
move_pool.scheduleOrThrowOnError(*job);
move_pool.scheduleOrThrowOnError(job);
data_moving_task->schedule();
}
@ -30,20 +49,77 @@ catch(...)
tryLogCurrentException(__PRETTY_FUNCTION__);
}
namespace
{

/// Atomically increments `atomic_value` by one, but only while it is strictly
/// below `max_value`.
/// Returns true if the increment was performed, false if the value had already
/// reached the limit.
/// Lock-free CAS loop: on failure compare_exchange_weak reloads the current
/// value into `value`, so a concurrent change is either retried or rejected
/// against the fresh value. Spurious CAS failures simply re-enter the loop.
bool incrementIfLess(std::atomic<long> & atomic_value, long max_value)
{
    auto value = atomic_value.load(std::memory_order_relaxed);
    while (value < max_value)
        if (atomic_value.compare_exchange_weak(value, value + 1, std::memory_order_release, std::memory_order_relaxed))
            return true;
    return false;
}

}
/// Scheduled-pool callback that pulls merge/mutate jobs from the storage and
/// runs them on data_processing_pool.
/// Concurrency is bounded manually through CurrentMetrics::BackgroundPoolTask:
/// a slot is reserved (incrementIfLess) BEFORE scheduling the job, and every
/// exit path below must release it exactly once — hence the repeated
/// decrements in the lambda, the no-job branch and the catch block.
void BackgroundJobsExecutor::dataProcessingTask()
{
    // Reserve a slot; fails when max_pool_size jobs are already accounted.
    if (incrementIfLess(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], max_pool_size))
    {
        try
        {
            auto job = data.getDataProcessingJob();
            if (job)
            {
                // The reserved slot is released inside the lambda, only after
                // the job has actually finished (success or failure).
                data_processing_pool.scheduleOrThrowOnError([this, job{std::move(job)}] ()
                {
                    try
                    {
                        job();
                        CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
                        errors_count = 0; // any success resets the backoff
                    }
                    catch (...)
                    {
                        errors_count++;
                        tryLogCurrentException(__PRETTY_FUNCTION__);
                        CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
                    }
                });
                auto errors = errors_count.load(std::memory_order_relaxed);
                if (errors != 0)
                {
                    // Exponential backoff after consecutive failures, capped at
                    // task_sleep_seconds_when_no_work_max, plus random jitter.
                    // scheduleAfter takes milliseconds, hence the 1000 factor.
                    auto next_time_to_execute = 1000 * (std::min(
                        settings.task_sleep_seconds_when_no_work_max,
                        settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, errors))
                        + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng));
                    data_processing_task->scheduleAfter(next_time_to_execute);
                }
                else
                    data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
            }
            else
            {
                // No job selected: retry after a short sleep and release the slot.
                // NOTE(review): jitter here uses task_sleep_seconds_when_no_work_random_part,
                // while the scheduled-job branch above uses thread_sleep_seconds_random_part —
                // confirm this asymmetry is intended.
                data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
                CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
            }
        }
        catch(...)
        {
            // getDataProcessingJob() or scheduleOrThrowOnError() threw before the
            // lambda took ownership of the slot: release it here and retry later.
            CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask]--;
            data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
    else
    {
        /// Pool overloaded
        data_processing_task->scheduleAfter(1000 * (settings.thread_sleep_seconds_if_nothing_to_do + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)));
    }
}
void BackgroundJobsExecutor::startMovingTaskIfNeeded()
{
if (data.areBackgroundMovesNeeded() && !data_moving_task)

View File

@ -2,8 +2,9 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/MergeTreeBackgroundJob.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <pcg_random.hpp>
namespace DB
{
@ -13,8 +14,12 @@ class BackgroundJobsExecutor
private:
MergeTreeData & data;
Context & global_context;
size_t max_pool_size;
ThreadPool data_processing_pool;
ThreadPool move_pool;
std::atomic<size_t> errors_count{0};
pcg64 rng;
BackgroundProcessingPool::PoolSettings settings;
BackgroundSchedulePool::TaskHolder data_processing_task;
BackgroundSchedulePool::TaskHolder data_moving_task;

View File

@ -17,25 +17,16 @@ enum PoolType
struct MergeTreeBackgroundJob
{
ThreadPool::Job job;
CurrentMetrics::Metric metric;
PoolType execute_in_pool;
MergeTreeBackgroundJob(ThreadPool::Job && job_, CurrentMetrics::Metric metric_, PoolType execute_in_pool_)
: job(std::move(job_)), metric(metric_), execute_in_pool(execute_in_pool_)
MergeTreeBackgroundJob(ThreadPool::Job && job_, PoolType execute_in_pool_)
: job(std::move(job_)), execute_in_pool(execute_in_pool_)
{}
void operator()()
try
{
if (metric != 0)
{
CurrentMetrics::Increment metric_increment{metric};
job();
}
else
{
job();
}
job();
}
catch (...)
{

View File

@ -3621,7 +3621,7 @@ bool MergeTreeData::selectPartsAndMove()
return moveParts(std::move(moving_tagger));
}
std::optional<MergeTreeBackgroundJob> MergeTreeData::getDataMovingJob()
ThreadPool::Job MergeTreeData::getDataMovingJob()
{
if (parts_mover.moves_blocker.isCancelled())
return {};
@ -3630,14 +3630,10 @@ std::optional<MergeTreeBackgroundJob> MergeTreeData::getDataMovingJob()
if (moving_tagger->parts_to_move.empty())
return {};
auto job = [this, moving_tagger{std::move(moving_tagger)}] () mutable
return [this, moving_tagger{std::move(moving_tagger)}] () mutable
{
moveParts(moving_tagger);
};
MergeTreeBackgroundJob result_job(std::move(job), CurrentMetrics::BackgroundMovePoolTask, PoolType::MOVE);
return std::make_optional<MergeTreeBackgroundJob>(std::move(job), CurrentMetrics::BackgroundMovePoolTask, PoolType::MOVE);
}
bool MergeTreeData::areBackgroundMovesNeeded() const

View File

@ -25,7 +25,6 @@
#include <Interpreters/Aggregator.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeBackgroundJob.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -711,8 +710,8 @@ public:
/// Mutex for currently_moving_parts
mutable std::mutex moving_parts_mutex;
virtual std::optional<MergeTreeBackgroundJob> getDataProcessingJob() = 0;
std::optional<MergeTreeBackgroundJob> getDataMovingJob();
virtual ThreadPool::Job getDataProcessingJob() = 0;
ThreadPool::Job getDataMovingJob();
bool areBackgroundMovesNeeded() const;
protected:

View File

@ -159,15 +159,15 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const
{
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool);
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) const
{
//LOG_DEBUG(&Poco::Logger::get("DEBUG"), "POOL SIZE {}, POOL USED {}", pool_size, pool_used);
if (pool_used > pool_size)
{
std::cerr << "POOLSIZE:" << pool_size << " POOL USED:" << pool_used << std::endl;
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -210,8 +210,9 @@ void StorageMergeTree::drop()
dropAllData();
}
void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder & lock_holder)
{
lock_holder.release();
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
@ -902,7 +903,7 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
return true;
}
std::optional<MergeTreeBackgroundJob> StorageMergeTree::getDataProcessingJob()
ThreadPool::Job StorageMergeTree::getDataProcessingJob()
{
if (shutdown_called)
return {};
@ -919,18 +920,17 @@ std::optional<MergeTreeBackgroundJob> StorageMergeTree::getDataProcessingJob()
if (merge_entry || mutate_entry)
{
auto job = [this, metadata_snapshot, merge_entry{std::move(merge_entry)}, mutate_entry{std::move(mutate_entry)}] () mutable
return [this, metadata_snapshot, merge_entry{std::move(merge_entry)}, mutate_entry{std::move(mutate_entry)}] () mutable
{
if (merge_entry)
mergeSelectedParts(metadata_snapshot, false, *merge_entry);
else if (mutate_entry)
mutateSelectedPart(metadata_snapshot, *mutate_entry);
};
return std::make_optional<MergeTreeBackgroundJob>(std::move(job), CurrentMetrics::BackgroundPoolTask, PoolType::MERGE_MUTATE);
}
else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
auto job = [this] ()
return [this] ()
{
{
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -942,60 +942,10 @@ std::optional<MergeTreeBackgroundJob> StorageMergeTree::getDataProcessingJob()
}
clearOldMutations();
};
return std::make_optional<MergeTreeBackgroundJob>(std::move(job), 0, PoolType::MERGE_MUTATE);
}
return {};
}
/// Legacy BackgroundProcessingPool entry point for merges and mutations
/// (this commit removes it in favour of getDataProcessingJob()).
/// Performs periodic cleanup, then tries one merge, then one mutation;
/// returns SUCCESS if any work was done, NOTHING_TO_DO when blocked,
/// ERROR otherwise.
BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
    if (shutdown_called)
        return BackgroundProcessingPoolTaskResult::ERROR;

    if (merger_mutator.merges_blocker.isCancelled())
        return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;

    try
    {
        /// Clear old parts. It is unnecessary to do it more than once a second.
        if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
        {
            {
                auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
                /// All use relative_data_path which changes during rename
                /// so execute under share lock.
                clearOldPartsFromFilesystem();
                clearOldTemporaryDirectories();
                clearOldWriteAheadLogs();
            }
            clearOldMutations();
        }

        auto metadata_snapshot = getInMemoryMetadataPtr();
        // Prefer a merge; only look for a mutation if merging finds nothing.
        auto merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr);

        ///TODO: read deduplicate option from table config
        if (merge_entry && mergeSelectedParts(metadata_snapshot, false, *merge_entry))
            return BackgroundProcessingPoolTaskResult::SUCCESS;

        auto mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr);

        if (mutate_entry && mutateSelectedPart(metadata_snapshot, *mutate_entry))
            return BackgroundProcessingPoolTaskResult::SUCCESS;

        return BackgroundProcessingPoolTaskResult::ERROR;
    }
    catch (const Exception & e)
    {
        // ABORTED means the operation was deliberately cancelled (e.g. shutdown):
        // log at INFO and report ERROR rather than rethrowing.
        if (e.code() == ErrorCodes::ABORTED)
        {
            LOG_INFO(log, e.message());
            return BackgroundProcessingPoolTaskResult::ERROR;
        }

        throw;
    }
}
Int64 StorageMergeTree::getCurrentMutationVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const

View File

@ -88,7 +88,7 @@ public:
CheckResults checkData(const ASTPtr & query, const Context & context) override;
std::optional<MergeTreeBackgroundJob> getDataProcessingJob() override;
ThreadPool::Job getDataProcessingJob() override;
private:
/// Mutex and condvar for synchronous mutations wait
@ -156,8 +156,6 @@ private:
std::optional<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason);
bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry);
BackgroundProcessingPoolTaskResult mergeMutateTask();
Int64 getCurrentMutationVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;

View File

@ -2601,7 +2601,7 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
}
std::optional<MergeTreeBackgroundJob> StorageReplicatedMergeTree::getDataProcessingJob()
ThreadPool::Job StorageReplicatedMergeTree::getDataProcessingJob()
{
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
@ -2615,43 +2615,12 @@ std::optional<MergeTreeBackgroundJob> StorageReplicatedMergeTree::getDataProcess
if (!entry)
return {};
auto job = [this, selected_entry{std::move(selected_entry)}] () mutable
return [this, selected_entry{std::move(selected_entry)}] () mutable
{
processQueueEntry(selected_entry);
};
return std::make_optional<MergeTreeBackgroundJob>(std::move(job), CurrentMetrics::BackgroundPoolTask, PoolType::MERGE_MUTATE);
}
/// Legacy BackgroundProcessingPool entry point for executing one replication
/// queue entry (this commit removes it in favour of getDataProcessingJob()).
/// Returns SUCCESS/NOTHING_TO_DO/ERROR; ERROR makes the pool back off.
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
    /// If replication queue is stopped exit immediately as we successfully executed the task
    if (queue.actions_blocker.isCancelled())
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
        return BackgroundProcessingPoolTaskResult::SUCCESS;
    }

    /// This object will mark the element of the queue as running.
    ReplicatedMergeTreeQueue::SelectedEntry selected_entry = selectQueueEntry();

    LogEntryPtr & entry = selected_entry.first;

    if (!entry)
        return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;

    // Remember when this entry last failed so we can tell a rapid retry
    // from a long-idle one.
    time_t prev_attempt_time = entry->last_attempt_time;

    bool res = processQueueEntry(selected_entry);

    /// We will go to sleep if the processing fails and if we have already processed this record recently.
    bool need_sleep = !res && (entry->last_attempt_time - prev_attempt_time < 10);

    /// If there was no exception, you do not need to sleep.
    return need_sleep ? BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS;
}
bool StorageReplicatedMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
{
return queue.isVirtualPart(part);
@ -3576,10 +3545,6 @@ void StorageReplicatedMergeTree::shutdown()
queue.pull_log_blocker.cancelForever();
}
if (move_parts_task_handle)
global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();
if (data_parts_exchange_endpoint)
{
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_endpoint->getId(replica_path));

View File

@ -196,7 +196,7 @@ public:
*/
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger);
std::optional<MergeTreeBackgroundJob> getDataProcessingJob() override;
ThreadPool::Job getDataProcessingJob() override;
private:
@ -423,15 +423,10 @@ private:
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & entry);
/** Performs actions from the queue.
*/
BackgroundProcessingPoolTaskResult queueTask();
/// Perform moves of parts to another disks.
/// Local operation, doesn't interact with replicationg queue.
BackgroundProcessingPoolTaskResult movePartsTask();
/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)
/// or an exception is thrown and leader_election is destroyed.