From efd3126b5d7979da7ff79f380fa2ee46c2d54c36 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 14 Oct 2020 10:22:48 +0300 Subject: [PATCH] Moving pool --- .../MergeTree/BackgroundJobsExecutor.cpp | 34 +++++++++++++++- .../MergeTree/BackgroundJobsExecutor.h | 5 ++- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.h | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 40 ++++++++++++++----- src/Storages/MergeTree/MergeTreeData.h | 14 ++++--- .../MergeTree/MergeTreeDataMergerMutator.cpp | 2 + src/Storages/MergeTree/MergeTreePartsMover.h | 2 +- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 11 +++-- src/Storages/StorageMergeTree.cpp | 21 +++------- src/Storages/StorageMergeTree.h | 5 +-- src/Storages/StorageReplicatedMergeTree.cpp | 24 +++-------- src/Storages/StorageReplicatedMergeTree.h | 7 ++-- 13 files changed, 99 insertions(+), 70 deletions(-) diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp index 662fa71d318..081ef818dcf 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp @@ -1,12 +1,14 @@ #include +#include namespace DB { BackgroundJobsExecutor::BackgroundJobsExecutor( MergeTreeData & data_, - Context & global_context) + Context & global_context_) : data(data_) + , global_context(global_context_) , data_processing_pool(global_context.getSettingsRef().background_pool_size, 0, 10000, false) , move_pool(global_context.getSettingsRef().background_move_pool_size, 0, 10000, false) { @@ -14,6 +16,20 @@ BackgroundJobsExecutor::BackgroundJobsExecutor( data.getStorageID().getFullTableName() + " (dataProcessingTask)", [this]{ dataProcessingTask(); }); } +void BackgroundJobsExecutor::dataMovingTask() +try +{ + auto job = data.getDataMovingJob(); + if (job) + move_pool.scheduleOrThrowOnError(*job); + + data_moving_task->schedule(); +} +catch(...) +{ + tryLogCurrentException(__PRETTY_FUNCTION__); +} + void BackgroundJobsExecutor::dataProcessingTask() try { @@ -28,10 +44,21 @@ catch (...) tryLogCurrentException(__PRETTY_FUNCTION__); } +void BackgroundJobsExecutor::startMovingTaskIfNeeded() +{ + if (data.areBackgroundMovesNeeded() && !data_moving_task) + { + data_moving_task = global_context.getSchedulePool().createTask( + data.getStorageID().getFullTableName() + " (dataMovingTask)", [this]{ dataMovingTask(); }); + data_moving_task->activateAndSchedule(); + } +} + void BackgroundJobsExecutor::start() { if (data_processing_task) data_processing_task->activateAndSchedule(); + startMovingTaskIfNeeded(); } void BackgroundJobsExecutor::triggerDataProcessing() @@ -44,6 +71,11 @@ void BackgroundJobsExecutor::finish() { data_processing_task->deactivate(); data_processing_pool.wait(); + if (data_moving_task) + { + data_moving_task->deactivate(); + move_pool.wait(); + } } } diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.h b/src/Storages/MergeTree/BackgroundJobsExecutor.h index aa166eb4d73..0945c4e0b59 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.h +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.h @@ -12,19 +12,22 @@ class BackgroundJobsExecutor { private: MergeTreeData & data; + Context & global_context; ThreadPool data_processing_pool; ThreadPool move_pool; BackgroundSchedulePool::TaskHolder data_processing_task; - BackgroundSchedulePool::TaskHolder move_processing_task; + BackgroundSchedulePool::TaskHolder data_moving_task; void dataProcessingTask(); + void dataMovingTask(); public: BackgroundJobsExecutor( MergeTreeData & data_, Context & global_context_); + void startMovingTaskIfNeeded(); void triggerDataProcessing(); void triggerMovesProcessing(); void start(); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 40f12428561..03817c70ac0 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -943,7 +943,7 @@ void IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const Storag volume->getDisk()->removeIfExists(destination_path + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME); } -void IMergeTreeDataPart::makeCloneOnDiskDetached(const ReservationPtr & reservation) const +void IMergeTreeDataPart::makeCloneOnDiskDetached(const std::shared_ptr & reservation) const { assertOnDisk(); auto reserved_disk = reservation->getDisk(); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 78daf6c9017..89136eaba4e 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -317,7 +317,7 @@ public: virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const; /// Makes full clone of part in detached/ on another disk - void makeCloneOnDiskDetached(const ReservationPtr & reservation) const; + void makeCloneOnDiskDetached(const std::shared_ptr & reservation) const; /// Checks that .bin and .mrk files exist. /// diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a0c1de756be..4b53ecba3c4 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -71,6 +71,7 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric DelayedInserts; + extern const Metric BackgroundMovePoolTask; } @@ -3614,12 +3615,31 @@ bool MergeTreeData::selectPartsAndMove() return false; auto moving_tagger = selectPartsForMove(); - if (moving_tagger.parts_to_move.empty()) + if (moving_tagger->parts_to_move.empty()) return false; return moveParts(std::move(moving_tagger)); } +std::optional MergeTreeData::getDataMovingJob() +{ + if (parts_mover.moves_blocker.isCancelled()) + return {}; + + auto moving_tagger = selectPartsForMove(); + if (moving_tagger->parts_to_move.empty()) + return {}; + + auto job = [this, moving_tagger{std::move(moving_tagger)}] () mutable + { + moveParts(moving_tagger); + }; + + MergeTreeBackgroundJob result_job(std::move(job), CurrentMetrics::BackgroundMovePoolTask, PoolType::MOVE); + + return std::make_optional(std::move(job), CurrentMetrics::BackgroundMovePoolTask, PoolType::MOVE); +} + bool MergeTreeData::areBackgroundMovesNeeded() const { auto policy = getStoragePolicy(); @@ -3636,13 +3656,13 @@ bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr spa return false; auto moving_tagger = checkPartsForMove(parts, space); - if (moving_tagger.parts_to_move.empty()) + if (moving_tagger->parts_to_move.empty()) return false; - return moveParts(std::move(moving_tagger)); + return moveParts(moving_tagger); } -MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove() +MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove() { MergeTreeMovingParts parts_to_move; @@ -3665,10 +3685,10 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove() std::lock_guard moving_lock(moving_parts_mutex); parts_mover.selectPartsForMove(parts_to_move, can_move, moving_lock); - return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this); + return std::make_shared(std::move(parts_to_move), *this); } -MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const DataPartsVector & parts, SpacePtr space) +MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(const DataPartsVector & parts, SpacePtr space) { std::lock_guard moving_lock(moving_parts_mutex); @@ -3693,14 +3713,14 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const parts_to_move.emplace_back(part, std::move(reservation)); } - return CurrentlyMovingPartsTagger(std::move(parts_to_move), *this); + return std::make_shared(std::move(parts_to_move), *this); } -bool MergeTreeData::moveParts(CurrentlyMovingPartsTagger && moving_tagger) +bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger) { - LOG_INFO(log, "Got {} parts to move.", moving_tagger.parts_to_move.size()); + LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size()); - for (const auto & moving_part : moving_tagger.parts_to_move) + for (const auto & moving_part : moving_tagger->parts_to_move) { Stopwatch stopwatch; DataPartPtr cloned_part; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 8c5333315fc..1ebe21e98af 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -711,7 +711,9 @@ public: /// Mutex for currently_moving_parts mutable std::mutex moving_parts_mutex; - virtual std::optional getDataProcessingJob() { return {}; } + virtual std::optional getDataProcessingJob() = 0; + std::optional getDataMovingJob(); + bool areBackgroundMovesNeeded() const; protected: @@ -889,7 +891,6 @@ protected: /// Selects parts for move and moves them, used in background process bool selectPartsAndMove(); - bool areBackgroundMovesNeeded() const; private: /// RAII Wrapper for atomic work with currently moving parts @@ -901,18 +902,19 @@ private: MergeTreeData & data; CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_); - CurrentlyMovingPartsTagger(const CurrentlyMovingPartsTagger & other) = delete; ~CurrentlyMovingPartsTagger(); }; + using CurrentlyMovingPartsTaggerPtr = std::shared_ptr; + /// Move selected parts to corresponding disks - bool moveParts(CurrentlyMovingPartsTagger && moving_tagger); + bool moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger); /// Select parts for move and disks for them. Used in background moving processes. - CurrentlyMovingPartsTagger selectPartsForMove(); + CurrentlyMovingPartsTaggerPtr selectPartsForMove(); /// Check selected parts for movements. Used by ALTER ... MOVE queries. - CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space); + CurrentlyMovingPartsTaggerPtr checkPartsForMove(const DataPartsVector & parts, SpacePtr space); bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason = nullptr) const; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 942bac0d294..9a0db253abf 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -154,6 +154,7 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si { } + UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const { size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); @@ -166,6 +167,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz { if (pool_used > pool_size) { + std::cerr << "POOLSIZE:" << pool_size << " POOL USED:" << pool_used << std::endl; throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); } diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h index a1afadec7fa..332a0988d10 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/src/Storages/MergeTree/MergeTreePartsMover.h @@ -16,7 +16,7 @@ namespace DB struct MergeTreeMoveEntry { std::shared_ptr part; - ReservationPtr reserved_space; + std::shared_ptr reserved_space; MergeTreeMoveEntry(const std::shared_ptr & part_, ReservationPtr reservation_) : part(part_), reserved_space(std::move(reservation_)) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 48caf59e7ba..b7fd7097546 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -584,8 +584,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper LOG_DEBUG(log, "Pulled {} entries to queue.", copied_entries.size()); } - if (storage.queue_task_handle) - storage.queue_task_handle->signalReadyToRun(); + storage.background_executor.triggerDataProcessing(); } return stat.version; @@ -668,8 +667,8 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C } } - if (some_active_mutations_were_killed && storage.queue_task_handle) - storage.queue_task_handle->signalReadyToRun(); + if (some_active_mutations_were_killed) + storage.background_executor.triggerDataProcessing(); if (!entries_to_load.empty()) { @@ -792,8 +791,8 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name); } - if (mutation_was_active && storage.queue_task_handle) - storage.queue_task_handle->signalReadyToRun(); + if (mutation_was_active) + storage.background_executor.triggerDataProcessing(); return entry; } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 00ddb7a57ce..cfdd8a61e9c 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -106,7 +106,6 @@ void StorageMergeTree::startup() try { background_executor.start(); - startBackgroundMovesIfNeeded(); } catch (...) { @@ -145,9 +144,6 @@ void StorageMergeTree::shutdown() background_executor.finish(); - if (moving_task_handle) - global_context.getBackgroundMovePool().removeTask(moving_task_handle); - try { /// We clear all old parts after stopping all background operations. @@ -501,18 +497,6 @@ std::optional StorageMergeTree::getIncompleteMutationsS return result; } - -void StorageMergeTree::startBackgroundMovesIfNeeded() -{ - if (areBackgroundMovesNeeded() && !moving_task_handle) - { - auto & move_pool = global_context.getBackgroundMovePool(); - moving_task_handle = move_pool.createTask([this] { return movePartsTask(); }); - move_pool.startTask(moving_task_handle); - } -} - - std::vector StorageMergeTree::getMutationsStatus() const { std::lock_guard lock(currently_processing_in_background_mutex); @@ -1530,4 +1514,9 @@ MutationCommands StorageMergeTree::getFirtsAlterMutationCommandsForPart(const Da return it->second.commands; } +void StorageMergeTree::startBackgroundMovesIfNeeded() +{ + background_executor.startMovingTaskIfNeeded(); +} + } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index c028e15416f..957b7ce56a6 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -98,6 +98,7 @@ private: MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; + BackgroundJobsExecutor background_executor; /// For block numbers. SimpleIncrement increment{0}; @@ -120,10 +121,6 @@ private: std::atomic shutdown_called {false}; - /// Task handler for merges, mutations and moves. - BackgroundJobsExecutor background_executor; - BackgroundProcessingPool::TaskHandle moving_task_handle; - void loadMutations(); /** Determines what parts should be merged and merges it. diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ae7ad4a3518..c61f2425b17 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -197,6 +197,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , merger_mutator(*this, global_context.getSettingsRef().background_pool_size) , queue(*this) , fetcher(*this) + , background_executor(*this, global_context) , cleanup_thread(*this) , part_check_thread(*this) , restarting_thread(*this) @@ -3530,12 +3531,9 @@ void StorageReplicatedMergeTree::startup() /// between the assignment of queue_task_handle and queueTask that use the queue_task_handle. { auto lock = queue.lockQueue(); - auto & pool = global_context.getBackgroundPool(); - queue_task_handle = pool.createTask([this] { return queueTask(); }); - pool.startTask(queue_task_handle); + background_executor.start(); } - startBackgroundMovesIfNeeded(); } catch (...) { @@ -3566,14 +3564,11 @@ void StorageReplicatedMergeTree::shutdown() restarting_thread.shutdown(); - if (queue_task_handle) - global_context.getBackgroundPool().removeTask(queue_task_handle); - { /// Queue can trigger queue_task_handle itself. So we ensure that all /// queue processes finished and after that reset queue_task_handle. auto lock = queue.lockQueue(); - queue_task_handle.reset(); + background_executor.finish(); /// Cancel logs pulling after background task were cancelled. It's still /// required because we can trigger pullLogsToQueue during manual OPTIMIZE, @@ -5921,12 +5916,9 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI { auto lock = queue.lockQueue(); - if (!queue_task_handle) - return false; - + background_executor.triggerDataProcessing(); /// This is significant, because the execution of this task could be delayed at BackgroundPool. /// And we force it to be executed. - queue_task_handle->signalReadyToRun(); } Poco::Event target_size_event; @@ -6032,15 +6024,9 @@ MutationCommands StorageReplicatedMergeTree::getFirtsAlterMutationCommandsForPar return queue.getFirstAlterMutationCommandsForPart(part); } - void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded() { - if (areBackgroundMovesNeeded() && !move_parts_task_handle) - { - auto & pool = global_context.getBackgroundMovePool(); - move_parts_task_handle = pool.createTask([this] { return movePartsTask(); }); - pool.startTask(move_parts_task_handle); - } + background_executor.startMovingTaskIfNeeded(); } } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 1b65ffdbc25..1a0cabae5fa 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -27,6 +27,7 @@ #include #include #include +#include namespace DB @@ -275,15 +276,14 @@ private: int metadata_version = 0; /// Threads. + BackgroundJobsExecutor background_executor; + /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue. bool queue_update_in_progress = false; BackgroundSchedulePool::TaskHolder queue_updating_task; BackgroundSchedulePool::TaskHolder mutations_updating_task; - /// A task that performs actions from the queue. - BackgroundProcessingPool::TaskHandle queue_task_handle; - /// A task which move parts to another disks/volumes /// Transparent for replication. BackgroundProcessingPool::TaskHandle move_parts_task_handle; @@ -568,7 +568,6 @@ private: MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override; void startBackgroundMovesIfNeeded() override; - protected: /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. */