diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 703cf32f743..ebc75d63b75 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -1097,6 +1097,21 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
 }
 
 
+void IMergeTreeDataPart::removeIfNotLockedInS3() const
+{
+    try
+    {
+        /// TODO Unlocking in try-catch looks ugly. Special "keep_s3" flag
+        /// which is a bit different from "keep_s3_on_delete" flag looks ugly too.
+        bool keep_s3 = !storage.unlockSharedData(*this);
+        remove(keep_s3);
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__, "There is a problem with deleting part " + name + " from filesystem");
+    }
+}
+
 void IMergeTreeDataPart::remove(bool keep_s3) const
 {
     if (!isStoredOnDisk())
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h
index 53640b41507..59c19349e8f 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -127,6 +127,7 @@ public:
     void assertOnDisk() const;
 
     void remove(bool keep_s3 = false) const;
+    void removeIfNotLockedInS3() const;
 
     void projectionRemove(const String & parent_to, bool keep_s3 = false) const;
 
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 6eeabd9604d..628e52f079d 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1313,7 +1313,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
                     CurrentThread::attachTo(thread_group);
 
                 LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
-                part->remove();
+                part->removeIfNotLockedInS3();
             });
         }
 
@@ -1324,7 +1324,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
         for (const DataPartPtr & part : parts_to_remove)
         {
             LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
-            part->remove();
+            part->removeIfNotLockedInS3();
         }
     }
 }
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index ea81f64659f..1e5c487e424 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -5939,57 +5939,6 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
     return CancellationCode::CancelSent;
 }
 
-void StorageReplicatedMergeTree::removePartsFromFilesystem(const DataPartsVector & parts)
-{
-    auto remove_part = [&](const auto & part)
-    {
-        LOG_DEBUG(log, "Removing part from filesystem {}", part.name);
-        try
-        {
-            bool keep_s3 = !this->unlockSharedData(part);
-            part.remove(keep_s3);
-        }
-        catch (...)
-        {
-            tryLogCurrentException(log, "There is a problem with deleting part " + part.name + " from filesystem");
-        }
-    };
-
-    const auto settings = getSettings();
-    if (settings->max_part_removal_threads > 1 && parts.size() > settings->concurrent_part_removal_threshold)
-    {
-        /// Parallel parts removal.
-
-        size_t num_threads = std::min(settings->max_part_removal_threads, parts.size());
-        ThreadPool pool(num_threads);
-
-        /// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
-        for (const DataPartPtr & part : parts)
-        {
-            pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
-            {
-                SCOPE_EXIT_SAFE(
-                    if (thread_group)
-                        CurrentThread::detachQueryIfNotDetached();
-                );
-                if (thread_group)
-                    CurrentThread::attachTo(thread_group);
-
-                remove_part(*part);
-            });
-        }
-
-        pool.wait();
-    }
-    else
-    {
-        for (const DataPartPtr & part : parts)
-        {
-            remove_part(*part);
-        }
-    }
-}
-
 void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
 {
     auto table_lock = lockForShare(
@@ -6017,7 +5966,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
     /// Delete duplicate parts from filesystem
     if (!parts_to_delete_only_from_filesystem.empty())
     {
-        removePartsFromFilesystem(parts_to_delete_only_from_filesystem);
+        clearPartsFromFilesystem(parts_to_delete_only_from_filesystem);
         removePartsFinally(parts_to_delete_only_from_filesystem);
 
         LOG_DEBUG(log, "Removed {} old duplicate parts", parts_to_delete_only_from_filesystem.size());
@@ -6062,7 +6011,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
     /// Remove parts from filesystem and finally from data_parts
     if (!parts_to_remove_from_filesystem.empty())
     {
-        removePartsFromFilesystem(parts_to_remove_from_filesystem);
+        clearPartsFromFilesystem(parts_to_remove_from_filesystem);
         removePartsFinally(parts_to_remove_from_filesystem);
 
         LOG_DEBUG(log, "Removed {} old parts", parts_to_remove_from_filesystem.size());
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index 8ffb4974cb3..2ae19387ee8 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -439,8 +439,6 @@ private:
     /// Just removes part from ZooKeeper using previous method
     void removePartFromZooKeeper(const String & part_name);
 
-    void removePartsFromFilesystem(const DataPartsVector & parts);
-
     /// Quickly removes big set of parts from ZooKeeper (using async multi queries)
     void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
                                   NameSet * parts_should_be_retried = nullptr);