mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
Better
This commit is contained in:
parent
ba41d01b78
commit
5196075fd9
@ -26,14 +26,14 @@ namespace ErrorCodes
|
||||
DiskHDFS::DiskHDFS(
|
||||
const String & disk_name_,
|
||||
const String & hdfs_root_path_,
|
||||
SettingsPtr settings_,
|
||||
const String & metadata_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
size_t min_bytes_for_seek_)
|
||||
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS")
|
||||
const Poco::Util::AbstractConfiguration & config_)
|
||||
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskS3")
|
||||
, config(config_)
|
||||
, min_bytes_for_seek(min_bytes_for_seek_)
|
||||
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
|
||||
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
|
||||
, settings(std::move(settings_))
|
||||
{
|
||||
}
|
||||
|
||||
@ -47,7 +47,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
|
||||
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
|
||||
|
||||
auto reader = std::make_unique<ReadIndirectBufferFromHDFS>(config, remote_fs_root_path, metadata, buf_size);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
|
||||
}
|
||||
|
||||
|
||||
@ -91,120 +91,34 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeFromRemoteFS(const Metadata & metadata)
|
||||
void DiskHDFS::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)
|
||||
{
|
||||
for (const auto & [hdfs_object_path, _] : metadata.remote_fs_objects)
|
||||
if (!fs_paths_keeper.empty())
|
||||
{
|
||||
/// Add path from root to file name
|
||||
const size_t begin_of_path = remote_fs_root_path.find('/', remote_fs_root_path.find("//") + 2);
|
||||
const String hdfs_path = remote_fs_root_path.substr(begin_of_path) + hdfs_object_path;
|
||||
|
||||
int res = hdfsDelete(hdfs_fs.get(), hdfs_path.c_str(), 0);
|
||||
if (res == -1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeSharedFile(const String & path, bool keep_in_remote_fs)
|
||||
{
|
||||
removeMeta(path, keep_in_remote_fs);
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
|
||||
{
|
||||
removeMetaRecursive(path, keep_in_remote_fs);
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeFileIfExists(const String & path)
|
||||
{
|
||||
if (Poco::File(metadata_path + path).exists())
|
||||
removeMeta(path, /* keep_in_remote_fs */ false);
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeRecursive(const String & path)
|
||||
{
|
||||
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
|
||||
|
||||
Poco::File file(metadata_path + path);
|
||||
if (file.isFile())
|
||||
{
|
||||
removeFile(path);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
|
||||
removeRecursive(it->path());
|
||||
file.remove();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeMeta(const String & path, bool keep_in_remote_fs)
|
||||
{
|
||||
Poco::File file(metadata_path + path);
|
||||
|
||||
if (!file.isFile())
|
||||
throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
|
||||
|
||||
try
|
||||
{
|
||||
auto metadata = readMeta(path);
|
||||
|
||||
/// If there is no references - delete content from remote FS.
|
||||
if (metadata.ref_count == 0)
|
||||
for (const auto & chunk : fs_paths_keeper)
|
||||
{
|
||||
file.remove();
|
||||
for (const auto & hdfs_object_path : chunk)
|
||||
{
|
||||
const String hdfs_path = hdfs_object_path.GetKey();
|
||||
const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
|
||||
|
||||
if (!keep_in_remote_fs)
|
||||
removeFromRemoteFS(metadata);
|
||||
}
|
||||
else /// In other case decrement number of references, save metadata and delete file.
|
||||
{
|
||||
--metadata.ref_count;
|
||||
metadata.save();
|
||||
file.remove();
|
||||
/// Add path from root to file name
|
||||
int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
|
||||
if (res == -1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// If it's impossible to read meta - just remove it from FS.
|
||||
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
|
||||
{
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
|
||||
backQuote(path),
|
||||
e.nested() ? e.nested()->message() : e.message());
|
||||
|
||||
file.remove();
|
||||
}
|
||||
else
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DiskHDFS::removeMetaRecursive(const String & path, bool keep_in_remote_fs)
|
||||
namespace
|
||||
{
|
||||
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
|
||||
|
||||
Poco::File file(metadata_path + path);
|
||||
if (file.isFile())
|
||||
{
|
||||
removeMeta(path, keep_in_remote_fs);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
|
||||
removeMetaRecursive(it->path(), keep_in_remote_fs);
|
||||
file.remove();
|
||||
}
|
||||
std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
|
||||
{
|
||||
return std::make_unique<DiskHDFSSettings>(config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void registerDiskHDFS(DiskFactory & factory)
|
||||
{
|
||||
@ -224,8 +138,9 @@ void registerDiskHDFS(DiskFactory & factory)
|
||||
String metadata_path = context_->getPath() + "disks/" + name + "/";
|
||||
|
||||
return std::make_shared<DiskHDFS>(
|
||||
name, uri, metadata_path, config,
|
||||
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
|
||||
name, uri,
|
||||
getSettings(config, config_prefix),
|
||||
metadata_path, config);
|
||||
};
|
||||
|
||||
factory.registerDiskType("hdfs", creator);
|
||||
|
@ -9,24 +9,31 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct DiskHDFSSettings
|
||||
{
|
||||
size_t min_bytes_for_seek;
|
||||
DiskHDFSSettings(int min_bytes_for_seek_) : min_bytes_for_seek(min_bytes_for_seek_) {}
|
||||
};
|
||||
|
||||
/**
|
||||
* Storage for persisting data in HDFS and metadata on the local disk.
|
||||
* Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
|
||||
* that contains HDFS object key with actual data.
|
||||
*/
|
||||
class DiskHDFS : public IDiskRemote
|
||||
class DiskHDFS final : public IDiskRemote
|
||||
{
|
||||
|
||||
friend class DiskHDFSReservation;
|
||||
|
||||
public:
|
||||
using SettingsPtr = std::unique_ptr<DiskHDFSSettings>;
|
||||
|
||||
DiskHDFS(
|
||||
const String & disk_name_,
|
||||
const String & hdfs_root_path_,
|
||||
SettingsPtr settings_,
|
||||
const String & metadata_path_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
size_t min_bytes_for_seek_);
|
||||
const Poco::Util::AbstractConfiguration & config_);
|
||||
|
||||
DiskType::Type getType() const override { return DiskType::Type::HDFS; }
|
||||
|
||||
@ -40,32 +47,17 @@ public:
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
|
||||
|
||||
void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
|
||||
|
||||
void removeSharedRecursive(const String & path, bool) override;
|
||||
|
||||
void removeFileIfExists(const String & path) override;
|
||||
|
||||
void removeRecursive(const String & path) override;
|
||||
|
||||
void removeFile(const String & path) override { removeSharedFile(path, false); }
|
||||
void removeFromRemoteFS(const RemoteFSPathKeeper & keys) override;
|
||||
|
||||
private:
|
||||
void removeFromRemoteFS(const Metadata & metadata);
|
||||
|
||||
void removeMetaRecursive(const String & path, bool keep_in_remote_fs);
|
||||
|
||||
void removeMeta(const String & path, bool keep_in_remote_fs);
|
||||
|
||||
String getRandomName() { return toString(UUIDHelpers::generateV4()); }
|
||||
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
size_t min_bytes_for_seek;
|
||||
|
||||
HDFSBuilderWrapper hdfs_builder;
|
||||
HDFSFSPtr hdfs_fs;
|
||||
|
||||
std::mutex copying_mutex;
|
||||
SettingsPtr settings;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <Common/createHardLink.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
|
||||
@ -22,6 +23,7 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_FORMAT;
|
||||
extern const int FILE_ALREADY_EXISTS;
|
||||
extern const int PATH_ACCESS_DENIED;;
|
||||
extern const int CANNOT_DELETE_DIRECTORY;
|
||||
}
|
||||
|
||||
|
||||
@ -173,6 +175,66 @@ IDiskRemote::Metadata IDiskRemote::createMeta(const String & path) const
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeper & fs_paths_keeper)
|
||||
{
|
||||
LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
|
||||
|
||||
Poco::File file(metadata_path + path);
|
||||
|
||||
if (!file.isFile())
|
||||
throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
|
||||
|
||||
try
|
||||
{
|
||||
auto metadata = readMeta(path);
|
||||
|
||||
/// If there is no references - delete content from remote FS.
|
||||
if (metadata.ref_count == 0)
|
||||
{
|
||||
file.remove();
|
||||
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
|
||||
fs_paths_keeper.addPath(remote_fs_root_path + remote_fs_object_path);
|
||||
}
|
||||
else /// In other case decrement number of references, save metadata and delete file.
|
||||
{
|
||||
--metadata.ref_count;
|
||||
metadata.save();
|
||||
file.remove();
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// If it's impossible to read meta - just remove it from FS.
|
||||
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
|
||||
{
|
||||
LOG_WARNING(log,
|
||||
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
|
||||
backQuote(path), e.nested() ? e.nested()->message() : e.message());
|
||||
file.remove();
|
||||
}
|
||||
else
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeper & fs_paths_keeper)
|
||||
{
|
||||
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
|
||||
|
||||
Poco::File file(metadata_path + path);
|
||||
if (file.isFile())
|
||||
{
|
||||
removeMeta(path, fs_paths_keeper);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
|
||||
removeMetaRecursive(it->path(), fs_paths_keeper);
|
||||
file.remove();
|
||||
}
|
||||
}
|
||||
|
||||
DiskPtr DiskRemoteReservation::getDisk(size_t i) const
|
||||
{
|
||||
if (i != 0)
|
||||
@ -224,10 +286,10 @@ IDiskRemote::IDiskRemote(
|
||||
const String & log_name_,
|
||||
std::unique_ptr<Executor> executor_)
|
||||
: IDisk(std::move(executor_))
|
||||
, log(&Poco::Logger::get(log_name_))
|
||||
, name(name_)
|
||||
, remote_fs_root_path(remote_fs_root_path_)
|
||||
, metadata_path(metadata_path_)
|
||||
, log(&Poco::Logger::get(log_name_))
|
||||
{
|
||||
}
|
||||
|
||||
@ -282,6 +344,35 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::removeFileIfExists(const String & path)
|
||||
{
|
||||
RemoteFSPathKeeper fs_paths_keeper;
|
||||
if (Poco::File(metadata_path + path).exists())
|
||||
{
|
||||
removeMeta(path, fs_paths_keeper);
|
||||
removeFromRemoteFS(fs_paths_keeper);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::removeSharedFile(const String & path, bool keep_remote_fs)
|
||||
{
|
||||
RemoteFSPathKeeper fs_paths_keeper;
|
||||
removeMeta(path, fs_paths_keeper);
|
||||
if (!keep_remote_fs)
|
||||
removeFromRemoteFS(fs_paths_keeper);
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_remote_fs)
|
||||
{
|
||||
RemoteFSPathKeeper fs_paths_keeper;
|
||||
removeMetaRecursive(path, fs_paths_keeper);
|
||||
if (!keep_remote_fs)
|
||||
removeFromRemoteFS(fs_paths_keeper);
|
||||
}
|
||||
|
||||
|
||||
void IDiskRemote::setReadOnly(const String & path)
|
||||
{
|
||||
/// We should store read only flag inside metadata file (instead of using FS flag),
|
||||
@ -375,7 +466,7 @@ bool IDiskRemote::tryReserve(UInt64 bytes)
|
||||
std::lock_guard lock(reservation_mutex);
|
||||
if (bytes == 0)
|
||||
{
|
||||
LOG_DEBUG(log, "Reserving 0 bytes on s3 disk {}", backQuote(name));
|
||||
LOG_DEBUG(log, "Reserving 0 bytes on remote_fs disk {}", backQuote(name));
|
||||
++reservation_count;
|
||||
return true;
|
||||
}
|
||||
|
@ -4,11 +4,40 @@
|
||||
#include "Disks/DiskFactory.h"
|
||||
#include "Disks/Executor.h"
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <utility>
|
||||
#include <aws/s3/model/CopyObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectsRequest.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/ListObjectsV2Request.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
|
||||
class RemoteFSPathKeeper : public std::list<Aws::Vector<Aws::S3::Model::ObjectIdentifier>>
|
||||
{
|
||||
public:
|
||||
void addPath(const String & path)
|
||||
{
|
||||
if (empty() || back().size() >= chunk_limit)
|
||||
{ /// add one more chunk
|
||||
push_back(value_type());
|
||||
back().reserve(chunk_limit);
|
||||
}
|
||||
|
||||
Aws::S3::Model::ObjectIdentifier obj;
|
||||
obj.SetKey(path);
|
||||
back().push_back(obj);
|
||||
}
|
||||
|
||||
private:
|
||||
/// limit for one DeleteObject request
|
||||
/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||||
const static size_t chunk_limit = 1000;
|
||||
};
|
||||
|
||||
/// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS)
|
||||
class IDiskRemote : public IDisk
|
||||
@ -26,9 +55,9 @@ public:
|
||||
|
||||
struct Metadata;
|
||||
|
||||
const String & getName() const override { return name; }
|
||||
const String & getName() const final override { return name; }
|
||||
|
||||
const String & getPath() const override { return metadata_path; }
|
||||
const String & getPath() const final override { return metadata_path; }
|
||||
|
||||
Metadata readMeta(const String & path) const;
|
||||
|
||||
@ -56,6 +85,16 @@ public:
|
||||
|
||||
void replaceFile(const String & from_path, const String & to_path) override;
|
||||
|
||||
void removeFile(const String & path) override { removeSharedFile(path, false); }
|
||||
|
||||
void removeFileIfExists(const String & path) override;
|
||||
|
||||
void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
|
||||
|
||||
void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
|
||||
|
||||
void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;
|
||||
|
||||
void listFiles(const String & path, std::vector<String> & file_names) override;
|
||||
|
||||
void setReadOnly(const String & path) override;
|
||||
@ -82,13 +121,20 @@ public:
|
||||
|
||||
ReservationPtr reserve(UInt64 bytes) override;
|
||||
|
||||
virtual void removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper) = 0;
|
||||
|
||||
protected:
|
||||
Poco::Logger * log;
|
||||
const String name;
|
||||
const String remote_fs_root_path;
|
||||
|
||||
const String metadata_path;
|
||||
Poco::Logger * log;
|
||||
|
||||
private:
|
||||
void removeMeta(const String & path, RemoteFSPathKeeper & keys);
|
||||
|
||||
void removeMetaRecursive(const String & path, RemoteFSPathKeeper & keys);
|
||||
|
||||
bool tryReserve(UInt64 bytes);
|
||||
|
||||
UInt64 reserved_bytes = 0;
|
||||
|
@ -16,18 +16,12 @@
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/createHardLink.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
#include <aws/s3/model/CopyObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectsRequest.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/ListObjectsV2Request.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
|
||||
@ -41,36 +35,10 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||
extern const int UNKNOWN_FORMAT;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int CANNOT_DELETE_DIRECTORY;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
|
||||
class DiskS3::AwsS3KeyKeeper : public std::list<Aws::Vector<Aws::S3::Model::ObjectIdentifier>>
|
||||
{
|
||||
public:
|
||||
void addKey(const String & key);
|
||||
|
||||
private:
|
||||
/// limit for one DeleteObject request
|
||||
/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||||
const static size_t chunk_limit = 1000;
|
||||
};
|
||||
|
||||
void DiskS3::AwsS3KeyKeeper::addKey(const String & key)
|
||||
{
|
||||
if (empty() || back().size() >= chunk_limit)
|
||||
{ /// add one more chunk
|
||||
push_back(value_type());
|
||||
back().reserve(chunk_limit);
|
||||
}
|
||||
|
||||
Aws::S3::Model::ObjectIdentifier obj;
|
||||
obj.SetKey(key);
|
||||
back().push_back(obj);
|
||||
}
|
||||
|
||||
String getRandomName()
|
||||
{
|
||||
std::uniform_int_distribution<int> distribution('a', 'z');
|
||||
@ -400,76 +368,13 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
|
||||
return std::make_unique<WriteIndirectBufferFromS3>(std::move(s3_buffer), std::move(metadata), s3_path);
|
||||
}
|
||||
|
||||
void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
|
||||
void DiskS3::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)
|
||||
{
|
||||
LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
|
||||
|
||||
Poco::File file(metadata_path + path);
|
||||
|
||||
if (!file.isFile())
|
||||
throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
|
||||
|
||||
try
|
||||
{
|
||||
auto metadata = readMeta(path);
|
||||
|
||||
/// If there is no references - delete content from S3.
|
||||
if (metadata.ref_count == 0)
|
||||
{
|
||||
file.remove();
|
||||
|
||||
for (const auto & [s3_object_path, _] : metadata.remote_fs_objects)
|
||||
keys.addKey(remote_fs_root_path + s3_object_path);
|
||||
}
|
||||
else /// In other case decrement number of references, save metadata and delete file.
|
||||
{
|
||||
--metadata.ref_count;
|
||||
metadata.save();
|
||||
file.remove();
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// If it's impossible to read meta - just remove it from FS.
|
||||
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
|
||||
{
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
|
||||
backQuote(path),
|
||||
e.nested() ? e.nested()->message() : e.message());
|
||||
|
||||
file.remove();
|
||||
}
|
||||
else
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void DiskS3::removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys)
|
||||
{
|
||||
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
|
||||
|
||||
Poco::File file(metadata_path + path);
|
||||
if (file.isFile())
|
||||
{
|
||||
removeMeta(path, keys);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
|
||||
removeMetaRecursive(it->path(), keys);
|
||||
file.remove();
|
||||
}
|
||||
}
|
||||
|
||||
void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
|
||||
{
|
||||
if (!keys.empty())
|
||||
if (!fs_paths_keeper.empty())
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
|
||||
for (const auto & chunk : keys)
|
||||
for (const auto & chunk : fs_paths_keeper)
|
||||
{
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
delkeys.SetObjects(chunk);
|
||||
@ -484,36 +389,9 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
|
||||
}
|
||||
}
|
||||
|
||||
void DiskS3::removeFileIfExists(const String & path)
|
||||
{
|
||||
AwsS3KeyKeeper keys;
|
||||
if (Poco::File(metadata_path + path).exists())
|
||||
{
|
||||
removeMeta(path, keys);
|
||||
removeAws(keys);
|
||||
}
|
||||
}
|
||||
|
||||
void DiskS3::removeSharedFile(const String & path, bool keep_s3)
|
||||
{
|
||||
AwsS3KeyKeeper keys;
|
||||
removeMeta(path, keys);
|
||||
if (!keep_s3)
|
||||
removeAws(keys);
|
||||
}
|
||||
|
||||
void DiskS3::removeSharedRecursive(const String & path, bool keep_s3)
|
||||
{
|
||||
AwsS3KeyKeeper keys;
|
||||
removeMetaRecursive(path, keys);
|
||||
if (!keep_s3)
|
||||
removeAws(keys);
|
||||
}
|
||||
|
||||
void DiskS3::createHardLink(const String & src_path, const String & dst_path)
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
|
||||
createHardLink(src_path, dst_path, settings->send_metadata);
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
#include "Disks/DiskFactory.h"
|
||||
#include "Disks/Executor.h"
|
||||
|
||||
@ -41,22 +40,22 @@ struct DiskS3Settings
|
||||
int list_object_keys_size;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Storage for persisting data in S3 and metadata on the local disk.
|
||||
* Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
|
||||
* that contains S3 object key with actual data.
|
||||
*/
|
||||
class DiskS3 : public IDiskRemote
|
||||
class DiskS3 final : public IDiskRemote
|
||||
{
|
||||
public:
|
||||
using ObjectMetadata = std::map<std::string, std::string>;
|
||||
using Futures = std::vector<std::future<void>>;
|
||||
|
||||
using SettingsPtr = std::unique_ptr<DiskS3Settings>;
|
||||
using GetDiskSettings = std::function<SettingsPtr(const Poco::Util::AbstractConfiguration &, const String, ContextConstPtr)>;
|
||||
|
||||
friend class DiskS3Reservation;
|
||||
|
||||
class AwsS3KeyKeeper;
|
||||
struct RestoreInformation;
|
||||
|
||||
DiskS3(
|
||||
@ -80,16 +79,11 @@ public:
|
||||
size_t buf_size,
|
||||
WriteMode mode) override;
|
||||
|
||||
void removeFromRemoteFS(const RemoteFSPathKeeper & keeper) final override;
|
||||
|
||||
void moveFile(const String & from_path, const String & to_path, bool send_metadata);
|
||||
void moveFile(const String & from_path, const String & to_path) override;
|
||||
|
||||
void removeFile(const String & path) override { removeSharedFile(path, false); }
|
||||
void removeFileIfExists(const String & path) override;
|
||||
void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
|
||||
|
||||
void removeSharedFile(const String & path, bool keep_s3) override;
|
||||
void removeSharedRecursive(const String & path, bool keep_s3) override;
|
||||
|
||||
void createHardLink(const String & src_path, const String & dst_path) override;
|
||||
void createHardLink(const String & src_path, const String & dst_path, bool send_metadata);
|
||||
|
||||
@ -113,10 +107,6 @@ public:
|
||||
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override;
|
||||
|
||||
private:
|
||||
void removeMeta(const String & path, AwsS3KeyKeeper & keys);
|
||||
void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys);
|
||||
void removeAws(const AwsS3KeyKeeper & keys);
|
||||
|
||||
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
|
||||
/// Converts revision to binary string with leading zeroes (64 bit).
|
||||
static String revisionToString(UInt64 revision);
|
||||
@ -151,8 +141,8 @@ private:
|
||||
static String pathToDetached(const String & source_path);
|
||||
|
||||
const String bucket;
|
||||
MultiVersion<DiskS3Settings> current_settings;
|
||||
|
||||
MultiVersion<DiskS3Settings> current_settings;
|
||||
/// Gets disk settings from context.
|
||||
GetDiskSettings settings_getter;
|
||||
|
||||
|
@ -225,4 +225,3 @@ void registerDiskS3(DiskFactory & factory)
|
||||
void registerDiskS3(DiskFactory &) {}
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -11,7 +11,7 @@
|
||||
#include <Disks/HDFS/DiskHDFS.h>
|
||||
#include <Poco/Util/XMLConfiguration.h>
|
||||
|
||||
const String hdfs_uri = "hdfs://192.168.112.2:9000/disk_test/";
|
||||
const String hdfs_uri = "hdfs://172.20.0.2:9000/disk_test/";
|
||||
const String metadata_path = "/home/kssenii/metadata/";
|
||||
const String config_path = "/home/kssenii/ClickHouse/programs/server/config.xml";
|
||||
const String file_name = "test.txt";
|
||||
@ -20,7 +20,8 @@ const String file_name = "test.txt";
|
||||
TEST(DiskTestHDFS, RemoveFileHDFS)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
|
||||
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
|
||||
|
||||
DB::HDFSBuilderWrapper builder = DB::createHDFSBuilder(hdfs_uri, *config);
|
||||
DB::HDFSFSPtr fs = DB::createHDFSFS(builder.get());
|
||||
@ -43,7 +44,8 @@ TEST(DiskTestHDFS, RemoveFileHDFS)
|
||||
TEST(DiskTestHDFS, WriteReadHDFS)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
|
||||
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
|
||||
|
||||
{
|
||||
auto out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
|
||||
@ -64,7 +66,8 @@ TEST(DiskTestHDFS, WriteReadHDFS)
|
||||
TEST(DiskTestHDFS, RewriteFileHDFS)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
|
||||
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
|
||||
|
||||
for (size_t i = 1; i <= 10; ++i)
|
||||
{
|
||||
@ -88,7 +91,8 @@ TEST(DiskTestHDFS, RewriteFileHDFS)
|
||||
TEST(DiskTestHDFS, AppendFileHDFS)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
|
||||
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
|
||||
|
||||
{
|
||||
std::unique_ptr<DB::WriteBuffer> out = disk.writeFile(file_name, 1024, DB::WriteMode::Append);
|
||||
@ -117,7 +121,8 @@ TEST(DiskTestHDFS, AppendFileHDFS)
|
||||
TEST(DiskTestHDFS, SeekHDFS)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, metadata_path, *config, 1000);
|
||||
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
|
||||
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
|
||||
|
||||
{
|
||||
std::unique_ptr<DB::WriteBuffer> out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
|
||||
|
Loading…
Reference in New Issue
Block a user