Moving pool

This commit is contained in:
alesapin 2020-10-14 10:22:48 +03:00
parent fd35368c59
commit efd3126b5d
13 changed files with 99 additions and 70 deletions

View File

@ -1,12 +1,14 @@
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
BackgroundJobsExecutor::BackgroundJobsExecutor(
MergeTreeData & data_,
Context & global_context)
Context & global_context_)
: data(data_)
, global_context(global_context_)
, data_processing_pool(global_context.getSettingsRef().background_pool_size, 0, 10000, false)
, move_pool(global_context.getSettingsRef().background_move_pool_size, 0, 10000, false)
{
@ -14,6 +16,20 @@ BackgroundJobsExecutor::BackgroundJobsExecutor(
data.getStorageID().getFullTableName() + " (dataProcessingTask)", [this]{ dataProcessingTask(); });
}
/// One iteration of the background moving task.
/// Asks the table for a moving job, submits it to move_pool when one is returned,
/// and then schedules this task again. Exceptions are logged and not rethrown.
/// NOTE(review): when an exception is thrown, the schedule() call below is skipped,
/// so the task does not appear to be re-armed after a failure — confirm this is intended.
void BackgroundJobsExecutor::dataMovingTask()
{
    try
    {
        auto moving_job = data.getDataMovingJob();
        if (moving_job.has_value())
            move_pool.scheduleOrThrowOnError(*moving_job);

        data_moving_task->schedule();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
void BackgroundJobsExecutor::dataProcessingTask()
try
{
@ -28,10 +44,21 @@ catch (...)
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Creates, activates and schedules the moving task on the schedule pool of the global context.
/// Does nothing when the table reports that background moves are not needed
/// or when the moving task has already been created.
void BackgroundJobsExecutor::startMovingTaskIfNeeded()
{
    if (!data.areBackgroundMovesNeeded() || data_moving_task)
        return;

    /// The task name is the full table name with a suffix identifying the task.
    auto task_name = data.getStorageID().getFullTableName() + " (dataMovingTask)";
    data_moving_task = global_context.getSchedulePool().createTask(task_name, [this]{ dataMovingTask(); });
    data_moving_task->activateAndSchedule();
}
/// Starts the background work of this executor: activates and schedules the data
/// processing task if it exists, then starts the moving task if it is needed.
void BackgroundJobsExecutor::start()
{
if (data_processing_task)
data_processing_task->activateAndSchedule();
/// Creates the moving task only when the table needs background moves and the task does not exist yet.
startMovingTaskIfNeeded();
}
void BackgroundJobsExecutor::triggerDataProcessing()
@ -44,6 +71,11 @@ void BackgroundJobsExecutor::finish()
{
/// Deactivate the data processing task first, then wait on its thread pool
/// (presumably until already scheduled jobs have finished — confirm against ThreadPool::wait).
data_processing_task->deactivate();
data_processing_pool.wait();
/// The moving task is only created by startMovingTaskIfNeeded(), so it may be absent.
if (data_moving_task)
{
data_moving_task->deactivate();
move_pool.wait();
}
}
}

View File

@ -12,19 +12,22 @@ class BackgroundJobsExecutor
{
private:
MergeTreeData & data;
Context & global_context;
ThreadPool data_processing_pool;
ThreadPool move_pool;
BackgroundSchedulePool::TaskHolder data_processing_task;
BackgroundSchedulePool::TaskHolder move_processing_task;
BackgroundSchedulePool::TaskHolder data_moving_task;
void dataProcessingTask();
void dataMovingTask();
public:
BackgroundJobsExecutor(
MergeTreeData & data_,
Context & global_context_);
void startMovingTaskIfNeeded();
void triggerDataProcessing();
void triggerMovesProcessing();
void start();

View File

@ -943,7 +943,7 @@ void IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const Storag
volume->getDisk()->removeIfExists(destination_path + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME);
}
void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const
void IMergeTreeDataPart::makeCloneOnDiskDetached(const std::shared_ptr<IReservation> & reservation) const
{
assertOnDisk();
auto reserved_disk = reservation->getDisk();

View File

@ -317,7 +317,7 @@ public:
virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
/// Makes full clone of part in detached/ on another disk
void makeCloneOnDiskDetached(const ReservationPtr & reservation) const;
void makeCloneOnDiskDetached(const std::shared_ptr<IReservation> & reservation) const;
/// Checks that .bin and .mrk files exist.
///

View File

@ -71,6 +71,7 @@ namespace ProfileEvents
/// Metrics referenced from this file. They are only declared here (extern);
/// the definitions live in another translation unit.
namespace CurrentMetrics
{
extern const Metric DelayedInserts;
/// Passed to the background job created by MergeTreeData::getDataMovingJob().
extern const Metric BackgroundMovePoolTask;
}
@ -3614,12 +3615,31 @@ bool MergeTreeData::selectPartsAndMove()
return false;
auto moving_tagger = selectPartsForMove();
if (moving_tagger.parts_to_move.empty())
if (moving_tagger->parts_to_move.empty())
return false;
return moveParts(std::move(moving_tagger));
}
/// Builds a background job that moves the parts chosen by selectPartsForMove().
/// Returns an empty optional when moves are cancelled via parts_mover.moves_blocker
/// or when no parts were selected for moving.
std::optional<MergeTreeBackgroundJob> MergeTreeData::getDataMovingJob()
{
    if (parts_mover.moves_blocker.isCancelled())
        return {};

    auto moving_tagger = selectPartsForMove();
    if (moving_tagger->parts_to_move.empty())
        return {};

    /// The closure shares ownership of the tagger, so the tagger lives as long as the job does.
    auto job = [this, moving_tagger{std::move(moving_tagger)}] ()
    {
        moveParts(moving_tagger);
    };

    /// Move `job` exactly once. Moving it into a temporary object first would leave the closure
    /// with an empty shared_ptr, and the returned job would then dereference a null tagger.
    return std::make_optional<MergeTreeBackgroundJob>(std::move(job), CurrentMetrics::BackgroundMovePoolTask, PoolType::MOVE);
}
bool MergeTreeData::areBackgroundMovesNeeded() const
{
auto policy = getStoragePolicy();
@ -3636,13 +3656,13 @@ bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr spa
return false;
auto moving_tagger = checkPartsForMove(parts, space);
if (moving_tagger.parts_to_move.empty())
if (moving_tagger->parts_to_move.empty())
return false;
return moveParts(std::move(moving_tagger));
return moveParts(moving_tagger);
}
MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove()
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()
{
MergeTreeMovingParts parts_to_move;
@ -3665,10 +3685,10 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove()
std::lock_guard moving_lock(moving_parts_mutex);
parts_mover.selectPartsForMove(parts_to_move, can_move, moving_lock);
return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this);
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
}
MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const DataPartsVector & parts, SpacePtr space)
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(const DataPartsVector & parts, SpacePtr space)
{
std::lock_guard moving_lock(moving_parts_mutex);
@ -3693,14 +3713,14 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const
parts_to_move.emplace_back(part, std::move(reservation));
}
return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this);
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
}
bool MergeTreeData::moveParts(CurrentlyMovingPartsTagger && moving_tagger)
bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger)
{
LOG_INFO(log, "Got {} parts to move.", moving_tagger.parts_to_move.size());
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
for (const auto & moving_part : moving_tagger.parts_to_move)
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
DataPartPtr cloned_part;

View File

@ -711,7 +711,9 @@ public:
/// Mutex for currently_moving_parts
mutable std::mutex moving_parts_mutex;
virtual std::optional<MergeTreeBackgroundJob> getDataProcessingJob() { return {}; }
virtual std::optional<MergeTreeBackgroundJob> getDataProcessingJob() = 0;
std::optional<MergeTreeBackgroundJob> getDataMovingJob();
bool areBackgroundMovesNeeded() const;
protected:
@ -889,7 +891,6 @@ protected:
/// Selects parts for move and moves them, used in background process
bool selectPartsAndMove();
bool areBackgroundMovesNeeded() const;
private:
/// RAII Wrapper for atomic work with currently moving parts
@ -901,18 +902,19 @@ private:
MergeTreeData & data;
CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_);
CurrentlyMovingPartsTagger(const CurrentlyMovingPartsTagger & other) = delete;
~CurrentlyMovingPartsTagger();
};
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
/// Move selected parts to corresponding disks
bool moveParts(CurrentlyMovingPartsTagger && moving_tagger);
bool moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger);
/// Select parts for move and disks for them. Used in background moving processes.
CurrentlyMovingPartsTagger selectPartsForMove();
CurrentlyMovingPartsTaggerPtr selectPartsForMove();
/// Check selected parts for movements. Used by ALTER ... MOVE queries.
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
CurrentlyMovingPartsTaggerPtr checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason = nullptr) const;

View File

@ -154,6 +154,7 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si
{
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const
{
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
@ -166,6 +167,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
{
if (pool_used > pool_size)
{
std::cerr << "POOLSIZE:" << pool_size << " POOL USED:" << pool_used << std::endl;
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -16,7 +16,7 @@ namespace DB
struct MergeTreeMoveEntry
{
std::shared_ptr<const IMergeTreeDataPart> part;
ReservationPtr reserved_space;
std::shared_ptr<IReservation> reserved_space;
MergeTreeMoveEntry(const std::shared_ptr<const IMergeTreeDataPart> & part_, ReservationPtr reservation_)
: part(part_), reserved_space(std::move(reservation_))

View File

@ -584,8 +584,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
LOG_DEBUG(log, "Pulled {} entries to queue.", copied_entries.size());
}
if (storage.queue_task_handle)
storage.queue_task_handle->signalReadyToRun();
storage.background_executor.triggerDataProcessing();
}
return stat.version;
@ -668,8 +667,8 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
}
}
if (some_active_mutations_were_killed && storage.queue_task_handle)
storage.queue_task_handle->signalReadyToRun();
if (some_active_mutations_were_killed)
storage.background_executor.triggerDataProcessing();
if (!entries_to_load.empty())
{
@ -792,8 +791,8 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name);
}
if (mutation_was_active && storage.queue_task_handle)
storage.queue_task_handle->signalReadyToRun();
if (mutation_was_active)
storage.background_executor.triggerDataProcessing();
return entry;
}

View File

@ -106,7 +106,6 @@ void StorageMergeTree::startup()
try
{
background_executor.start();
startBackgroundMovesIfNeeded();
}
catch (...)
{
@ -145,9 +144,6 @@ void StorageMergeTree::shutdown()
background_executor.finish();
if (moving_task_handle)
global_context.getBackgroundMovePool().removeTask(moving_task_handle);
try
{
/// We clear all old parts after stopping all background operations.
@ -501,18 +497,6 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
return result;
}
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded() && !moving_task_handle)
{
auto & move_pool = global_context.getBackgroundMovePool();
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
move_pool.startTask(moving_task_handle);
}
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_processing_in_background_mutex);
@ -1530,4 +1514,9 @@ MutationCommands StorageMergeTree::getFirtsAlterMutationCommandsForPart(const Da
return it->second.commands;
}
/// Forwards to the background executor, which owns the moving task.
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
background_executor.startMovingTaskIfNeeded();
}
}

View File

@ -98,6 +98,7 @@ private:
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
BackgroundJobsExecutor background_executor;
/// For block numbers.
SimpleIncrement increment{0};
@ -120,10 +121,6 @@ private:
std::atomic<bool> shutdown_called {false};
/// Task handler for merges, mutations and moves.
BackgroundJobsExecutor background_executor;
BackgroundProcessingPool::TaskHandle moving_task_handle;
void loadMutations();
/** Determines what parts should be merged and merges it.

View File

@ -197,6 +197,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, merger_mutator(*this, global_context.getSettingsRef().background_pool_size)
, queue(*this)
, fetcher(*this)
, background_executor(*this, global_context)
, cleanup_thread(*this)
, part_check_thread(*this)
, restarting_thread(*this)
@ -3530,12 +3531,9 @@ void StorageReplicatedMergeTree::startup()
/// between the assignment of queue_task_handle and queueTask that use the queue_task_handle.
{
auto lock = queue.lockQueue();
auto & pool = global_context.getBackgroundPool();
queue_task_handle = pool.createTask([this] { return queueTask(); });
pool.startTask(queue_task_handle);
background_executor.start();
}
startBackgroundMovesIfNeeded();
}
catch (...)
{
@ -3566,14 +3564,11 @@ void StorageReplicatedMergeTree::shutdown()
restarting_thread.shutdown();
if (queue_task_handle)
global_context.getBackgroundPool().removeTask(queue_task_handle);
{
/// Queue can trigger queue_task_handle itself. So we ensure that all
/// queue processes finished and after that reset queue_task_handle.
auto lock = queue.lockQueue();
queue_task_handle.reset();
background_executor.finish();
/// Cancel log pulling after background tasks were cancelled. It's still
/// required because we can trigger pullLogsToQueue during manual OPTIMIZE,
@ -5921,12 +5916,9 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
{
auto lock = queue.lockQueue();
if (!queue_task_handle)
return false;
background_executor.triggerDataProcessing();
/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.
queue_task_handle->signalReadyToRun();
}
Poco::Event target_size_event;
@ -6032,15 +6024,9 @@ MutationCommands StorageReplicatedMergeTree::getFirtsAlterMutationCommandsForPar
return queue.getFirstAlterMutationCommandsForPart(part);
}
void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded() && !move_parts_task_handle)
{
auto & pool = global_context.getBackgroundMovePool();
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
pool.startTask(move_parts_task_handle);
}
background_executor.startMovingTaskIfNeeded();
}
}

View File

@ -27,6 +27,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
#include <Processors/Pipe.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
namespace DB
@ -275,15 +276,14 @@ private:
int metadata_version = 0;
/// Threads.
BackgroundJobsExecutor background_executor;
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
BackgroundSchedulePool::TaskHolder queue_updating_task;
BackgroundSchedulePool::TaskHolder mutations_updating_task;
/// A task that performs actions from the queue.
BackgroundProcessingPool::TaskHandle queue_task_handle;
/// A task which moves parts to other disks/volumes
/// Transparent for replication.
BackgroundProcessingPool::TaskHandle move_parts_task_handle;
@ -568,7 +568,6 @@ private:
MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/