diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 69ff0e8df7a..9eca8ccc4e9 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -15,7 +15,7 @@ namespace ErrorCodes { extern const int UNEXPECTED_NODE_IN_ZOOKEEPER; extern const int UNFINISHED; - extern const int PART_IS_TEMPORARY_LOCKED; + extern const int PART_IS_TEMPORARILY_LOCKED; } @@ -27,21 +27,39 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & {} -void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts, bool throw_if_already_virtual) +void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts) { std::lock_guard lock(state_mutex); - for (const auto & part : parts) + for (auto part : parts) { - if (throw_if_already_virtual && !virtual_parts.getContainingPart(part->info).empty()) - throw Exception("Part " + part->name + " or covering part is already contains in virtual (future) parts set.", ErrorCodes::PART_IS_TEMPORARY_LOCKED); - current_parts.add(part->name); virtual_parts.add(part->name); } } +void ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts) +{ + std::lock_guard lock(state_mutex); + for (const auto & data_part : data_parts) + { + if (!virtual_parts.getContainingPart(data_part->name).empty()) + throw Exception("Part " + data_part->name + " or covering part is" + + " already contains in virtual (future) parts set.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + } + + for (const auto & data_part : data_parts) + virtual_parts.add(data_part->name); +} + + +bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const +{ + std::lock_guard lock(state_mutex); + return !virtual_parts.getContainingPart(data_part->name).empty(); +} + bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) { auto queue_path = replica_path + "/queue"; @@ -1734,24 +1752,6 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMerge } -ReplicatedMergeTreeMovePredicate::ReplicatedMergeTreeMovePredicate(const ReplicatedMergeTreeQueue & queue_) - : queue(queue_) - , queue_state_lock(queue_.state_mutex) -{ -} - - -bool ReplicatedMergeTreeMovePredicate::operator()(const MergeTreeData::DataPartPtr & part, String * /* out_reason */) const -{ - return !queue.virtual_parts.getContainingPart(part->info).empty(); -} - - -ReplicatedMergeTreeMovePredicate::~ReplicatedMergeTreeMovePredicate() -{ - queue_state_lock.unlock(); -} - ReplicatedMergeTreeQueue::SubscriberHandler ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 307084ad0ac..9d352360e90 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -158,6 +158,8 @@ private: /// Ensures that only one thread is simultaneously updating mutations. std::mutex update_mutations_mutex; + /// Put a set of (already existing) parts in virtual_parts. + void addVirtualParts(const MergeTreeData::DataParts & parts); void insertUnlocked( const LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, @@ -228,8 +230,6 @@ public: ~ReplicatedMergeTreeQueue(); - /// Put a set of (already existing) parts in virtual_parts. TODO(MOVE TO PRIVATE) - void addVirtualParts(const MergeTreeData::DataParts & parts, bool throw_if_already_virtual=false); void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_, const MergeTreeData::DataParts & parts); @@ -327,6 +327,15 @@ public: /// Part maybe fake (look at ReplicatedMergeTreeMergePredicate). void disableMergesInBlockRange(const String & part_name); + /// Prohibit merges for specified parts. + /// Add part to virtual_parts, which means that part must exist + /// after processing replication log up to log_pointer. + /// Throws exception if any part was in virtual parts + void disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts); + + /// Cheks that part is already in virtual parts + bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const; + /// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts. /// Locks queue's mutex. bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 6e1bd73841f..9242d195dbc 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2181,17 +2181,18 @@ public: : parts(std::move(parts_)) , queue(queue_) { - MergeTreeData::DataParts data_parts; + MergeTreeData::DataPartsVector data_parts; /// Assume queue mutex is already locked, because this method is called from tryMoveParts. for (const auto & moving_part : parts) - data_parts.emplace(moving_part.part); + data_parts.emplace_back(moving_part.part); /// Throws exception if some parts already exists - queue.addVirtualParts(data_parts, true); + queue.disableMergesForParts(data_parts); } ~CurrentlyMovingPartsTagger() { + /// May return false, but we don't care, it's ok for (auto & part : parts) queue.removeFromVirtualParts(part.part->info); } @@ -2206,19 +2207,24 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::tryMoveParts() { auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY); - MergeTreeMovingParts parts_to_move; + auto can_move = [this](const DataPartPtr & part, String *) -> bool { - /// Holds lock on queue until selection finished - ReplicatedMergeTreeMovePredicate can_move(queue); + return !queue.isVirtualPart(part); + }; - if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) - { - LOG_INFO(log, "Nothing to move."); - return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; - } + MergeTreeMovingParts parts_to_move; + if (!parts_mover.selectPartsToMove(parts_to_move, can_move)) + return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; + + try + { + moving_tagger.emplace(std::move(parts_to_move), queue); + } + catch (const DB::Exception & ex) + { + LOG_INFO(log, "Cannot start moving: " + ex.displayText()); + return BackgroundProcessingPoolTaskResult::ERROR; } - - moving_tagger.emplace(std::move(parts_to_move), queue); } LOG_INFO(log, "Found " << moving_tagger->parts.size() << " parts to move."); @@ -2954,6 +2960,10 @@ void StorageReplicatedMergeTree::shutdown() global_context.getBackgroundPool().removeTask(queue_task_handle); queue_task_handle.reset(); + if (move_parts_task_handle) + global_context.getBackgroundPool().removeTask(move_parts_task_handle); + move_parts_task_handle.reset(); + if (data_parts_exchange_endpoint_holder) { data_parts_exchange_endpoint_holder->getBlocker().cancelForever();