Mirror of https://github.com/ClickHouse/ClickHouse.git
Merge pull request #17935 from Jokser/disk-s3-remove-empty-metadata-files

Forcibly remove empty or bad metadata files from the local FS for DiskS3.

Commit: 987725f67b
src/Common/Exception.cpp
@@ -34,9 +34,9 @@ namespace ErrorCodes
 extern const int CANNOT_MREMAP;
 }
 
-Exception::Exception(const std::string & msg, int code)
-    : Poco::Exception(msg, code)
+/// Aborts the process if error code is LOGICAL_ERROR.
+/// Increments error codes statistics.
+void handle_error_code([[maybe_unused]] const std::string & msg, int code)
 {
     // In debug builds and builds with sanitizers, treat LOGICAL_ERROR as an assertion failure.
     // Log the message before we fail.
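The body elided by the hunk above centralizes what every Exception constructor now calls. A minimal sketch of the intent, simplified to a plain NDEBUG check; the exact logging call and build-mode condition are assumptions, not copied from this commit:

    void handle_error_code([[maybe_unused]] const std::string & msg, int code)
    {
    #ifndef NDEBUG
        if (code == ErrorCodes::LOGICAL_ERROR)
        {
            /// Log the message before aborting, so the logical error is visible.
            LOG_FATAL(&Poco::Logger::root(), "Logical error: '{}'.", msg);
            abort();
        }
    #endif
        /// Every constructed exception bumps the per-code statistics counter.
        ErrorCodes::increment(code);
    }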
@@ -50,6 +50,18 @@ Exception::Exception(const std::string & msg, int code)
     ErrorCodes::increment(code);
 }
 
+Exception::Exception(const std::string & msg, int code)
+    : Poco::Exception(msg, code)
+{
+    handle_error_code(msg, code);
+}
+
+Exception::Exception(const std::string & msg, const Exception & nested, int code)
+    : Poco::Exception(msg, nested, code)
+{
+    handle_error_code(msg, code);
+}
+
 Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
     : Poco::Exception(exc.displayText(), ErrorCodes::POCO_EXCEPTION)
 {
src/Common/Exception.h
@@ -25,6 +25,7 @@ class Exception : public Poco::Exception
 public:
     Exception() = default;
     Exception(const std::string & msg, int code);
+    Exception(const std::string & msg, const Exception & nested, int code);
 
     Exception(int code, const std::string & message)
         : Exception(message, code)
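The new constructor keeps the original error as a nested Poco exception instead of flattening it into the message. A minimal usage sketch (hypothetical call site, mirroring how DiskS3::Metadata uses it later in this diff):

    try
    {
        parseMetadataFile(buf); /// hypothetical parser that may throw anything
    }
    catch (Exception & e)
    {
        if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
            throw;
        /// Wrap the low-level error; the original stays reachable via
        /// e.nested() for logging.
        throw Exception("Failed to read metadata file", e, ErrorCodes::UNKNOWN_FORMAT);
    }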
src/Disks/S3/DiskS3.cpp
@@ -64,64 +64,64 @@ void DiskS3::AwsS3KeyKeeper::addKey(const String & key)
     back().push_back(obj);
 }
 
 namespace
 {
     String getRandomName()
     {
         std::uniform_int_distribution<int> distribution('a', 'z');
         String res(32, ' '); /// The number of bits of entropy should be not less than 128.
         for (auto & c : res)
             c = distribution(thread_local_rng);
         return res;
     }
 
     template <typename Result, typename Error>
     void throwIfError(Aws::Utils::Outcome<Result, Error> && response)
     {
         if (!response.IsSuccess())
         {
             const auto & err = response.GetError();
             throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
         }
     }
+}
 
 /**
  * S3 metadata file layout:
  * Number of S3 objects, Total size of all S3 objects.
  * Each S3 object represents path where object located in S3 and size of object.
  */
-struct Metadata
+struct DiskS3::Metadata
 {
     /// Metadata file version.
     static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
     static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
     static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;
 
     using PathAndSize = std::pair<String, size_t>;
 
     /// S3 root path.
     const String & s3_root_path;
 
     /// Disk path.
     const String & disk_path;
     /// Relative path to metadata file on local FS.
     String metadata_file_path;
     /// Total size of all S3 objects.
     size_t total_size;
     /// S3 objects paths and their sizes.
     std::vector<PathAndSize> s3_objects;
     /// Number of references (hardlinks) to this metadata file.
     UInt32 ref_count;
     /// Flag indicates that file is read only.
     bool read_only = false;
 
     /// Load metadata by path or create empty if `create` flag is set.
     explicit Metadata(const String & s3_root_path_, const String & disk_path_, const String & metadata_file_path_, bool create = false)
         : s3_root_path(s3_root_path_), disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0)
     {
         if (create)
             return;
 
+        try
+        {
             ReadBufferFromFile buf(disk_path + metadata_file_path, 1024); /* reasonable buffer size for small file */
 
             UInt32 version;
@@ -130,7 +130,7 @@ namespace
             if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
                 throw Exception(
                     "Unknown metadata file version. Path: " + disk_path + metadata_file_path
-                + " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_READ_ONLY_FLAG),
+                        + " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_READ_ONLY_FLAG),
                     ErrorCodes::UNKNOWN_FORMAT);
 
             assertChar('\n', buf);
@@ -170,234 +170,244 @@ namespace
             assertChar('\n', buf);
         }
     }
+    catch (Exception & e)
+    {
+        if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
+            throw;
+
+        throw Exception("Failed to read metadata file", e, ErrorCodes::UNKNOWN_FORMAT);
+    }
+}
 
     void addObject(const String & path, size_t size)
     {
         total_size += size;
         s3_objects.emplace_back(path, size);
     }
 
     /// Fsync metadata file if 'sync' flag is set.
     void save(bool sync = false)
     {
         WriteBufferFromFile buf(disk_path + metadata_file_path, 1024);
 
         writeIntText(VERSION_RELATIVE_PATHS, buf);
         writeChar('\n', buf);
 
         writeIntText(s3_objects.size(), buf);
         writeChar('\t', buf);
         writeIntText(total_size, buf);
         writeChar('\n', buf);
         for (const auto & [s3_object_path, s3_object_size] : s3_objects)
         {
             writeIntText(s3_object_size, buf);
             writeChar('\t', buf);
             writeEscapedString(s3_object_path, buf);
             writeChar('\n', buf);
         }
 
         writeIntText(ref_count, buf);
         writeChar('\n', buf);
 
         writeBoolText(read_only, buf);
         writeChar('\n', buf);
 
         buf.finalize();
         if (sync)
             buf.sync();
     }
 };
 
+DiskS3::Metadata DiskS3::readMeta(const String & path) const
+{
+    return Metadata(s3_root_path, metadata_path, path);
+}
+
+DiskS3::Metadata DiskS3::createMeta(const String & path) const
+{
+    return Metadata(s3_root_path, metadata_path, path, true);
+}
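Given save() above, a freshly written version-2 metadata file describing a file split into two S3 objects would look roughly like this (object names and sizes are made-up examples; real names come from getRandomName()):

    2
    2	1572864
    1048576	mhytoxbqyputznncapdbfeuqzyvqwdon
    524288	qzjamxhoobmyvowhjmbolwcibmrhrwbe
    0
    0

Line by line: format version; "<number of objects>\t<total size>"; one "<size>\t<relative S3 key>" per object; the hardlink reference count; and the read-only flag.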
 /// Reads data from S3 using stored paths in metadata.
 class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
 {
 public:
     ReadIndirectBufferFromS3(
-        std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, Metadata metadata_, size_t buf_size_)
+        std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, size_t buf_size_)
         : client_ptr(std::move(client_ptr_)), bucket(bucket_), metadata(std::move(metadata_)), buf_size(buf_size_)
     {
     }
 
     off_t seek(off_t offset_, int whence) override
     {
         if (whence == SEEK_CUR)
         {
             /// If position within current working buffer - shift pos.
             if (working_buffer.size() && size_t(getPosition() + offset_) < absolute_position)
             {
                 pos += offset_;
                 return getPosition();
             }
             else
             {
                 absolute_position += offset_;
             }
         }
         else if (whence == SEEK_SET)
         {
             /// If position within current working buffer - shift pos.
             if (working_buffer.size() && size_t(offset_) >= absolute_position - working_buffer.size()
                 && size_t(offset_) < absolute_position)
             {
                 pos = working_buffer.end() - (absolute_position - offset_);
                 return getPosition();
             }
             else
             {
                 absolute_position = offset_;
             }
         }
         else
             throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
 
         current_buf = initialize();
         pos = working_buffer.end();
 
         return absolute_position;
     }
 
     off_t getPosition() override { return absolute_position - available(); }
 
     std::string getFileName() const override { return metadata.metadata_file_path; }
 
 private:
     std::unique_ptr<ReadBufferFromS3> initialize()
     {
         size_t offset = absolute_position;
         for (size_t i = 0; i < metadata.s3_objects.size(); ++i)
         {
             current_buf_idx = i;
             const auto & [path, size] = metadata.s3_objects[i];
             if (size > offset)
             {
                 auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
                 buf->seek(offset, SEEK_SET);
                 return buf;
             }
             offset -= size;
         }
         return nullptr;
     }
 
     bool nextImpl() override
     {
         /// Find first available buffer that fits to given offset.
         if (!current_buf)
             current_buf = initialize();
 
         /// If current buffer has remaining data - use it.
         if (current_buf && current_buf->next())
         {
             working_buffer = current_buf->buffer();
             absolute_position += working_buffer.size();
             return true;
         }
 
         /// If there is no available buffers - nothing to read.
         if (current_buf_idx + 1 >= metadata.s3_objects.size())
             return false;
 
         ++current_buf_idx;
         const auto & path = metadata.s3_objects[current_buf_idx].first;
         current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
         current_buf->next();
         working_buffer = current_buf->buffer();
         absolute_position += working_buffer.size();
 
         return true;
     }
 
     std::shared_ptr<Aws::S3::S3Client> client_ptr;
     const String & bucket;
-    Metadata metadata;
+    DiskS3::Metadata metadata;
     size_t buf_size;
 
     size_t absolute_position = 0;
     size_t current_buf_idx = 0;
     std::unique_ptr<ReadBufferFromS3> current_buf;
 };
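The heart of ReadIndirectBufferFromS3 is mapping one logical stream position onto the sequence of S3 objects listed in the metadata. A standalone sketch of that mapping, using plain standard-library types instead of ClickHouse ones (names are illustrative):

    #include <cstddef>
    #include <string>
    #include <utility>
    #include <vector>

    using PathAndSize = std::pair<std::string, size_t>;

    /// Translate a global offset within the concatenation of all objects into
    /// (object index, offset inside that object) - the same walk that
    /// initialize() performs above. Returns {objects.size(), 0} when the
    /// offset is past the end of the last object.
    std::pair<size_t, size_t> locate(const std::vector<PathAndSize> & objects, size_t offset)
    {
        for (size_t i = 0; i < objects.size(); ++i)
        {
            if (objects[i].second > offset)
                return {i, offset};
            offset -= objects[i].second;
        }
        return {objects.size(), 0};
    }

For example, with object sizes {4, 8, 2}, locate(objects, 10) returns {1, 6}: global byte 10 lives at offset 6 of the second object.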
 /// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS.
 class WriteIndirectBufferFromS3 final : public WriteBufferFromFileBase
 {
 public:
     WriteIndirectBufferFromS3(
         std::shared_ptr<Aws::S3::S3Client> & client_ptr_,
         const String & bucket_,
-        Metadata metadata_,
+        DiskS3::Metadata metadata_,
         const String & s3_path_,
         std::optional<DiskS3::ObjectMetadata> object_metadata_,
         bool is_multipart,
         size_t min_upload_part_size,
         size_t buf_size_)
         : WriteBufferFromFileBase(buf_size_, nullptr, 0)
-        , impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, is_multipart,std::move(object_metadata_), buf_size_))
+        , impl(WriteBufferFromS3(
+              client_ptr_,
+              bucket_,
+              metadata_.s3_root_path + s3_path_,
+              min_upload_part_size,
+              is_multipart,
+              std::move(object_metadata_),
+              buf_size_))
         , metadata(std::move(metadata_))
         , s3_path(s3_path_)
     {
     }
 
     ~WriteIndirectBufferFromS3() override
     {
         try
         {
             finalize();
         }
         catch (...)
         {
             tryLogCurrentException(__PRETTY_FUNCTION__);
         }
     }
 
     void finalize() override
     {
         if (finalized)
             return;
 
         next();
         impl.finalize();
 
         metadata.addObject(s3_path, count());
         metadata.save();
 
         finalized = true;
     }
 
     void sync() override
     {
         if (finalized)
             metadata.save(true);
     }
 
     std::string getFileName() const override { return metadata.metadata_file_path; }
 
 private:
     void nextImpl() override
     {
         /// Transfer current working buffer to WriteBufferFromS3.
         impl.swap(*this);
 
         /// Write actual data to S3.
         impl.next();
 
         /// Return back working buffer.
         impl.swap(*this);
     }
 
     WriteBufferFromS3 impl;
     bool finalized = false;
-    Metadata metadata;
+    DiskS3::Metadata metadata;
     String s3_path;
 };
-}
 
 
 class DiskS3DirectoryIterator final : public IDiskDirectoryIterator
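WriteIndirectBufferFromS3::nextImpl() avoids copying bytes: it swaps buffer state with the underlying WriteBufferFromS3, lets it upload, and swaps back. The same idiom in isolation (a sketch; it assumes the swap between two WriteBuffer-derived objects used above, and the helper name is made up):

    void flushThrough(WriteBuffer & owner, WriteBuffer & target)
    {
        target.swap(owner); /// target now sees owner's filled working buffer
        target.next();      /// flush those bytes (here: upload to S3)
        target.swap(owner); /// hand the (now consumed) buffer back to owner
    }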
@@ -571,7 +581,7 @@ bool DiskS3::isDirectory(const String & path) const
 
 size_t DiskS3::getFileSize(const String & path) const
 {
-    Metadata metadata(s3_root_path, metadata_path, path);
+    auto metadata = readMeta(path);
     return metadata.total_size;
 }
 
@@ -624,8 +634,8 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
     if (exists(to_path))
         remove(to_path);
 
-    Metadata from(s3_root_path, metadata_path, from_path);
-    Metadata to(s3_root_path, metadata_path, to_path, true);
+    auto from = readMeta(from_path);
+    auto to = createMeta(to_path);
 
     for (const auto & [path, size] : from.s3_objects)
     {
@@ -644,7 +654,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path)
 
 std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const
 {
-    Metadata metadata(s3_root_path, metadata_path, path);
+    auto metadata = readMeta(path);
 
     LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}",
         backQuote(metadata_path + path), metadata.s3_objects.size());
@@ -656,12 +666,9 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
 std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t)
 {
     bool exist = exists(path);
-    if (exist)
-    {
-        Metadata metadata(s3_root_path, metadata_path, path);
-        if (metadata.read_only)
-            throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED);
-    }
+    if (exist && readMeta(path).read_only)
+        throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED);
 
     /// Path to store new S3 object.
     auto s3_path = getRandomName();
     auto object_metadata = createObjectMetadata(path);
@@ -672,7 +679,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
         if (exist)
             remove(path);
 
-        Metadata metadata(s3_root_path, metadata_path, path, true);
+        auto metadata = createMeta(path);
         /// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
         metadata.save();
 
@@ -682,7 +689,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
     }
     else
     {
-        Metadata metadata(s3_root_path, metadata_path, path);
+        auto metadata = readMeta(path);
 
         LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
             backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
@@ -696,9 +703,16 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
     LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Remove file by path: {}", backQuote(metadata_path + path));
 
     Poco::File file(metadata_path + path);
-    if (file.isFile())
+
+    if (!file.isFile())
+    {
+        file.remove();
+        return;
+    }
+
+    try
     {
-        Metadata metadata(s3_root_path, metadata_path, path);
+        auto metadata = readMeta(path);
 
         /// If there is no references - delete content from S3.
         if (metadata.ref_count == 0)
@@ -715,9 +729,22 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
             file.remove();
         }
     }
-    else
-        file.remove();
+    catch (const Exception & e)
+    {
+        /// If it's impossible to read meta - just remove it from FS.
+        if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
+        {
+            LOG_WARNING(
+                &Poco::Logger::get("DiskS3"),
+                "Metadata file {} can't be read by reason: {}. Removing it forcibly.",
+                backQuote(path),
+                e.nested() ? e.nested()->message() : e.message());
+
+            file.remove();
+        }
+        else
+            throw;
+    }
 }
 
 void DiskS3::removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys)
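This catch block is the behavioral change the PR title describes: a metadata file that cannot be parsed (an empty file is the typical case, e.g. one left behind by a crash before save() completed) no longer makes removal fail forever. A minimal sketch of the scenario, with hypothetical paths and DiskS3's public API abbreviated to disk->...:

    /// Simulate a crash artifact: a zero-byte metadata file on the local disk.
    Poco::File(metadata_path + "broken_file").createFile();

    /// Before this change, remove() rethrew the parse error and the broken
    /// file stayed on disk; now removeMeta() logs a warning and deletes it.
    disk->remove("broken_file");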
@@ -811,7 +838,7 @@ Poco::Timestamp DiskS3::getLastModified(const String & path)
 void DiskS3::createHardLink(const String & src_path, const String & dst_path)
 {
     /// Increment number of references.
-    Metadata src(s3_root_path, metadata_path, src_path);
+    auto src = readMeta(src_path);
     ++src.ref_count;
     src.save();
 
@@ -822,7 +849,7 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path)
 void DiskS3::createFile(const String & path)
 {
     /// Create empty metadata file.
-    Metadata metadata(s3_root_path, metadata_path, path, true);
+    auto metadata = createMeta(path);
     metadata.save();
 }
 
@@ -830,7 +857,7 @@ void DiskS3::setReadOnly(const String & path)
 {
     /// We should store read only flag inside metadata file (instead of using FS flag),
     /// because we modify metadata file when create hard-links from it.
-    Metadata metadata(s3_root_path, metadata_path, path);
+    auto metadata = readMeta(path);
     metadata.read_only = true;
     metadata.save();
 }
src/Disks/S3/DiskS3.h
@@ -24,6 +24,7 @@ public:
     friend class DiskS3Reservation;
 
     class AwsS3KeyKeeper;
+    struct Metadata;
 
     DiskS3(
         String name_,
@@ -121,6 +122,9 @@ private:
     void removeAws(const AwsS3KeyKeeper & keys);
     std::optional<ObjectMetadata> createObjectMetadata(const String & path) const;
 
+    Metadata readMeta(const String & path) const;
+    Metadata createMeta(const String & path) const;
+
 private:
     const String name;
     std::shared_ptr<Aws::S3::S3Client> client;