Merge pull request #36106 from ClickHouse/get_rid_of_fs_paths_keeper

Get rid of fs paths keeper
alesapin 2022-04-11 12:24:46 +02:00 committed by GitHub
commit 5357cad105
8 changed files with 70 additions and 209 deletions
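In short: the polymorphic RemoteFSPathKeeper hierarchy (S3PathKeeper, HDFSPathKeeper, AzureBlobStoragePathKeeper) is deleted, and remote object paths are passed around as a plain std::vector<String>. A minimal before/after sketch of the interface, reduced to the two methods this commit touches (standalone and simplified, with String aliased to std::string as in ClickHouse; not the full IDiskRemote):

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

using String = std::string;

/// Before: a polymorphic collector, filled through virtual addPath() calls and
/// downcast again inside each backend's removeFromRemoteFS().
class RemoteFSPathKeeper
{
public:
    explicit RemoteFSPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {}
    virtual ~RemoteFSPathKeeper() = default;
    virtual void addPath(const String & path) = 0;

protected:
    size_t chunk_limit;
};

struct DiskBefore
{
    virtual ~DiskBefore() = default;
    virtual std::shared_ptr<RemoteFSPathKeeper> createFSPathKeeper() const = 0;
    virtual void removeFromRemoteFS(std::shared_ptr<RemoteFSPathKeeper> fs_paths_keeper) = 0;
};

/// After: a plain vector of paths; chunking becomes an implementation detail of
/// the one backend that needs it (DiskS3) instead of part of the interface.
struct DiskAfter
{
    virtual ~DiskAfter() = default;
    virtual void removeFromRemoteFS(const std::vector<String> & paths) = 0;
};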

src/Disks/AzureBlobStorage/DiskAzureBlobStorage.cpp

@@ -32,21 +32,6 @@ DiskAzureBlobStorageSettings::DiskAzureBlobStorageSettings(
     thread_pool_size(thread_pool_size_) {}
 
-class AzureBlobStoragePathKeeper : public RemoteFSPathKeeper
-{
-public:
-    /// RemoteFSPathKeeper constructed with a placeholder argument for chunk_limit, it is unused in this class
-    AzureBlobStoragePathKeeper() : RemoteFSPathKeeper(1000) {}
-
-    void addPath(const String & path) override
-    {
-        paths.push_back(path);
-    }
-
-    std::vector<String> paths;
-};
-
 DiskAzureBlobStorage::DiskAzureBlobStorage(
     const String & name_,
     DiskPtr metadata_disk_,
@@ -150,36 +135,24 @@ bool DiskAzureBlobStorage::checkUniqueId(const String & id) const
 }
 
-void DiskAzureBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
+void DiskAzureBlobStorage::removeFromRemoteFS(const std::vector<String> & paths)
 {
-    auto * paths_keeper = dynamic_cast<AzureBlobStoragePathKeeper *>(fs_paths_keeper.get());
-
-    if (paths_keeper)
+    for (const auto & path : paths)
     {
-        for (const auto & path : paths_keeper->paths)
+        try
         {
-            try
-            {
-                auto delete_info = blob_container_client->DeleteBlob(path);
-                if (!delete_info.Value.Deleted)
-                    throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
-            }
-            catch (const Azure::Storage::StorageException & e)
-            {
-                LOG_INFO(log, "Caught an error while deleting file {} : {}", path, e.Message);
-                throw;
-            }
+            auto delete_info = blob_container_client->DeleteBlob(path);
+            if (!delete_info.Value.Deleted)
+                throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
+        }
+        catch (const Azure::Storage::StorageException & e)
+        {
+            LOG_INFO(log, "Caught an error while deleting file {} : {}", path, e.Message);
+            throw;
        }
    }
 }
 
-RemoteFSPathKeeperPtr DiskAzureBlobStorage::createFSPathKeeper() const
-{
-    return std::make_shared<AzureBlobStoragePathKeeper>();
-}
-
 void DiskAzureBlobStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
 {
     auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);

src/Disks/AzureBlobStorage/DiskAzureBlobStorage.h

@@ -67,9 +67,7 @@ public:
     bool checkUniqueId(const String & id) const override;
 
-    void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override;
-
-    RemoteFSPathKeeperPtr createFSPathKeeper() const override;
+    void removeFromRemoteFS(const std::vector<String> & paths) override;
 
     void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &) override;

src/Disks/HDFS/DiskHDFS.cpp

@@ -30,35 +30,6 @@ namespace ErrorCodes
 }
 
-class HDFSPathKeeper : public RemoteFSPathKeeper
-{
-public:
-    using Chunk = std::vector<std::string>;
-    using Chunks = std::list<Chunk>;
-
-    explicit HDFSPathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
-
-    void addPath(const String & path) override
-    {
-        if (chunks.empty() || chunks.back().size() >= chunk_limit)
-        {
-            chunks.push_back(Chunks::value_type());
-            chunks.back().reserve(chunk_limit);
-        }
-        chunks.back().push_back(path.data());
-    }
-
-    void removePaths(Fn<void(Chunk &&)> auto && remove_chunk_func)
-    {
-        for (auto & chunk : chunks)
-            remove_chunk_func(std::move(chunk));
-    }
-
-private:
-    Chunks chunks;
-};
-
 DiskHDFS::DiskHDFS(
     const String & disk_name_,
     const String & hdfs_root_path_,
@@ -109,30 +80,17 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
     return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(create_metadata_callback), hdfs_path);
 }
 
-RemoteFSPathKeeperPtr DiskHDFS::createFSPathKeeper() const
+void DiskHDFS::removeFromRemoteFS(const std::vector<String> & paths)
 {
-    return std::make_shared<HDFSPathKeeper>(settings->objects_chunk_size_to_delete);
-}
+    for (const auto & hdfs_path : paths)
+    {
+        const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
 
-void DiskHDFS::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
-{
-    auto * hdfs_paths_keeper = dynamic_cast<HDFSPathKeeper *>(fs_paths_keeper.get());
-    if (hdfs_paths_keeper)
-        hdfs_paths_keeper->removePaths([&](std::vector<std::string> && chunk)
-        {
-            for (const auto & hdfs_object_path : chunk)
-            {
-                const String & hdfs_path = hdfs_object_path;
-                const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
-
-                /// Add path from root to file name
-                int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
-                if (res == -1)
-                    throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
-            }
-        });
+        /// Add path from root to file name
+        int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
+        if (res == -1)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
+    }
 }
 
 bool DiskHDFS::checkUniqueId(const String & hdfs_uri) const
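The paths collected from metadata are full HDFS URIs, while hdfsDelete() expects a path relative to the filesystem root, so the loop strips the hdfs://host:port prefix first. A standalone check of that index arithmetic, using a made-up URI rather than one from the commit:

#include <cassert>
#include <string>

int main()
{
    // Made-up example URI; the real values come from ClickHouse metadata files.
    std::string hdfs_path = "hdfs://namenode:8020/clickhouse/store/part_1.bin";

    // find("//") + 2 skips past the "hdfs://" scheme separator; the next '/'
    // is where the filesystem path begins (host:port lies in between).
    const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);

    assert(hdfs_path.substr(begin_of_path) == "/clickhouse/store/part_1.bin");
    return 0;
}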

src/Disks/HDFS/DiskHDFS.h

@@ -62,9 +62,7 @@ public:
     std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode, const WriteSettings & settings) override;
 
-    void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override;
-
-    RemoteFSPathKeeperPtr createFSPathKeeper() const override;
+    void removeFromRemoteFS(const std::vector<String> & paths) override;
 
     /// Check file exists and ClickHouse has an access to it
     /// Overrode in remote disk

src/Disks/IDiskRemote.cpp

@@ -270,7 +270,7 @@ std::unordered_map<String, String> IDiskRemote::getSerializedMetadata(const std:
     return metadatas;
 }
 
-void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
+void IDiskRemote::removeMetadata(const String & path, std::vector<std::string> & paths_to_remove)
 {
     LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
@@ -282,13 +282,13 @@ void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_p
 
     try
     {
-        auto metadata_updater = [fs_paths_keeper, this] (Metadata & metadata)
+        auto metadata_updater = [&paths_to_remove, this] (Metadata & metadata)
         {
             if (metadata.ref_count == 0)
             {
                 for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
                 {
-                    fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
+                    paths_to_remove.push_back(remote_fs_root_path + remote_fs_object_path);
 
                     if (cache)
                     {
@@ -327,18 +327,18 @@ void IDiskRemote::removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_p
 }
 
-void IDiskRemote::removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
+void IDiskRemote::removeMetadataRecursive(const String & path, std::vector<String> & paths_to_remove)
 {
     checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
 
     if (metadata_disk->isFile(path))
     {
-        removeMetadata(path, fs_paths_keeper);
+        removeMetadata(path, paths_to_remove);
     }
     else
     {
         for (auto it = iterateDirectory(path); it->isValid(); it->next())
-            removeMetadataRecursive(it->path(), fs_paths_keeper);
+            removeMetadataRecursive(it->path(), paths_to_remove);
 
         metadata_disk->removeDirectory(path);
     }
@@ -480,50 +480,47 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
         moveFile(from_path, to_path);
 }
 
 void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
-    removeMetadata(path, fs_paths_keeper);
+    std::vector<String> paths_to_remove;
+    removeMetadata(path, paths_to_remove);
 
     if (!delete_metadata_only)
-        removeFromRemoteFS(fs_paths_keeper);
+        removeFromRemoteFS(paths_to_remove);
 }
 
 void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
+    std::vector<String> paths_to_remove;
 
     if (metadata_disk->exists(path))
     {
-        removeMetadata(path, fs_paths_keeper);
+        removeMetadata(path, paths_to_remove);
 
         if (!delete_metadata_only)
-            removeFromRemoteFS(fs_paths_keeper);
+            removeFromRemoteFS(paths_to_remove);
     }
 }
 
 void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
+    std::vector<String> paths_to_remove;
     for (const auto & file : files)
     {
         bool skip = file.if_exists && !metadata_disk->exists(file.path);
         if (!skip)
-            removeMetadata(file.path, fs_paths_keeper);
+            removeMetadata(file.path, paths_to_remove);
     }
 
     if (!delete_metadata_only)
-        removeFromRemoteFS(fs_paths_keeper);
+        removeFromRemoteFS(paths_to_remove);
 }
 
 void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadata_only)
 {
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
-    removeMetadataRecursive(path, fs_paths_keeper);
+    std::vector<String> paths_to_remove;
+    removeMetadataRecursive(path, paths_to_remove);
 
     if (!delete_metadata_only)
-        removeFromRemoteFS(fs_paths_keeper);
+        removeFromRemoteFS(paths_to_remove);
 }
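All four entry points now share one shape: removeMetadata() appends the remote paths of objects whose metadata ref_count dropped to zero, then a single removeFromRemoteFS() call flushes the whole batch. Capturing paths_to_remove by reference in the updater lambda is safe because the lambda runs synchronously inside removeMetadata(). A self-contained model of that ref-count gate (Metadata below is a stripped-down stand-in, not the real struct):

#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using String = std::string;

/// Stand-in for IDiskRemote's metadata: one local metadata file referencing
/// one or more remote objects, plus a hardlink reference count.
struct Metadata
{
    uint32_t ref_count = 0;
    std::vector<std::pair<String, size_t>> remote_fs_objects; // (relative path, size)
};

/// Model of the updater's logic: remote objects become deletable only when
/// the last hardlink to the metadata file disappears.
void collectRemovablePaths(const Metadata & metadata, const String & remote_fs_root_path,
                           std::vector<String> & paths_to_remove)
{
    if (metadata.ref_count == 0)
        for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
            paths_to_remove.push_back(remote_fs_root_path + remote_fs_object_path);
}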

src/Disks/IDiskRemote.h

@@ -39,24 +39,6 @@ struct BlobPathWithSize
 /// List of blobs with their sizes
 using BlobsPathToSize = std::vector<BlobPathWithSize>;
 
-/// Helper class to collect paths into chunks of maximum size.
-/// For s3 it is Aws::vector<ObjectIdentifier>, for hdfs it is std::vector<std::string>.
-class RemoteFSPathKeeper
-{
-public:
-    explicit RemoteFSPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {}
-
-    virtual ~RemoteFSPathKeeper() = default;
-    virtual void addPath(const String & path) = 0;
-
-protected:
-    size_t chunk_limit;
-};
-
-using RemoteFSPathKeeperPtr = std::shared_ptr<RemoteFSPathKeeper>;
-
 class IAsynchronousReader;
 using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;
@@ -165,9 +147,7 @@ public:
     bool checkUniqueId(const String & id) const override = 0;
 
-    virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) = 0;
-
-    virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;
+    virtual void removeFromRemoteFS(const std::vector<String> & paths) = 0;
 
     static AsynchronousReaderPtr getThreadPoolReader();
     static ThreadPool & getThreadPoolWriter();
@@ -190,9 +170,9 @@ protected:
     FileCachePtr cache;
 
 private:
-    void removeMetadata(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
+    void removeMetadata(const String & path, std::vector<String> & paths_to_remove);
 
-    void removeMetadataRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
+    void removeMetadataRecursive(const String & path, std::vector<String> & paths_to_remove);
 
     bool tryReserve(UInt64 bytes);

src/Disks/S3/DiskS3.cpp

@@ -60,52 +60,6 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
-/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
-/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
-class S3PathKeeper : public RemoteFSPathKeeper
-{
-public:
-    using Chunk = Aws::Vector<Aws::S3::Model::ObjectIdentifier>;
-    using Chunks = std::list<Chunk>;
-
-    explicit S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
-
-    void addPath(const String & path) override
-    {
-        if (chunks.empty() || chunks.back().size() >= chunk_limit)
-        {
-            /// add one more chunk
-            chunks.push_back(Chunks::value_type());
-            chunks.back().reserve(chunk_limit);
-        }
-        Aws::S3::Model::ObjectIdentifier obj;
-        obj.SetKey(path);
-        chunks.back().push_back(obj);
-    }
-
-    void removePaths(Fn<void(Chunk &&)> auto && remove_chunk_func)
-    {
-        for (auto & chunk : chunks)
-            remove_chunk_func(std::move(chunk));
-    }
-
-    static String getChunkKeys(const Chunk & chunk)
-    {
-        String res;
-        for (const auto & obj : chunk)
-        {
-            const auto & key = obj.GetKey();
-            if (!res.empty())
-                res.append(", ");
-            res.append(key.c_str(), key.size());
-        }
-        return res;
-    }
-
-private:
-    Chunks chunks;
-};
-
 template <typename Result, typename Error>
 void throwIfError(Aws::Utils::Outcome<Result, Error> & response)
 {
@@ -168,31 +122,36 @@ DiskS3::DiskS3(
 {
 }
 
-RemoteFSPathKeeperPtr DiskS3::createFSPathKeeper() const
+void DiskS3::removeFromRemoteFS(const std::vector<String> & paths)
 {
     auto settings = current_settings.get();
-    return std::make_shared<S3PathKeeper>(settings->objects_chunk_size_to_delete);
-}
 
-void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
-{
-    auto settings = current_settings.get();
-    auto * s3_paths_keeper = dynamic_cast<S3PathKeeper *>(fs_paths_keeper.get());
-
-    if (s3_paths_keeper)
-        s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
+    size_t chunk_size_limit = settings->objects_chunk_size_to_delete;
+    size_t current_position = 0;
+    while (current_position < paths.size())
+    {
+        std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
+        String keys;
+        for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
         {
-            String keys = S3PathKeeper::getChunkKeys(chunk);
-            LOG_TRACE(log, "Remove AWS keys {}", keys);
-            Aws::S3::Model::Delete delkeys;
-            delkeys.SetObjects(chunk);
-            Aws::S3::Model::DeleteObjectsRequest request;
-            request.SetBucket(bucket);
-            request.SetDelete(delkeys);
-            auto outcome = settings->client->DeleteObjects(request);
-            // Do not throw here, continue deleting other chunks
-            logIfError(outcome, [&](){return "Can't remove AWS keys: " + keys;});
-        });
+            Aws::S3::Model::ObjectIdentifier obj;
+            obj.SetKey(paths[current_position]);
+            current_chunk.push_back(obj);
+
+            if (!keys.empty())
+                keys += ", ";
+            keys += paths[current_position];
+        }
+
+        LOG_TRACE(log, "Remove AWS keys {}", keys);
+        Aws::S3::Model::Delete delkeys;
+        delkeys.SetObjects(current_chunk);
+        Aws::S3::Model::DeleteObjectsRequest request;
+        request.SetBucket(bucket);
+        request.SetDelete(delkeys);
+        auto outcome = settings->client->DeleteObjects(request);
+        logIfError(outcome, [&](){return "Can't remove AWS keys: " + keys;});
+    }
 }
 
 void DiskS3::moveFile(const String & from_path, const String & to_path)
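This inlines the batching that S3PathKeeper used to do: objects_chunk_size_to_delete bounds each DeleteObjects request (the S3 API accepts at most 1000 keys per call, per the AWS documentation cited in the deleted comment), and unlike the Azure implementation above, failures are logged via logIfError and deletion continues with the next chunk. The loop in isolation, with the AWS request replaced by a hypothetical deleteBatch callback:

#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

/// Isolated model of the loop above: walk `paths` once, emitting batches of at
/// most `chunk_size_limit` entries each.
void forEachChunk(const std::vector<std::string> & paths, size_t chunk_size_limit,
                  const std::function<void(const std::vector<std::string> &)> & deleteBatch)
{
    assert(chunk_size_limit > 0); // a zero limit would never advance the cursor

    size_t current_position = 0;
    while (current_position < paths.size())
    {
        std::vector<std::string> current_chunk;
        for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
            current_chunk.push_back(paths[current_position]);

        /// In DiskS3 this is the DeleteObjects request; errors are logged,
        /// not thrown, so the remaining chunks are still attempted.
        deleteBatch(current_chunk);
    }
}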

src/Disks/S3/DiskS3.h

@@ -91,9 +91,7 @@ public:
         WriteMode mode,
         const WriteSettings & settings) override;
 
-    void removeFromRemoteFS(RemoteFSPathKeeperPtr keeper) override;
-
-    RemoteFSPathKeeperPtr createFSPathKeeper() const override;
+    void removeFromRemoteFS(const std::vector<String> & paths) override;
 
     void moveFile(const String & from_path, const String & to_path, bool send_metadata);
     void moveFile(const String & from_path, const String & to_path) override;