diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp index d891f94b866..215715059f6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -6,15 +6,21 @@ namespace DB { +namespace ErrorCodes +{ + extern const int ABORTED; +} + namespace { + /// Contains minimal number of heaviest parts, which sum size on disk is greater than required. /// If there are not enough summary size, than contains all. class LargestPartsWithRequiredSize { struct PartsSizeOnDiskComparator { - bool operator() (const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const + bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const { return f->bytes_on_disk < s->bytes_on_disk; } @@ -25,10 +31,7 @@ class LargestPartsWithRequiredSize UInt64 current_size_sum = 0; public: - LargestPartsWithRequiredSize(UInt64 required_sum_size_) - : required_size_sum(required_sum_size_) - { - } + LargestPartsWithRequiredSize(UInt64 required_sum_size_) : required_size_sum(required_sum_size_) {} void add(MergeTreeData::DataPartPtr part) { @@ -100,8 +103,13 @@ bool MergeTreePartsMover::selectPartsToMove( for (const auto & part : data_parts) { - if (!can_move(part, nullptr)) + String reason; + if (!can_move(part, &reason)) + { + LOG_TRACE(log, "Cannot select part '" << part->name << "' to move, becase " << reason); continue; + } + auto to_insert = need_to_move.find(part->disk); if (to_insert != need_to_move.end()) to_insert->second.add(part); @@ -132,6 +140,15 @@ MergeTreeData::DataPartsVector MergeTreePartsMover::cloneParts(const MergeTreeMo MergeTreeData::DataPartsVector res; for (auto && move : parts) { + if (moves_blocker.isCancelled()) + { + /// Removing all copied parts from disk + for (auto & part : res) + part->remove(); + + throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED); + } + LOG_TRACE(log, "Cloning part " << move.part->name); move.part->makeCloneOnDiskDetached(move.reserved_space); @@ -151,22 +168,31 @@ MergeTreeData::DataPartsVector MergeTreePartsMover::cloneParts(const MergeTreeMo bool MergeTreePartsMover::swapClonedParts(const MergeTreeData::DataPartsVector & cloned_parts, String * out_reason) { std::vector failed_parts; - for (auto && cloned_part : cloned_parts) + for (size_t i = 0; i < cloned_parts.size(); ++i) { - auto part = data.getActiveContainingPart(cloned_part->name); - if (!part || part->name != cloned_part->name) + if (moves_blocker.isCancelled()) { - LOG_INFO(log, "Failed to swap " << cloned_part->name << ". Active part doesn't exist." + /// Removing all copied parts from disk + for (size_t j = i; j < cloned_parts.size(); ++j) + cloned_parts[j]->remove(); + + throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED); + } + + auto part = data.getActiveContainingPart(cloned_parts[i]->name); + if (!part || part->name != cloned_parts[i]->name) + { + LOG_INFO(log, "Failed to swap " << cloned_parts[i]->name << ". Active part doesn't exist." << " It can be removed by merge or deleted by hand. Will remove copy on path '" - << cloned_part->getFullPath() << "'."); - failed_parts.push_back(cloned_part->name); - cloned_part->remove(); + << cloned_parts[i]->getFullPath() << "'."); + failed_parts.push_back(cloned_parts[i]->name); + cloned_parts[i]->remove(); continue; } - cloned_part->renameTo(part->name); + cloned_parts[i]->renameTo(part->name); - data.swapActivePart(cloned_part); + data.swapActivePart(cloned_parts[i]); } if (!failed_parts.empty()) diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.h b/dbms/src/Storages/MergeTree/MergeTreePartsMover.h index a508344562a..68e7969e67e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.h @@ -1,14 +1,20 @@ #pragma once -#include -#include -#include #include #include +#include +#include +#include +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} struct MargeTreeMoveEntry { @@ -24,6 +30,44 @@ struct MargeTreeMoveEntry using MergeTreeMovingParts = std::vector; +struct MovingPartsTagger +{ + MergeTreeMovingParts parts_to_move; + std::unique_lock background_lock; + MergeTreeData::DataParts & all_moving_parts; + + + MovingPartsTagger(MergeTreeMovingParts && moving_parts_, + std::unique_lock && background_lock_, + MergeTreeData::DataParts & all_moving_data_parts_) + : parts_to_move(std::move(moving_parts_)) + , background_lock(std::move(background_lock_)) + , all_moving_parts(all_moving_data_parts_) + { + if (!background_lock) + throw Exception("Cannot tag moving parts without background lock.", ErrorCodes::LOGICAL_ERROR); + + for (const auto & moving_part : parts_to_move) + if(!all_moving_parts.emplace(moving_part.part).second) + throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR); + + background_lock.unlock(); + } + + ~MovingPartsTagger() + { + background_lock.lock(); + for (const auto & moving_part : parts_to_move) + { + /// Something went completely wrong + if (!all_moving_parts.count(moving_part.part)) + std::terminate(); + all_moving_parts.erase(moving_part.part); + } + + } +}; + class MergeTreePartsMover { using AllowedMovingPredicate = std::function; @@ -42,6 +86,9 @@ public: bool swapClonedParts(const MergeTreeData::DataPartsVector & cloned_parts, String * out_reason); +public: + ActionBlocker moves_blocker; + private: MergeTreeData & data; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 3c530262215..5693a9ec630 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -83,8 +83,6 @@ StorageMergeTree::StorageMergeTree( increment.set(getMaxBlockNumber()); loadMutations(); - - moving_parts_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageMergeTree::movingPartsTask)", [this] { movingPartsTask(); }); } @@ -99,7 +97,6 @@ void StorageMergeTree::startup() /// NOTE background task will also do the above cleanups periodically. time_after_previous_cleanup.restart(); background_task_handle = background_pool.addTask([this] { return backgroundTask(); }); - moving_parts_task->activateAndSchedule(); } @@ -109,8 +106,12 @@ void StorageMergeTree::shutdown() return; shutdown_called = true; merger_mutator.merges_blocker.cancelForever(); + parts_mover.moves_blocker.cancelForever(); + if (background_task_handle) background_pool.removeTask(background_task_handle); + + background_task_handle.reset(); } @@ -317,7 +318,7 @@ void StorageMergeTree::alter( } -/// While exists, marks parts as 'currently_processing_in_background' and reserves free space on filesystem. +/// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem. struct CurrentlyMergingPartsTagger { FutureMergedMutatedPart future_part; @@ -339,10 +340,10 @@ public: for (const auto & part : future_part.parts) { - if (storage.currently_processing_in_background.count(part)) + if (storage.currently_merging_mutating_parts.count(part)) throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); } - storage.currently_processing_in_background.insert(future_part.parts.begin(), future_part.parts.end()); + storage.currently_merging_mutating_parts.insert(future_part.parts.begin(), future_part.parts.end()); } ~CurrentlyMergingPartsTagger() @@ -351,9 +352,9 @@ public: for (const auto & part : future_part.parts) { - if (!storage.currently_processing_in_background.count(part)) + if (!storage.currently_merging_mutating_parts.count(part)) std::terminate(); - storage.currently_processing_in_background.erase(part); + storage.currently_merging_mutating_parts.erase(part); } /// Update the information about failed parts in the system.mutations table. @@ -533,7 +534,7 @@ bool StorageMergeTree::merge( auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) { - return !currently_processing_in_background.count(left) && !currently_processing_in_background.count(right) + return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right) && getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock); }; @@ -641,9 +642,10 @@ void StorageMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & p { auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - MergeTreeMovingParts parts_to_move; + std::optional moving_tagger; { - std::lock_guard background_processing_lock(currently_processing_in_background_mutex); + MergeTreeMovingParts parts_to_move; + std::unique_lock background_processing_lock(currently_processing_in_background_mutex); for (const auto & part : parts) { @@ -658,51 +660,68 @@ void StorageMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & p throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - if (currently_processing_in_background.count(part)) + if (currently_merging_mutating_parts.count(part)) throw Exception("Cannot move part '" + part->name + "' because it's participating in background process.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED); parts_to_move.emplace_back(part, std::move(reservation)); - } + + moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts); } std::string reason; - auto cloned_parts = parts_mover.cloneParts(parts_to_move); + auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move); if (!parts_mover.swapClonedParts(cloned_parts, &reason)) throw Exception("Move failed. " + reason, ErrorCodes::LOGICAL_ERROR); } -void StorageMergeTree::movingPartsTask() +bool StorageMergeTree::moveParts() { LOG_INFO(log, "TRYING TO MOVE SMS"); auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - MergeTreeMovingParts parts_to_move; + std::optional moving_tagger; { - std::lock_guard background_processing_lock(currently_processing_in_background_mutex); + MergeTreeMovingParts parts_to_move; + std::unique_lock background_processing_lock(currently_processing_in_background_mutex); - auto can_move = [this](const DataPartPtr & part, String *) -> bool + auto can_move = [this](const DataPartPtr & part, String * reason) -> bool { - return !currently_processing_in_background.count(part); + + if (currently_merging_mutating_parts.count(part)) + { + *reason = "part is already assigned to merge or mutation."; + return false; + } + + if (currently_moving_parts.count(part)) + { + *reason = "part is already moving."; + return false; + } + + return true; }; if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) - { - moving_parts_task->scheduleAfter(1 * 1000); - return; - } - } - LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); + return false; - auto cloned_parts = parts_mover.cloneParts(parts_to_move); + LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); + moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts); + } + + auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move); std::string reason; if (!parts_mover.swapClonedParts(cloned_parts, &reason)) + { LOG_WARNING(log, "Move failed: " << reason); + return false; + } - moving_parts_task->scheduleAfter(1 * 1000); + return true; } @@ -726,7 +745,7 @@ bool StorageMergeTree::tryMutatePart() auto mutations_end_it = current_mutations_by_version.end(); for (const auto & part : getDataPartsVector()) { - if (currently_processing_in_background.count(part)) + if (currently_merging_mutating_parts.count(part)) continue; auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); @@ -828,8 +847,8 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask() if (shutdown_called) return BackgroundProcessingPoolTaskResult::ERROR; - if (merger_mutator.merges_blocker.isCancelled()) - return BackgroundProcessingPoolTaskResult::ERROR; + bool merges_mutations_blocked = merger_mutator.merges_blocker.isCancelled(); + bool moves_blocked = parts_mover.moves_blocker.isCancelled(); try { @@ -846,10 +865,13 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask() } ///TODO: read deduplicate option from table config - if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/)) + if (!merges_mutations_blocked && merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/)) return BackgroundProcessingPoolTaskResult::SUCCESS; - if (tryMutatePart()) + if (!moves_blocked && moveParts()) + return BackgroundProcessingPoolTaskResult::SUCCESS; + + if (!merges_mutations_blocked && tryMutatePart()) return BackgroundProcessingPoolTaskResult::SUCCESS; else return BackgroundProcessingPoolTaskResult::ERROR; @@ -1156,52 +1178,6 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par PartsTemporaryRename renamed_parts(*this, "detached/"); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts); - //String source_dir = "detached/"; - - //std::map name_to_disk; - - ///// Let's make a list of parts to add. - //Strings parts; - //if (attach_part) - //{ - // parts.push_back(partition_id); - //} - //else - //{ - // LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir); - // ActiveDataPartSet active_parts(format_version); - // const auto disks = storage_policy->getDisks(); - // for (const DiskSpace::DiskPtr & disk : disks) - // { - // const auto full_path = getFullPathOnDisk(disk); - // for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) - // { - // const String & name = it.name(); - // MergeTreePartInfo part_info; - // if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version) - // || part_info.partition_id != partition_id) - // { - // continue; - // } - // LOG_DEBUG(log, "Found part " << name); - // active_parts.add(name); - // name_to_disk[name] = disk; - // } - // } - // LOG_DEBUG(log, active_parts.size() << " of them are active"); - // parts = active_parts.getParts(); - //} - - //for (const auto & source_part_name : parts) - //{ - // const auto & source_part_disk = name_to_disk[source_part_name]; - - // LOG_DEBUG(log, "Checking data"); - // MergeTreeData::MutableDataPartPtr part = loadPartAndFixMetadata(source_part_disk, source_dir + source_part_name); - - // LOG_INFO(log, "Attaching part " << source_part_name << " from " << getFullPathOnDisk(source_part_disk)); - // renameTempPartAndAdd(part, &increment); - for (size_t i = 0; i < loaded_parts.size(); ++i) { LOG_INFO(log, "Attaching part " << loaded_parts[i]->name << " from " << renamed_parts.old_and_new_names[i].second); diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index b474c5bb973..a55045c1de9 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -84,19 +84,30 @@ private: /// For clearOldParts, clearOldTemporaryDirectories. AtomicStopwatch time_after_previous_cleanup; + /// Mutex for parts currently processing in background + /// merging (also with TTL), mutating or moving. mutable std::mutex currently_processing_in_background_mutex; - DataParts currently_processing_in_background; + + /// Parts that currently participate in merge or mutation. + /// This set have to be used with `currently_processing_in_background_mutex`. + DataParts currently_merging_mutating_parts; + + /// Parts that currently moving from disk/volume to another. + /// This set have to be used with `currently_processing_in_background_mutex`. + /// Moving may conflict with merges and mutations, but this is OK, because + /// if we decide to move some part to another disk, than we + /// assuredly will choose this disk for containing part, which will appear + /// as result of merge or mutation. + DataParts currently_moving_parts; + std::map current_mutations_by_id; std::multimap current_mutations_by_version; std::atomic shutdown_called {false}; + /// Task handler for merges, mutations and moves. BackgroundProcessingPool::TaskHandle background_task_handle; - /// A task which move parts to another disks/volumes - /// executes in background schedule pool - BackgroundSchedulePool::TaskHolder moving_parts_task; - std::vector prepareAlterTransactions( const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context); @@ -108,7 +119,7 @@ private: */ bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr); - void movingPartsTask(); + bool moveParts(); /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true. bool tryMutatePart(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index b3014a34dc3..e2abbecc060 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -126,7 +126,6 @@ namespace ActionLocks static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000; static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000; static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000; -static const auto MOVE_PARTS_SLEEP_MS = 1 * 1000; /** There are three places for each part, where it should be * 1. In the RAM, data_parts, all_data_parts. @@ -234,9 +233,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( mutations_finalizing_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); }); - moving_parts_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::movingPartsTask)", [this] { movingPartsTask(); }); - moving_parts_task->activateAndSchedule(); - if (global_context.hasZooKeeper()) current_zookeeper = global_context.getZooKeeper(); @@ -2208,34 +2204,62 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask() return need_sleep ? BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS; } -void StorageReplicatedMergeTree::movingPartsTask() +BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movingPartsTask() { auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - MergeTreeMovingParts parts_to_move; - auto can_move = [this](const DataPartPtr & part, String *) -> bool - { - return !queue.isPartAssignedToBackgroundOperation(part); - }; - if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) + try { - moving_parts_task->scheduleAfter(MOVE_PARTS_SLEEP_MS); - return; + std::optional moving_tagger; + { + MergeTreeMovingParts parts_to_move; + std::unique_lock moving_parts_lock(moving_parts_mutex); + + auto can_move = [this](const DataPartPtr & part, String * reason) -> bool + { + if (queue.isPartAssignedToBackgroundOperation(part)) + { + *reason = "part already assigned to replicated background operation."; + return false; + } + if (!currently_moving_parts.count(part)) + { + *reason = "part is already moving."; + return false; + } + + return true; + }; + + if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) + return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; + + LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); + moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts); + } + + auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move); + + std::string reason; + if (!parts_mover.swapClonedParts(cloned_parts, &reason)) + { + LOG_INFO(log, "Move failed. " << reason); + return BackgroundProcessingPoolTaskResult::ERROR; + } + + return BackgroundProcessingPoolTaskResult::SUCCESS; } - - LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move."); - - auto cloned_parts = parts_mover.cloneParts(parts_to_move); - - std::string reason; - if (!parts_mover.swapClonedParts(cloned_parts, &reason)) + catch (const Exception & e) { - LOG_INFO(log, "Move failed. " << reason); - moving_parts_task->scheduleAfter(MOVE_PARTS_SLEEP_MS); + if (e.code() == ErrorCodes::ABORTED) + { + LOG_INFO(log, e.message()); + return BackgroundProcessingPoolTaskResult::ERROR; + } + + throw; } - else - moving_parts_task->schedule(); } @@ -2243,30 +2267,37 @@ void StorageReplicatedMergeTree::movePartsToSpace(const MergeTreeData::DataParts { auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - MergeTreeMovingParts parts_to_move; - for (const auto & part : parts) + + std::optional moving_tagger; { - auto reservation = space->reserve(part->bytes_on_disk); - if (!reservation) - throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE); + MergeTreeMovingParts parts_to_move; + std::unique_lock moving_parts_lock(moving_parts_mutex); + for (const auto & part : parts) + { + auto reservation = space->reserve(part->bytes_on_disk); + if (!reservation) + throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE); - auto & reserved_disk = reservation->getDisk(); - String path_to_clone = getFullPathOnDisk(reserved_disk); + auto & reserved_disk = reservation->getDisk(); + String path_to_clone = getFullPathOnDisk(reserved_disk); - if (Poco::File(path_to_clone + part->name).exists()) - throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists.", - ErrorCodes::DIRECTORY_ALREADY_EXISTS); + if (Poco::File(path_to_clone + part->name).exists()) + throw Exception( + "Move is not possible: " + path_to_clone + part->name + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS); - if (queue.isPartAssignedToBackgroundOperation(part)) - throw Exception("Cannot move part '" + part->name + "' because it's participating in background process.", - ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + if (queue.isPartAssignedToBackgroundOperation(part)) + throw Exception( + "Cannot move part '" + part->name + "' because it's participating in background process.", + ErrorCodes::PART_IS_TEMPORARILY_LOCKED); - parts_to_move.emplace_back(part, std::move(reservation)); + parts_to_move.emplace_back(part, std::move(reservation)); + } + moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts); } - std::string reason; - auto cloned_parts = parts_mover.cloneParts(parts_to_move); + auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move); + std::string reason; if (!parts_mover.swapClonedParts(cloned_parts, &reason)) throw Exception("Move failed. " + reason, ErrorCodes::LOGICAL_ERROR); } @@ -2973,6 +3004,7 @@ void StorageReplicatedMergeTree::startup() data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler()); queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); }); + move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movingPartsTask(); } ); /// In this thread replica will be activated. restarting_thread.start(); @@ -2987,6 +3019,7 @@ void StorageReplicatedMergeTree::shutdown() /// Cancel fetches, merges and mutations to force the queue_task to finish ASAP. fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever(); + parts_mover.moves_blocker.cancelForever(); restarting_thread.shutdown(); @@ -2994,6 +3027,10 @@ void StorageReplicatedMergeTree::shutdown() global_context.getBackgroundPool().removeTask(queue_task_handle); queue_task_handle.reset(); + if (move_parts_task_handle) + global_context.getBackgroundPool().removeTask(move_parts_task_handle); + move_parts_task_handle.reset(); + if (data_parts_exchange_endpoint_holder) { data_parts_exchange_endpoint_holder->getBlocker().cancelForever(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index c7db9f15bef..600e3e93608 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -264,6 +264,8 @@ private: /// A task that performs actions from the queue. BackgroundProcessingPool::TaskHandle queue_task_handle; + /// A task which move parts to another disks/volumes + /// Transparent for replication. BackgroundProcessingPool::TaskHandle move_parts_task_handle; /// A task that selects parts to merge. @@ -274,10 +276,6 @@ private: /// A task that marks finished mutations as done. BackgroundSchedulePool::TaskHolder mutations_finalizing_task; - /// A task which move parts to another disks/volumes - /// Transparent for replication. - BackgroundSchedulePool::TaskHolder moving_parts_task; - /// A thread that removes old parts, log entries, and blocks. ReplicatedMergeTreeCleanupThread cleanup_thread; @@ -412,9 +410,9 @@ private: */ BackgroundProcessingPoolTaskResult queueTask(); - /// Perform moves of parts to another disks - /// No log entry, because moves are not replicated - void movingPartsTask(); + /// Perform moves of parts to another disks. + /// Local operation, doesn't interact with replicationg queue. + BackgroundProcessingPoolTaskResult movingPartsTask(); /// Postcondition: @@ -473,6 +471,10 @@ private: std::unordered_set currently_fetching_parts; std::mutex currently_fetching_parts_mutex; + /// + DataParts currently_moving_parts; + std::mutex moving_parts_mutex; + /// With the quorum being tracked, add a replica to the quorum for the part. void updateQuorum(const String & part_name);