From f576cbb8a4c2a6399ce1c8016a4319b1017c4936 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 5 Sep 2019 16:12:29 +0300 Subject: [PATCH] Simplify code --- .../MergeTree/MergeTreeBlockOutputStream.cpp | 4 +- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 147 ++++++++++++++- dbms/src/Storages/MergeTree/MergeTreeData.h | 20 ++- .../MergeTree/MergeTreePartsMover.cpp | 10 +- .../Storages/MergeTree/MergeTreePartsMover.h | 59 ++----- dbms/src/Storages/StorageMergeTree.cpp | 167 +++--------------- dbms/src/Storages/StorageMergeTree.h | 19 +- .../Storages/StorageReplicatedMergeTree.cpp | 156 ++-------------- .../src/Storages/StorageReplicatedMergeTree.h | 16 +- .../integration/test_multiple_disks/test.py | 2 +- 10 files changed, 232 insertions(+), 368 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 6aa9b07df73..be3caf98ad4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -27,8 +27,8 @@ void MergeTreeBlockOutputStream::write(const Block & block) PartLog::addNewPart(storage.global_context, part, watch.elapsed()); /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. - if (storage.background_task_handle) - storage.background_task_handle->wake(); + if (storage.merging_mutating_task_handle) + storage.merging_mutating_task_handle->wake(); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 6155a90a484..874596f8c08 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -92,6 +92,7 @@ namespace ErrorCodes extern const int BAD_DATA_PART_NAME; extern const int UNKNOWN_SETTING; extern const int READONLY_SETTING; + extern const int ABORTED; } @@ -128,6 +129,7 @@ MergeTreeData::MergeTreeData( , storage_policy(context_.getStoragePolicy(getSettings()->storage_policy_name)) , data_parts_by_info(data_parts_indexes.get()) , data_parts_by_state_and_info(data_parts_indexes.get()) + , parts_mover(this) { const auto settings = getSettings(); setProperties(order_by_ast_, primary_key_ast_, columns_, indices_, constraints_); @@ -2723,7 +2725,8 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String & throw Exception("Part " + part->name + " already on disk " + name, ErrorCodes::UNKNOWN_DISK); } - movePartsToSpace(parts, std::static_pointer_cast(disk)); + if (!movePartsToSpace(&parts, std::static_pointer_cast(disk))) + throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED); } @@ -2756,7 +2759,8 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String if (part->disk->getName() == disk->getName()) throw Exception("Part " + part->name + " already on volume " + name, ErrorCodes::UNKNOWN_DISK); - movePartsToSpace(parts, std::static_pointer_cast(volume)); + if (!movePartsToSpace(&parts, std::static_pointer_cast(volume))) + throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED); } @@ -3383,5 +3387,144 @@ catch (...) 
tryLogCurrentException(log, __PRETTY_FUNCTION__); } +namespace +{ + +struct MovingPartsTagger +{ + MergeTreeMovingParts parts_to_move; + std::unique_lock<std::mutex> background_lock; + MergeTreeData::DataParts & all_moving_parts; + + MovingPartsTagger(MergeTreeMovingParts && moving_parts_, + std::unique_lock<std::mutex> && background_lock_, + MergeTreeData::DataParts & all_moving_data_parts_) + : parts_to_move(std::move(moving_parts_)) + , background_lock(std::move(background_lock_)) + , all_moving_parts(all_moving_data_parts_) + { + if (!background_lock) + throw Exception("Cannot tag moving parts without background lock.", ErrorCodes::LOGICAL_ERROR); + + for (const auto & moving_part : parts_to_move) + if (!all_moving_parts.emplace(moving_part.part).second) + throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR); + + background_lock.unlock(); + } + + ~MovingPartsTagger() + { + background_lock.lock(); + for (const auto & moving_part : parts_to_move) + { + /// Something went completely wrong + if (!all_moving_parts.count(moving_part.part)) + std::terminate(); + all_moving_parts.erase(moving_part.part); + } + + } +}; + +} + +bool MergeTreeData::movePartsToSpace(const DataPartsVector * parts, DiskSpace::SpacePtr space) +{ + if (parts_mover.moves_blocker.isCancelled()) + return false; + + auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); + + std::optional<MovingPartsTagger> moving_tagger; + { + MergeTreeMovingParts parts_to_move; + std::unique_lock moving_parts_lock(moving_parts_mutex); + + if (parts != nullptr) + { + for (const auto & part : *parts) + { + auto reservation = space->reserve(part->bytes_on_disk); + if (!reservation) + throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE); + + auto & reserved_disk = reservation->getDisk(); + String path_to_clone = getFullPathOnDisk(reserved_disk); + + if (Poco::File(path_to_clone + part->name).exists()) + throw Exception( + "Move is not possible: " + path_to_clone + part->name + " already exists.", + ErrorCodes::DIRECTORY_ALREADY_EXISTS); + + if (currently_moving_parts.count(part) || partIsAssignedToBackgroundOperation(part)) + throw Exception( + "Cannot move part '" + part->name + "' because it's participating in background process.", + ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + + parts_to_move.emplace_back(part, std::move(reservation)); + } + } + else + { + auto can_move = [this](const DataPartPtr & part, String * reason) -> bool + { + if (partIsAssignedToBackgroundOperation(part)) + { + *reason = "part already assigned to replicated background operation."; + return false; + } + if (currently_moving_parts.count(part)) + { + *reason = "part is already moving."; + return false; + } + + return true; + }; + + if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) + return false; + } + + LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); + moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts); + } + + + for (const auto & moving_part : moving_tagger->parts_to_move) + { + Stopwatch stopwatch; + DataPartPtr cloned_part; + + auto write_part_log = [&](const ExecutionStatus & execution_status) + { + writePartLog( + PartLogElement::Type::MOVE_PART, + execution_status, + stopwatch.elapsed(), + moving_part.part->name, + cloned_part, + {moving_part.part}, + nullptr); + }; + + try + { + cloned_part = parts_mover.clonePart(moving_part); + parts_mover.swapClonedPart(cloned_part); +
write_part_log({}); + } + catch (...) + { + write_part_log(ExecutionStatus::fromCurrentException()); + if (cloned_part) + cloned_part->remove(); + + throw; + } + } + return true; +} } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 26fd04abf51..9a0d708d2a5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -31,6 +32,7 @@ namespace DB class MergeListEntry; class AlterCommands; +class MergeTreePartsMover; namespace ErrorCodes { @@ -597,7 +599,11 @@ public: protected: /// Moves part to specified space - virtual void movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space) = 0; + bool movePartsToSpace( + const DataPartsVector * parts = nullptr, + DiskSpace::SpacePtr space = nullptr); + + virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0; public: /// Moves partition to specified Disk @@ -726,6 +732,16 @@ public: bool has_non_adaptive_index_granularity_parts = false; + /// Parts that are currently moving from one disk or volume to another. + /// This set has to be used with `currently_processing_in_background_mutex`. + /// Moving may conflict with merges and mutations, but this is OK, because + /// if we decide to move some part to another disk, then we + /// will certainly choose this disk for the new part, which will appear + /// as the result of the merge or mutation. + DataParts currently_moving_parts; + + /// Mutex for currently_moving_parts + std::mutex moving_parts_mutex; protected: @@ -799,6 +815,8 @@ protected: DataPartsIndexes::index::type & data_parts_by_info; DataPartsIndexes::index::type & data_parts_by_state_and_info; + MergeTreePartsMover parts_mover; + using DataPartIteratorByInfo = DataPartsIndexes::index::type::iterator; using DataPartIteratorByStateAndInfo = DataPartsIndexes::index::type::iterator; diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp index 8981a78ef25..b19e5f186e3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -73,13 +73,13 @@ bool MergeTreePartsMover::selectPartsToMove( MergeTreeMovingParts & parts_to_move, const AllowedMovingPredicate & can_move) { - MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector(); + MergeTreeData::DataPartsVector data_parts = data->getDataPartsVector(); if (data_parts.empty()) return false; std::unordered_map need_to_move; - const auto & policy = data.getStoragePolicy(); + const auto & policy = data->getStoragePolicy(); const auto & volumes = policy->getVolumes(); /// Do not check if policy has one volume @@ -146,7 +146,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt moving_part.part->makeCloneOnDiskDetached(moving_part.reserved_space); MergeTreeData::MutableDataPartPtr cloned_part = - std::make_shared(data, moving_part.reserved_space->getDisk(), moving_part.part->name); + std::make_shared(*data, moving_part.reserved_space->getDisk(), moving_part.part->name); cloned_part->relative_path = "detached/" + moving_part.part->name; LOG_TRACE(log, "Part " << moving_part.part->name << " was cloned to " << cloned_part->getFullPath()); @@ -161,7 +161,7 @@ void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & clon if (moves_blocker.isCancelled()) throw Exception("Cancelled moving parts.",
ErrorCodes::ABORTED); - auto active_part = data.getActiveContainingPart(cloned_part->name); + auto active_part = data->getActiveContainingPart(cloned_part->name); if (!active_part || active_part->name != cloned_part->name) throw Exception("Failed to swap " + cloned_part->name + ". Active part doesn't exist." @@ -170,7 +170,7 @@ void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & clon cloned_part->renameTo(active_part->name); - data.swapActivePart(cloned_part); + data->swapActivePart(cloned_part); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.h b/dbms/src/Storages/MergeTree/MergeTreePartsMover.h index 6cf1264f414..66acff7c5fb 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.h @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -18,61 +17,27 @@ namespace ErrorCodes struct MergeTreeMoveEntry { - MergeTreeData::DataPartPtr part; + std::shared_ptr part; DiskSpace::ReservationPtr reserved_space; - MergeTreeMoveEntry(const MergeTreeData::DataPartPtr & part_, DiskSpace::ReservationPtr reservation_) - : part(part_), - reserved_space(std::move(reservation_)) + MergeTreeMoveEntry(const std::shared_ptr & part_, DiskSpace::ReservationPtr reservation_) + : part(part_), reserved_space(std::move(reservation_)) { } }; using MergeTreeMovingParts = std::vector; -struct MovingPartsTagger -{ - MergeTreeMovingParts parts_to_move; - std::unique_lock background_lock; - MergeTreeData::DataParts & all_moving_parts; - - - MovingPartsTagger(MergeTreeMovingParts && moving_parts_, - std::unique_lock && background_lock_, - MergeTreeData::DataParts & all_moving_data_parts_) - : parts_to_move(std::move(moving_parts_)) - , background_lock(std::move(background_lock_)) - , all_moving_parts(all_moving_data_parts_) - { - if (!background_lock) - throw Exception("Cannot tag moving parts without background lock.", ErrorCodes::LOGICAL_ERROR); - - for (const auto & moving_part : parts_to_move) - if (!all_moving_parts.emplace(moving_part.part).second) - throw Exception("Cannot move part '" + moving_part.part->name + "'. 
It's already moving.", ErrorCodes::LOGICAL_ERROR); - - background_lock.unlock(); - } - - ~MovingPartsTagger() - { - background_lock.lock(); - for (const auto & moving_part : parts_to_move) - { - /// Something went completely wrong - if (!all_moving_parts.count(moving_part.part)) - std::terminate(); - all_moving_parts.erase(moving_part.part); - } - - } -}; class MergeTreePartsMover + + { - using AllowedMovingPredicate = std::function; +private: + using AllowedMovingPredicate = std::function &, String * reason)>; + public: - MergeTreePartsMover(MergeTreeData & data_) + MergeTreePartsMover(MergeTreeData * data_) : data(data_) , log(&Poco::Logger::get("MergeTreePartsMover")) { @@ -82,16 +47,16 @@ public: MergeTreeMovingParts & parts_to_move, const AllowedMovingPredicate & can_move); - MergeTreeData::DataPartPtr clonePart(const MergeTreeMoveEntry & moving_part) const; + std::shared_ptr clonePart(const MergeTreeMoveEntry & moving_part) const; - void swapClonedPart(const MergeTreeData::DataPartPtr & cloned_parts) const; + void swapClonedPart(const std::shared_ptr & cloned_parts) const; public: ActionBlocker moves_blocker; private: - MergeTreeData & data; + MergeTreeData * data; Logger * log; }; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 8388be3fedb..74668b392d1 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -73,10 +73,8 @@ StorageMergeTree::StorageMergeTree( context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_, sample_by_ast_, ttl_table_ast_, merging_params_, std::move(storage_settings_), false, attach), - background_pool(context_.getBackgroundPool()), reader(*this), writer(*this), - merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()), - parts_mover(*this) + merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()) { loadDataParts(has_force_restore_data_flag); @@ -99,7 +97,8 @@ void StorageMergeTree::startup() /// NOTE background task will also do the above cleanups periodically. time_after_previous_cleanup.restart(); - background_task_handle = background_pool.addTask([this] { return backgroundTask(); }); + merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); }); + moving_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); }); } @@ -111,8 +110,11 @@ void StorageMergeTree::shutdown() merger_mutator.merges_blocker.cancelForever(); parts_mover.moves_blocker.cancelForever(); - if (background_task_handle) - background_pool.removeTask(background_task_handle); + if (merging_mutating_task_handle) + global_context.getBackgroundPool().removeTask(merging_mutating_task_handle); + + if (moving_task_handle) + global_context.getBackgroundPool().removeTask(moving_task_handle); } @@ -421,7 +423,7 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context & } LOG_INFO(log, "Added mutation: " << file_name); - background_task_handle->wake(); + merging_mutating_task_handle->wake(); } std::vector StorageMergeTree::getMutationsStatus() const @@ -492,7 +494,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id); /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately. 
- background_task_handle->wake(); + merging_mutating_task_handle->wake(); return CancellationCode::CancelSent; } @@ -617,139 +619,28 @@ bool StorageMergeTree::merge( return true; } -void StorageMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & parts, DiskSpace::SpacePtr space) + +bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const { - auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - - std::optional moving_tagger; - { - MergeTreeMovingParts parts_to_move; - std::unique_lock background_processing_lock(currently_processing_in_background_mutex); - - for (const auto & part : parts) - { - auto reservation = space->reserve(part->bytes_on_disk); - if (!reservation) - throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE); - - auto & reserved_disk = reservation->getDisk(); - String path_to_clone = getFullPathOnDisk(reserved_disk); - - if (Poco::File(path_to_clone + part->name).exists()) - throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists.", - ErrorCodes::DIRECTORY_ALREADY_EXISTS); - - if (currently_merging_mutating_parts.count(part)) - throw Exception("Cannot move part '" + part->name + "' because it's participating in background process.", - ErrorCodes::PART_IS_TEMPORARILY_LOCKED); - - parts_to_move.emplace_back(part, std::move(reservation)); - } - - moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts); - } - - for (const auto & moving_part : moving_tagger->parts_to_move) - { - Stopwatch stopwatch; - DataPartPtr cloned_part; - - auto write_part_log = [&](const ExecutionStatus & execution_status) - { - writePartLog( - PartLogElement::Type::MOVE_PART, - execution_status, - stopwatch.elapsed(), - moving_part.part->name, - cloned_part, - {moving_part.part}, - nullptr); - }; - - try - { - cloned_part = parts_mover.clonePart(moving_part); - parts_mover.swapClonedPart(cloned_part); - write_part_log({}); - } - catch (...) - { - write_part_log(ExecutionStatus::fromCurrentException()); - if (cloned_part) - cloned_part->remove(); - throw; - } - } + std::lock_guard background_processing_lock(currently_processing_in_background_mutex); + return currently_merging_mutating_parts.count(part); } -bool StorageMergeTree::moveParts() +BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask() { - auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - - std::optional moving_tagger; + try { - MergeTreeMovingParts parts_to_move; - std::unique_lock background_processing_lock(currently_processing_in_background_mutex); + if (!movePartsToSpace()) + return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; - auto can_move = [this](const DataPartPtr & part, String * reason) -> bool - { - - if (currently_merging_mutating_parts.count(part)) - { - *reason = "part is already assigned to merge or mutation."; - return false; - } - - if (currently_moving_parts.count(part)) - { - *reason = "part is already moving."; - return false; - } - - return true; - }; - - if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) - return false; - - LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); - moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts); + return BackgroundProcessingPoolTaskResult::SUCCESS; } - - for (const auto & moving_part : moving_tagger->parts_to_move) + catch (...) 
{ - Stopwatch stopwatch; - DataPartPtr cloned_part; - - auto write_part_log = [&](const ExecutionStatus & execution_status) - { - writePartLog( - PartLogElement::Type::MOVE_PART, - execution_status, - stopwatch.elapsed(), - moving_part.part->name, - cloned_part, - {moving_part.part}, - nullptr); - }; - - try - { - cloned_part = parts_mover.clonePart(moving_part); - parts_mover.swapClonedPart(cloned_part); - write_part_log({}); - } - catch (...) - { - write_part_log(ExecutionStatus::fromCurrentException()); - if (cloned_part) - cloned_part->remove(); - return false; - } + tryLogCurrentException(log); + return BackgroundProcessingPoolTaskResult::ERROR; } - - return true; } @@ -849,15 +740,12 @@ bool StorageMergeTree::tryMutatePart() } -BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask() +BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask() { if (shutdown_called) return BackgroundProcessingPoolTaskResult::ERROR; - bool merges_mutations_blocked = merger_mutator.merges_blocker.isCancelled(); - bool moves_blocked = parts_mover.moves_blocker.isCancelled(); - - if (merges_mutations_blocked && moves_blocked) + if (merger_mutator.merges_blocker.isCancelled()) return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; try @@ -875,14 +763,11 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask() } ///TODO: read deduplicate option from table config - if (!merges_mutations_blocked && merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/)) - return BackgroundProcessingPoolTaskResult::SUCCESS; - - if (!moves_blocked && moveParts()) + if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/)) return BackgroundProcessingPoolTaskResult::SUCCESS; - if (!merges_mutations_blocked && tryMutatePart()) + if (tryMutatePart()) return BackgroundProcessingPoolTaskResult::SUCCESS; return BackgroundProcessingPoolTaskResult::ERROR; diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 57186a2b008..15080cfcbf8 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -71,12 +71,10 @@ public: CheckResults checkData(const ASTPtr & query, const Context & context) override; private: - BackgroundProcessingPool & background_pool; MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; - MergeTreePartsMover parts_mover; /// For block numbers. SimpleIncrement increment{0}; @@ -92,13 +90,6 @@ private: /// This set have to be used with `currently_processing_in_background_mutex`. DataParts currently_merging_mutating_parts; - /// Parts that currently moving from disk/volume to another. - /// This set have to be used with `currently_processing_in_background_mutex`. - /// Moving may conflict with merges and mutations, but this is OK, because - /// if we decide to move some part to another disk, than we - /// assuredly will choose this disk for containing part, which will appear - /// as result of merge or mutation. - DataParts currently_moving_parts; std::map current_mutations_by_id; std::multimap current_mutations_by_version; @@ -106,7 +97,8 @@ private: std::atomic shutdown_called {false}; /// Task handler for merges, mutations and moves. 
- BackgroundProcessingPool::TaskHandle background_task_handle; + BackgroundProcessingPool::TaskHandle merging_mutating_task_handle; + BackgroundProcessingPool::TaskHandle moving_task_handle; std::vector prepareAlterTransactions( const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context); @@ -119,12 +111,12 @@ private: */ bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr); - bool moveParts(); + BackgroundProcessingPoolTaskResult movePartsTask(); /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true. bool tryMutatePart(); - BackgroundProcessingPoolTaskResult backgroundTask(); + BackgroundProcessingPoolTaskResult mergeMutateTask(); Int64 getCurrentMutationVersion( const DataPartPtr & part, @@ -137,7 +129,8 @@ private: void clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context); void attachPartition(const ASTPtr & partition, bool part, const Context & context); void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context); - void movePartsToSpace(const MergeTreeData::DataPartsVector & part, DiskSpace::SpacePtr space) override; + bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; + friend class MergeTreeBlockOutputStream; friend class MergeTreeData; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index ffb67af8e0c..0452e629b1d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -214,7 +214,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name_, table_name_)), replica_name(global_context.getMacros()->expand(replica_name_, database_name_, table_name_)), reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()), - parts_mover(*this), queue(*this), fetcher(*this), cleanup_thread(*this), alter_thread(*this), + queue(*this), fetcher(*this), cleanup_thread(*this), alter_thread(*this), part_check_thread(*this), restarting_thread(*this) { if (!zookeeper_path.empty() && zookeeper_path.back() == '/') @@ -2147,159 +2147,29 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask() return need_sleep ? 
BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS; } -BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movingPartsTask() + +bool StorageReplicatedMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const { - if (parts_mover.moves_blocker.isCancelled()) - return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; - - auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); + return queue.isPartAssignedToBackgroundOperation(part); +} +BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movePartsTask() +{ try { - std::optional moving_tagger; - { - MergeTreeMovingParts parts_to_move; - std::unique_lock moving_parts_lock(moving_parts_mutex); - - auto can_move = [this](const DataPartPtr & part, String * reason) -> bool - { - if (queue.isPartAssignedToBackgroundOperation(part)) - { - *reason = "part already assigned to replicated background operation."; - return false; - } - if (currently_moving_parts.count(part)) - { - *reason = "part is already moving."; - return false; - } - - return true; - }; - - if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) - return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; - - LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); - moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts); - } - - - for (const auto & moving_part : moving_tagger->parts_to_move) - { - Stopwatch stopwatch; - DataPartPtr cloned_part; - - auto write_part_log = [&](const ExecutionStatus & execution_status) - { - writePartLog( - PartLogElement::Type::MOVE_PART, - execution_status, - stopwatch.elapsed(), - moving_part.part->name, - cloned_part, - {moving_part.part}, - nullptr); - }; - - try - { - cloned_part = parts_mover.clonePart(moving_part); - parts_mover.swapClonedPart(cloned_part); - write_part_log({}); - } - catch (...) - { - write_part_log(ExecutionStatus::fromCurrentException()); - if (cloned_part) - cloned_part->remove(); - - return BackgroundProcessingPoolTaskResult::ERROR; - } - } + if (!movePartsToSpace()) + return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; return BackgroundProcessingPoolTaskResult::SUCCESS; } - catch (const Exception & e) + catch (...) { - if (e.code() == ErrorCodes::ABORTED) - { - LOG_INFO(log, e.message()); - return BackgroundProcessingPoolTaskResult::ERROR; - } - - throw; + tryLogCurrentException(log); + return BackgroundProcessingPoolTaskResult::ERROR; } } -void StorageReplicatedMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & parts, DiskSpace::SpacePtr space) -{ - auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - - std::optional moving_tagger; - { - MergeTreeMovingParts parts_to_move; - std::unique_lock moving_parts_lock(moving_parts_mutex); - for (const auto & part : parts) - { - auto reservation = space->reserve(part->bytes_on_disk); - if (!reservation) - throw Exception("Move is not possible. 
Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE); - - auto & reserved_disk = reservation->getDisk(); - String path_to_clone = getFullPathOnDisk(reserved_disk); - - if (Poco::File(path_to_clone + part->name).exists()) - throw Exception( - "Move is not possible: " + path_to_clone + part->name + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - - if (queue.isPartAssignedToBackgroundOperation(part)) - throw Exception( - "Cannot move part '" + part->name + "' because it's participating in background process.", - ErrorCodes::PART_IS_TEMPORARILY_LOCKED); - - parts_to_move.emplace_back(part, std::move(reservation)); - } - LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); - moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts); - } - - for (const auto & moving_part : moving_tagger->parts_to_move) - { - Stopwatch stopwatch; - DataPartPtr cloned_part; - - auto write_part_log = [&](const ExecutionStatus & execution_status) - { - writePartLog( - PartLogElement::Type::MOVE_PART, - execution_status, - stopwatch.elapsed(), - moving_part.part->name, - cloned_part, - {moving_part.part}, - nullptr); - }; - - try - { - cloned_part = parts_mover.clonePart(moving_part); - parts_mover.swapClonedPart(cloned_part); - write_part_log({}); - } - catch (...) - { - write_part_log(ExecutionStatus::fromCurrentException()); - if (cloned_part) - cloned_part->remove(); - - throw; - } - } - -} - void StorageReplicatedMergeTree::mergeSelectingTask() { if (!is_leader) @@ -3002,7 +2872,7 @@ void StorageReplicatedMergeTree::startup() data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler()); queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); }); - move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movingPartsTask(); }); + move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); }); /// In this thread replica will be activated. restarting_thread.start(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 7ba1c222a93..facdb1660f0 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -229,7 +229,6 @@ private: MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; - MergeTreePartsMover parts_mover; /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/). * In ZK entries in chronological order. Here it is not necessary. @@ -338,12 +337,9 @@ private: DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const DataPartPtr & part); - void movePartsToSpace(const MergeTreeData::DataPartsVector & parts, DiskSpace::SpacePtr space) override; + bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; - void getCommitPartOps( - Coordination::Requests & ops, - MutableDataPartPtr & part, - const String & block_id_path = "") const; + void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const; /// Updates info about part columns and checksums in ZooKeeper and commits transaction if successful. void updatePartHeaderInZooKeeperAndCommit( @@ -406,7 +402,7 @@ private: /// Perform moves of parts to another disks. 
/// Local operation, doesn't interact with replicationg queue. - BackgroundProcessingPoolTaskResult movingPartsTask(); + BackgroundProcessingPoolTaskResult movePartsTask(); /// Postcondition: @@ -465,12 +461,6 @@ private: std::unordered_set currently_fetching_parts; std::mutex currently_fetching_parts_mutex; - /// Parts currently moving to another disks or volumes. - /// This operation doesn't replicate. - DataParts currently_moving_parts; - - /// Mutex for currenly_moving_parts - std::mutex moving_parts_mutex; /// With the quorum being tracked, add a replica to the quorum for the part. void updateQuorum(const String & part_name); diff --git a/dbms/tests/integration/test_multiple_disks/test.py b/dbms/tests/integration/test_multiple_disks/test.py index 5119f439170..5b7c31e8856 100644 --- a/dbms/tests/integration/test_multiple_disks/test.py +++ b/dbms/tests/integration/test_multiple_disks/test.py @@ -495,7 +495,7 @@ def test_mutate_to_another_disk(start_cluster, name, engine): node1.query("ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name)) - retry = 10 + retry = 20 while node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != "" and retry > 0: retry -= 1 time.sleep(0.5)
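
The structural idea of the patch, for readers skimming the diff: the two near-identical move loops in StorageMergeTree and StorageReplicatedMergeTree collapse into a single MergeTreeData::movePartsToSpace(), which is driven either by an explicit ALTER ... MOVE (non-null parts list) or by the new movePartsTask() background-pool task (null list, MergeTreePartsMover selects candidates). Conflicts with merges and mutations are avoided through the virtual partIsAssignedToBackgroundOperation() hook and the shared currently_moving_parts set, which is guarded by the RAII MovingPartsTagger. Below is a minimal, self-contained sketch of that tag-then-move-then-untag pattern; all names in it (Part, PartPtr, MovingTagger, moveOneRound, TaskResult) are invented for illustration and are not ClickHouse's actual API.

// Illustrative sketch of the tag -> move -> untag pattern; not ClickHouse code.
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct Part { std::string name; };
using PartPtr = std::shared_ptr<const Part>;
using PartSet = std::set<PartPtr>;

enum class TaskResult { SUCCESS, NOTHING_TO_DO, ERROR };

/// Rough analogue of MovingPartsTagger: mark parts as "moving" while the caller still holds
/// the selection lock, and unmark them when the move attempt ends, whether it succeeded or threw.
class MovingTagger
{
public:
    MovingTagger(std::vector<PartPtr> parts_, std::unique_lock<std::mutex> && lock_,
                 std::mutex & mutex_, PartSet & moving_set_)
        : parts(std::move(parts_)), mutex(mutex_), moving_set(moving_set_)
    {
        if (!lock_.owns_lock())
            throw std::logic_error("Cannot tag moving parts without a held lock.");

        for (const auto & part : parts)
            moving_set.insert(part);   // still under lock_, so nobody else can pick these parts
        // lock_ is released when the constructor returns and the parameter is destroyed
    }

    ~MovingTagger()
    {
        std::lock_guard guard(mutex);
        for (const auto & part : parts)
            moving_set.erase(part);
    }

    const std::vector<PartPtr> & partsToMove() const { return parts; }

private:
    std::vector<PartPtr> parts;
    std::mutex & mutex;
    PartSet & moving_set;
};

/// Rough analogue of one background-pool round (movePartsTask calling movePartsToSpace):
/// select parts that are not busy, tag them, do the per-part work, report a pool-style result.
TaskResult moveOneRound(const std::vector<PartPtr> & all_parts, std::mutex & mutex, PartSet & moving_set)
{
    std::unique_lock lock(mutex);

    std::vector<PartPtr> selected;
    for (const auto & part : all_parts)
        if (!moving_set.count(part))   // stand-in for "not merging/mutating and not already moving"
            selected.push_back(part);

    if (selected.empty())
        return TaskResult::NOTHING_TO_DO;   // lets the pool back off instead of spinning

    MovingTagger tagger(std::move(selected), std::move(lock), mutex, moving_set);

    for (const auto & part : tagger.partsToMove())
        std::cout << "moving " << part->name << '\n';   // clonePart + swapClonedPart would go here

    return TaskResult::SUCCESS;
}

int main()
{
    std::mutex mutex;
    PartSet moving_set;
    std::vector<PartPtr> parts = {std::make_shared<Part>(Part{"all_1_1_0"}),
                                  std::make_shared<Part>(Part{"all_2_2_0"})};

    if (moveOneRound(parts, mutex, moving_set) == TaskResult::SUCCESS)
        std::cout << "round finished" << '\n';
}

The RAII shape matters because the per-part work can throw (in the patch, swapClonedPart may fail): the destructor runs on both the success and failure paths, so an aborted move never leaves a part stuck in currently_moving_parts and permanently excluded from merges, mutations, and later moves.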