diff --git a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp
index 78b9b9e3446..556c28bd3f4 100644
--- a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp
+++ b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp
@@ -32,21 +32,6 @@ DiskAzureBlobStorageSettings::DiskAzureBlobStorageSettings(
     thread_pool_size(thread_pool_size_) {}

-class AzureBlobStoragePathKeeper : public RemoteFSPathKeeper
-{
-public:
-    /// RemoteFSPathKeeper constructed with a placeholder argument for chunk_limit, it is unused in this class
-    AzureBlobStoragePathKeeper() : RemoteFSPathKeeper(1000) {}
-
-    void addPath(const String & path) override
-    {
-        paths.push_back(path);
-    }
-
-    std::vector<String> paths;
-};
-
-
 DiskAzureBlobStorage::DiskAzureBlobStorage(
     const String & name_,
     DiskPtr metadata_disk_,
@@ -150,36 +135,24 @@ bool DiskAzureBlobStorage::checkUniqueId(const String & id) const
 }

-void DiskAzureBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
+void DiskAzureBlobStorage::removeFromRemoteFS(const std::vector<String> & paths)
 {
-    auto * paths_keeper = dynamic_cast<AzureBlobStoragePathKeeper *>(fs_paths_keeper.get());
-
-    if (paths_keeper)
+    for (const auto & path : paths)
     {
-        for (const auto & path : paths_keeper->paths)
+        try
         {
-            try
-            {
-                auto delete_info = blob_container_client->DeleteBlob(path);
-                if (!delete_info.Value.Deleted)
-                    throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
-            }
-            catch (const Azure::Storage::StorageException & e)
-            {
-                LOG_INFO(log, "Caught an error while deleting file {} : {}", path, e.Message);
-                throw;
-            }
+            auto delete_info = blob_container_client->DeleteBlob(path);
+            if (!delete_info.Value.Deleted)
+                throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
+        }
+        catch (const Azure::Storage::StorageException & e)
+        {
+            LOG_INFO(log, "Caught an error while deleting file {} : {}", path, e.Message);
+            throw;
         }
     }
 }

-
-RemoteFSPathKeeperPtr DiskAzureBlobStorage::createFSPathKeeper() const
-{
-    return std::make_shared<AzureBlobStoragePathKeeper>();
-}
-
-
 void DiskAzureBlobStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
 {
     auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);
diff --git a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.h b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.h
index efc245e7eb3..ff99e246d31 100644
--- a/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.h
+++ b/src/Disks/AzureBlobStorage/DiskAzureBlobStorage.h
@@ -67,9 +67,7 @@ public:

     bool checkUniqueId(const String & id) const override;

-    void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override;
-
-    RemoteFSPathKeeperPtr createFSPathKeeper() const override;
+    void removeFromRemoteFS(const std::vector<String> & paths) override;

     void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &) override;

diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp
index a3817a85a36..a0b96aba728 100644
--- a/src/Disks/HDFS/DiskHDFS.cpp
+++ b/src/Disks/HDFS/DiskHDFS.cpp
@@ -30,35 +30,6 @@ namespace ErrorCodes
 }


-class HDFSPathKeeper : public RemoteFSPathKeeper
-{
-public:
-    using Chunk = std::vector<std::string>;
-    using Chunks = std::list<Chunk>;
-
-    explicit HDFSPathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
-
-    void addPath(const String & path) override
-    {
-        if (chunks.empty() || chunks.back().size() >= chunk_limit)
-        {
-            chunks.push_back(Chunks::value_type());
-            chunks.back().reserve(chunk_limit);
-        }
-        chunks.back().push_back(path.data());
-    }
-
-    void removePaths(Fn<void(Chunk &&)> auto && remove_chunk_func)
-    {
-        for (auto & chunk : chunks)
-            remove_chunk_func(std::move(chunk));
-    }
-
-private:
-    Chunks chunks;
-};
-
-
 DiskHDFS::DiskHDFS(
     const String & disk_name_,
     const String & hdfs_root_path_,
@@ -109,30 +80,17 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
     return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(create_metadata_callback), hdfs_path);
 }

-
-RemoteFSPathKeeperPtr DiskHDFS::createFSPathKeeper() const
+void DiskHDFS::removeFromRemoteFS(const std::vector<String> & paths)
 {
-    return std::make_shared<HDFSPathKeeper>(settings->objects_chunk_size_to_delete);
-}
+    for (const auto & hdfs_path : paths)
+    {
+        const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);

-
-void DiskHDFS::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
-{
-    auto * hdfs_paths_keeper = dynamic_cast<HDFSPathKeeper *>(fs_paths_keeper.get());
-    if (hdfs_paths_keeper)
-        hdfs_paths_keeper->removePaths([&](std::vector<std::string> && chunk)
-        {
-            for (const auto & hdfs_object_path : chunk)
-            {
-                const String & hdfs_path = hdfs_object_path;
-                const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
-
-                /// Add path from root to file name
-                int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
-                if (res == -1)
-                    throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
-            }
-        });
+        /// Add path from root to file name
+        int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
+        if (res == -1)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
+    }
 }

 bool DiskHDFS::checkUniqueId(const String & hdfs_uri) const
diff --git a/src/Disks/HDFS/DiskHDFS.h b/src/Disks/HDFS/DiskHDFS.h
index eba58101bc4..5c6e011dc96 100644
--- a/src/Disks/HDFS/DiskHDFS.h
+++ b/src/Disks/HDFS/DiskHDFS.h
@@ -62,9 +62,7 @@ public:

     std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings & settings) override;

-    void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override;
-
-    RemoteFSPathKeeperPtr createFSPathKeeper() const override;
+    void removeFromRemoteFS(const std::vector<String> & paths) override;

     /// Check file exists and ClickHouse has an access to it
     /// Overrode in remote disk
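Side note on the HDFS hunk above: the paths collected from metadata are full HDFS URIs, so the new DiskHDFS::removeFromRemoteFS strips the scheme and authority before calling hdfsDelete, which expects a path from the filesystem root. A minimal standalone sketch of that substring logic (the URI below is only an illustrative example, not taken from the patch):

    #include <iostream>
    #include <string>

    int main()
    {
        /// Illustrative URI of the form hdfs://namenode:port/path/to/object.
        const std::string hdfs_path = "hdfs://namenode:9000/clickhouse/store/abc/file.bin";

        /// Skip "hdfs://" plus the authority part and find the first '/' after it.
        const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);

        /// Prints "/clickhouse/store/abc/file.bin" -- the argument passed to hdfsDelete.
        std::cout << hdfs_path.substr(begin_of_path) << '\n';
        return 0;
    }
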
diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp
index b475ae1ee94..ead951084ad 100644
--- a/src/Disks/IDiskRemote.cpp
+++ b/src/Disks/IDiskRemote.cpp
@@ -270,7 +270,7 @@ std::unordered_map<String, String> IDiskRemote::getSerializedMetadata(const std:
     return metadatas;
 }

-void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
+void IDiskRemote::removeMetadata(const String & path, std::vector<String> & paths_to_remove)
 {
     LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));

@@ -282,13 +282,13 @@ void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_p

     try
     {
-        auto metadata_updater = [fs_paths_keeper, this] (Metadata & metadata)
+        auto metadata_updater = [&paths_to_remove, this] (Metadata & metadata)
         {
             if (metadata.ref_count == 0)
             {
                 for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
                 {
-                    fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
+                    paths_to_remove.push_back(remote_fs_root_path + remote_fs_object_path);

                     if (cache)
                     {
@@ -327,18 +327,18 @@ void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_p
 }


-void IDiskRemote::removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
+void IDiskRemote::removeMetadataRecursive(const String & path, std::vector<String> & paths_to_remove)
 {
     checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.

     if (metadata_disk->isFile(path))
     {
-        removeMetadata(path, fs_paths_keeper);
+        removeMetadata(path, paths_to_remove);
     }
     else
     {
         for (auto it = iterateDirectory(path); it->isValid(); it->next())
-            removeMetadataRecursive(it->path(), fs_paths_keeper);
+            removeMetadataRecursive(it->path(), paths_to_remove);

         metadata_disk->removeDirectory(path);
     }
 }
@@ -480,50 +480,47 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
         moveFile(from_path, to_path);
 }

-
 void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
-    removeMetadata(path, fs_paths_keeper);
+    std::vector<String> paths_to_remove;
+    removeMetadata(path, paths_to_remove);

     if (!delete_metadata_only)
-        removeFromRemoteFS(fs_paths_keeper);
+        removeFromRemoteFS(paths_to_remove);
 }

-
 void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
-
+    std::vector<String> paths_to_remove;
     if (metadata_disk->exists(path))
     {
-        removeMetadata(path, fs_paths_keeper);
+        removeMetadata(path, paths_to_remove);

         if (!delete_metadata_only)
-            removeFromRemoteFS(fs_paths_keeper);
+            removeFromRemoteFS(paths_to_remove);
     }
 }

 void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
+    std::vector<String> paths_to_remove;
     for (const auto & file : files)
     {
         bool skip = file.if_exists && !metadata_disk->exists(file.path);
         if (!skip)
-            removeMetadata(file.path, fs_paths_keeper);
+            removeMetadata(file.path, paths_to_remove);
     }

     if (!delete_metadata_only)
-        removeFromRemoteFS(fs_paths_keeper);
+        removeFromRemoteFS(paths_to_remove);
 }

 void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
-    removeMetadataRecursive(path, fs_paths_keeper);
+    std::vector<String> paths_to_remove;
+    removeMetadataRecursive(path, paths_to_remove);

     if (!delete_metadata_only)
-        removeFromRemoteFS(fs_paths_keeper);
+        removeFromRemoteFS(paths_to_remove);
 }
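The shape of the refactoring is easiest to see in the IDiskRemote hunks above: every removal entry point now builds a plain std::vector<String>, lets removeMetadata()/removeMetadataRecursive() append the remote object paths whose ref_count dropped to zero, and then hands the whole vector to the backend-specific removeFromRemoteFS() in a single call, with no dynamic_cast or per-backend keeper class involved. A minimal standalone sketch of that collect-then-delete pattern (the helper bodies here are hypothetical stand-ins, not the real implementations):

    #include <iostream>
    #include <string>
    #include <vector>

    using String = std::string;

    /// Hypothetical stand-in for a concrete backend's removeFromRemoteFS override.
    static void removeFromRemoteFS(const std::vector<String> & paths)
    {
        for (const auto & path : paths)
            std::cout << "would delete remote object: " << path << '\n';
    }

    /// Hypothetical stand-in for removeMetadata: unreference one local metadata file
    /// and, if nothing references the remote objects any more, queue them for deletion.
    static void removeMetadata(const String & path, std::vector<String> & paths_to_remove)
    {
        paths_to_remove.push_back("remote_root/" + path);  /// pretend ref_count dropped to 0
    }

    int main()
    {
        /// The pattern from removeSharedFiles: collect first, delete remote objects once at the end.
        std::vector<String> paths_to_remove;
        for (const String & file : {String("a.bin"), String("b.bin")})
            removeMetadata(file, paths_to_remove);

        removeFromRemoteFS(paths_to_remove);
        return 0;
    }
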
diff --git a/src/Disks/IDiskRemote.h b/src/Disks/IDiskRemote.h
index aa78468c7bb..ac2b1634d05 100644
--- a/src/Disks/IDiskRemote.h
+++ b/src/Disks/IDiskRemote.h
@@ -39,24 +39,6 @@ struct BlobPathWithSize
 /// List of blobs with their sizes
 using BlobsPathToSize = std::vector<BlobPathWithSize>;

-/// Helper class to collect paths into chunks of maximum size.
-/// For s3 it is Aws::vector, for hdfs it is std::vector.
-class RemoteFSPathKeeper
-{
-public:
-    explicit RemoteFSPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {}
-
-    virtual ~RemoteFSPathKeeper() = default;
-
-    virtual void addPath(const String & path) = 0;
-
-protected:
-    size_t chunk_limit;
-};
-
-using RemoteFSPathKeeperPtr = std::shared_ptr<RemoteFSPathKeeper>;
-
-
 class IAsynchronousReader;
 using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;

@@ -165,9 +147,7 @@ public:

     bool checkUniqueId(const String & id) const override = 0;

-    virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) = 0;
-
-    virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;
+    virtual void removeFromRemoteFS(const std::vector<String> & paths) = 0;

     static AsynchronousReaderPtr getThreadPoolReader();
     static ThreadPool & getThreadPoolWriter();
@@ -190,9 +170,9 @@ protected:
     FileCachePtr cache;

 private:
-    void removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
+    void removeMetadata(const String & path, std::vector<String> & paths_to_remove);

-    void removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
+    void removeMetadataRecursive(const String & path, std::vector<String> & paths_to_remove);

     bool tryReserve(UInt64 bytes);
diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp
index b6171a41dfb..6ed53bcb9c2 100644
--- a/src/Disks/S3/DiskS3.cpp
+++ b/src/Disks/S3/DiskS3.cpp
@@ -60,52 +60,6 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }

-/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
-/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
-class S3PathKeeper : public RemoteFSPathKeeper
-{
-public:
-    using Chunk = Aws::Vector<Aws::S3::Model::ObjectIdentifier>;
-    using Chunks = std::list<Chunk>;
-
-    explicit S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
-
-    void addPath(const String & path) override
-    {
-        if (chunks.empty() || chunks.back().size() >= chunk_limit)
-        {
-            /// add one more chunk
-            chunks.push_back(Chunks::value_type());
-            chunks.back().reserve(chunk_limit);
-        }
-        Aws::S3::Model::ObjectIdentifier obj;
-        obj.SetKey(path);
-        chunks.back().push_back(obj);
-    }
-
-    void removePaths(Fn<void(Chunk &&)> auto && remove_chunk_func)
-    {
-        for (auto & chunk : chunks)
-            remove_chunk_func(std::move(chunk));
-    }
-
-    static String getChunkKeys(const Chunk & chunk)
-    {
-        String res;
-        for (const auto & obj : chunk)
-        {
-            const auto & key = obj.GetKey();
-            if (!res.empty())
-                res.append(", ");
-            res.append(key.c_str(), key.size());
-        }
-        return res;
-    }
-
-private:
-    Chunks chunks;
-};
-
 template <typename Result, typename Error>
 void throwIfError(Aws::Utils::Outcome<Result, Error> & response)
 {
@@ -168,31 +122,36 @@ DiskS3::DiskS3(
 {
 }

-RemoteFSPathKeeperPtr DiskS3::createFSPathKeeper() const
+void DiskS3::removeFromRemoteFS(const std::vector<String> & paths)
 {
     auto settings = current_settings.get();
-    return std::make_shared<S3PathKeeper>(settings->objects_chunk_size_to_delete);
-}

-void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
-{
-    auto settings = current_settings.get();
-    auto * s3_paths_keeper = dynamic_cast<S3PathKeeper *>(fs_paths_keeper.get());
-
-    if (s3_paths_keeper)
-        s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
+    size_t chunk_size_limit = settings->objects_chunk_size_to_delete;
+    size_t current_position = 0;
+    while (current_position < paths.size())
+    {
+        std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
+        String keys;
+        for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
         {
-            String keys = S3PathKeeper::getChunkKeys(chunk);
-            LOG_TRACE(log, "Remove AWS keys {}", keys);
-            Aws::S3::Model::Delete delkeys;
-            delkeys.SetObjects(chunk);
-            Aws::S3::Model::DeleteObjectsRequest request;
-            request.SetBucket(bucket);
-            request.SetDelete(delkeys);
-            auto outcome = settings->client->DeleteObjects(request);
-            // Do not throw here, continue deleting other chunks
-            logIfError(outcome, [&](){return "Can't remove AWS keys: " + keys;});
-        });
+            Aws::S3::Model::ObjectIdentifier obj;
+            obj.SetKey(paths[current_position]);
+            current_chunk.push_back(obj);
+
+            if (!keys.empty())
+                keys += ", ";
+            keys += paths[current_position];
+        }
+
+        LOG_TRACE(log, "Remove AWS keys {}", keys);
+        Aws::S3::Model::Delete delkeys;
+        delkeys.SetObjects(current_chunk);
+        Aws::S3::Model::DeleteObjectsRequest request;
+        request.SetBucket(bucket);
+        request.SetDelete(delkeys);
+        auto outcome = settings->client->DeleteObjects(request);
+        logIfError(outcome, [&](){return "Can't remove AWS keys: " + keys;});
+    }
 }

 void DiskS3::moveFile(const String & from_path, const String & to_path)
diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h
index 32eb9ee7aef..80784362769 100644
--- a/src/Disks/S3/DiskS3.h
+++ b/src/Disks/S3/DiskS3.h
@@ -91,9 +91,7 @@ public:
         WriteMode mode,
         const WriteSettings & settings) override;

-    void removeFromRemoteFS(RemoteFSPathKeeperPtr keeper) override;
-
-    RemoteFSPathKeeperPtr createFSPathKeeper() const override;
+    void removeFromRemoteFS(const std::vector<String> & paths) override;

     void moveFile(const String & from_path, const String & to_path, bool send_metadata);
     void moveFile(const String & from_path, const String & to_path) override;
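For context on the chunking that replaced S3PathKeeper: the S3 DeleteObjects API accepts a limited number of keys per request (up to 1000, per the AWS documentation the removed comment linked to), so DiskS3::removeFromRemoteFS now slices the incoming path list into batches of objects_chunk_size_to_delete keys. A standalone sketch of just that slicing loop, without the AWS SDK types (the function name and values here are illustrative only):

    #include <iostream>
    #include <string>
    #include <vector>

    /// Illustrative version of the batching loop in DiskS3::removeFromRemoteFS:
    /// walk the path list once, cutting it into chunks of at most chunk_size_limit keys.
    void removeInBatches(const std::vector<std::string> & paths, size_t chunk_size_limit)
    {
        size_t current_position = 0;
        while (current_position < paths.size())
        {
            std::vector<std::string> current_chunk;
            for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
                current_chunk.push_back(paths[current_position]);

            /// In the real code each chunk becomes one DeleteObjects request;
            /// errors are logged and the loop continues with the next chunk.
            std::cout << "batch of " << current_chunk.size() << " keys\n";
        }
    }

    int main()
    {
        removeInBatches({"k1", "k2", "k3", "k4", "k5"}, 2);  /// prints batches of sizes 2, 2, 1
        return 0;
    }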