diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index dd78d0ec9fc..d9bbb170dcc 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -34,9 +34,9 @@ namespace ErrorCodes extern const int CANNOT_MREMAP; } - -Exception::Exception(const std::string & msg, int code) - : Poco::Exception(msg, code) +/// Aborts the process if error code is LOGICAL_ERROR. +/// Increments error codes statistics. +void handle_error_code([[maybe_unused]] const std::string & msg, int code) { // In debug builds and builds with sanitizers, treat LOGICAL_ERROR as an assertion failure. // Log the message before we fail. @@ -50,6 +50,18 @@ Exception::Exception(const std::string & msg, int code) ErrorCodes::increment(code); } +Exception::Exception(const std::string & msg, int code) + : Poco::Exception(msg, code) +{ + handle_error_code(msg, code); +} + +Exception::Exception(const std::string & msg, const Exception & nested, int code) + : Poco::Exception(msg, nested, code) +{ + handle_error_code(msg, code); +} + Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc) : Poco::Exception(exc.displayText(), ErrorCodes::POCO_EXCEPTION) { diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 0096c87d6e5..3da2e2fb0d0 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -25,6 +25,7 @@ class Exception : public Poco::Exception public: Exception() = default; Exception(const std::string & msg, int code); + Exception(const std::string & msg, const Exception & nested, int code); Exception(int code, const std::string & message) : Exception(message, code) diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 6090f00b4e2..3f3d23938ba 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -64,64 +64,64 @@ void DiskS3::AwsS3KeyKeeper::addKey(const String & key) back().push_back(obj); } -namespace +String getRandomName() { - String getRandomName() + std::uniform_int_distribution distribution('a', 'z'); + String res(32, ' '); /// The number of bits of entropy should be not less than 128. + for (auto & c : res) + c = distribution(thread_local_rng); + return res; +} + +template +void throwIfError(Aws::Utils::Outcome && response) +{ + if (!response.IsSuccess()) { - std::uniform_int_distribution distribution('a', 'z'); - String res(32, ' '); /// The number of bits of entropy should be not less than 128. - for (auto & c : res) - c = distribution(thread_local_rng); - return res; + const auto & err = response.GetError(); + throw Exception(err.GetMessage(), static_cast(err.GetErrorType())); } +} - template - void throwIfError(Aws::Utils::Outcome && response) +/** + * S3 metadata file layout: + * Number of S3 objects, Total size of all S3 objects. + * Each S3 object represents path where object located in S3 and size of object. + */ +struct DiskS3::Metadata +{ + /// Metadata file version. + static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1; + static constexpr UInt32 VERSION_RELATIVE_PATHS = 2; + static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3; + + using PathAndSize = std::pair; + + /// S3 root path. + const String & s3_root_path; + + /// Disk path. + const String & disk_path; + /// Relative path to metadata file on local FS. + String metadata_file_path; + /// Total size of all S3 objects. + size_t total_size; + /// S3 objects paths and their sizes. + std::vector s3_objects; + /// Number of references (hardlinks) to this metadata file. + UInt32 ref_count; + /// Flag indicates that file is read only. + bool read_only = false; + + /// Load metadata by path or create empty if `create` flag is set. + explicit Metadata(const String & s3_root_path_, const String & disk_path_, const String & metadata_file_path_, bool create = false) + : s3_root_path(s3_root_path_), disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0) { - if (!response.IsSuccess()) + if (create) + return; + + try { - const auto & err = response.GetError(); - throw Exception(err.GetMessage(), static_cast(err.GetErrorType())); - } - } - - /** - * S3 metadata file layout: - * Number of S3 objects, Total size of all S3 objects. - * Each S3 object represents path where object located in S3 and size of object. - */ - struct Metadata - { - /// Metadata file version. - static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1; - static constexpr UInt32 VERSION_RELATIVE_PATHS = 2; - static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3; - - using PathAndSize = std::pair; - - /// S3 root path. - const String & s3_root_path; - - /// Disk path. - const String & disk_path; - /// Relative path to metadata file on local FS. - String metadata_file_path; - /// Total size of all S3 objects. - size_t total_size; - /// S3 objects paths and their sizes. - std::vector s3_objects; - /// Number of references (hardlinks) to this metadata file. - UInt32 ref_count; - /// Flag indicates that file is read only. - bool read_only = false; - - /// Load metadata by path or create empty if `create` flag is set. - explicit Metadata(const String & s3_root_path_, const String & disk_path_, const String & metadata_file_path_, bool create = false) - : s3_root_path(s3_root_path_), disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0) - { - if (create) - return; - ReadBufferFromFile buf(disk_path + metadata_file_path, 1024); /* reasonable buffer size for small file */ UInt32 version; @@ -130,7 +130,7 @@ namespace if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG) throw Exception( "Unknown metadata file version. Path: " + disk_path + metadata_file_path - + " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_READ_ONLY_FLAG), + + " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_READ_ONLY_FLAG), ErrorCodes::UNKNOWN_FORMAT); assertChar('\n', buf); @@ -170,234 +170,244 @@ namespace assertChar('\n', buf); } } - - void addObject(const String & path, size_t size) + catch (Exception & e) { - total_size += size; - s3_objects.emplace_back(path, size); + if (e.code() == ErrorCodes::UNKNOWN_FORMAT) + throw; + + throw Exception("Failed to read metadata file", e, ErrorCodes::UNKNOWN_FORMAT); } + } - /// Fsync metadata file if 'sync' flag is set. - void save(bool sync = false) - { - WriteBufferFromFile buf(disk_path + metadata_file_path, 1024); - - writeIntText(VERSION_RELATIVE_PATHS, buf); - writeChar('\n', buf); - - writeIntText(s3_objects.size(), buf); - writeChar('\t', buf); - writeIntText(total_size, buf); - writeChar('\n', buf); - for (const auto & [s3_object_path, s3_object_size] : s3_objects) - { - writeIntText(s3_object_size, buf); - writeChar('\t', buf); - writeEscapedString(s3_object_path, buf); - writeChar('\n', buf); - } - - writeIntText(ref_count, buf); - writeChar('\n', buf); - - writeBoolText(read_only, buf); - writeChar('\n', buf); - - buf.finalize(); - if (sync) - buf.sync(); - } - }; - - /// Reads data from S3 using stored paths in metadata. - class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase + void addObject(const String & path, size_t size) { - public: - ReadIndirectBufferFromS3( - std::shared_ptr client_ptr_, const String & bucket_, Metadata metadata_, size_t buf_size_) - : client_ptr(std::move(client_ptr_)), bucket(bucket_), metadata(std::move(metadata_)), buf_size(buf_size_) + total_size += size; + s3_objects.emplace_back(path, size); + } + + /// Fsync metadata file if 'sync' flag is set. + void save(bool sync = false) + { + WriteBufferFromFile buf(disk_path + metadata_file_path, 1024); + + writeIntText(VERSION_RELATIVE_PATHS, buf); + writeChar('\n', buf); + + writeIntText(s3_objects.size(), buf); + writeChar('\t', buf); + writeIntText(total_size, buf); + writeChar('\n', buf); + for (const auto & [s3_object_path, s3_object_size] : s3_objects) { + writeIntText(s3_object_size, buf); + writeChar('\t', buf); + writeEscapedString(s3_object_path, buf); + writeChar('\n', buf); } - off_t seek(off_t offset_, int whence) override + writeIntText(ref_count, buf); + writeChar('\n', buf); + + writeBoolText(read_only, buf); + writeChar('\n', buf); + + buf.finalize(); + if (sync) + buf.sync(); + } +}; + +DiskS3::Metadata DiskS3::readMeta(const String & path) const +{ + return Metadata(s3_root_path, metadata_path, path); +} + +DiskS3::Metadata DiskS3::createMeta(const String & path) const +{ + return Metadata(s3_root_path, metadata_path, path, true); +} + +/// Reads data from S3 using stored paths in metadata. +class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase +{ +public: + ReadIndirectBufferFromS3( + std::shared_ptr client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, size_t buf_size_) + : client_ptr(std::move(client_ptr_)), bucket(bucket_), metadata(std::move(metadata_)), buf_size(buf_size_) + { + } + + off_t seek(off_t offset_, int whence) override + { + if (whence == SEEK_CUR) { - if (whence == SEEK_CUR) + /// If position within current working buffer - shift pos. + if (working_buffer.size() && size_t(getPosition() + offset_) < absolute_position) { - /// If position within current working buffer - shift pos. - if (working_buffer.size() && size_t(getPosition() + offset_) < absolute_position) - { - pos += offset_; - return getPosition(); - } - else - { - absolute_position += offset_; - } - } - else if (whence == SEEK_SET) - { - /// If position within current working buffer - shift pos. - if (working_buffer.size() && size_t(offset_) >= absolute_position - working_buffer.size() - && size_t(offset_) < absolute_position) - { - pos = working_buffer.end() - (absolute_position - offset_); - return getPosition(); - } - else - { - absolute_position = offset_; - } + pos += offset_; + return getPosition(); } else - throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + { + absolute_position += offset_; + } + } + else if (whence == SEEK_SET) + { + /// If position within current working buffer - shift pos. + if (working_buffer.size() && size_t(offset_) >= absolute_position - working_buffer.size() + && size_t(offset_) < absolute_position) + { + pos = working_buffer.end() - (absolute_position - offset_); + return getPosition(); + } + else + { + absolute_position = offset_; + } + } + else + throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + current_buf = initialize(); + pos = working_buffer.end(); + + return absolute_position; + } + + off_t getPosition() override { return absolute_position - available(); } + + std::string getFileName() const override { return metadata.metadata_file_path; } + +private: + std::unique_ptr initialize() + { + size_t offset = absolute_position; + for (size_t i = 0; i < metadata.s3_objects.size(); ++i) + { + current_buf_idx = i; + const auto & [path, size] = metadata.s3_objects[i]; + if (size > offset) + { + auto buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, buf_size); + buf->seek(offset, SEEK_SET); + return buf; + } + offset -= size; + } + return nullptr; + } + + bool nextImpl() override + { + /// Find first available buffer that fits to given offset. + if (!current_buf) current_buf = initialize(); - pos = working_buffer.end(); - return absolute_position; - } - - off_t getPosition() override { return absolute_position - available(); } - - std::string getFileName() const override { return metadata.metadata_file_path; } - - private: - std::unique_ptr initialize() + /// If current buffer has remaining data - use it. + if (current_buf && current_buf->next()) { - size_t offset = absolute_position; - for (size_t i = 0; i < metadata.s3_objects.size(); ++i) - { - current_buf_idx = i; - const auto & [path, size] = metadata.s3_objects[i]; - if (size > offset) - { - auto buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, buf_size); - buf->seek(offset, SEEK_SET); - return buf; - } - offset -= size; - } - return nullptr; - } - - bool nextImpl() override - { - /// Find first available buffer that fits to given offset. - if (!current_buf) - current_buf = initialize(); - - /// If current buffer has remaining data - use it. - if (current_buf && current_buf->next()) - { - working_buffer = current_buf->buffer(); - absolute_position += working_buffer.size(); - return true; - } - - /// If there is no available buffers - nothing to read. - if (current_buf_idx + 1 >= metadata.s3_objects.size()) - return false; - - ++current_buf_idx; - const auto & path = metadata.s3_objects[current_buf_idx].first; - current_buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, buf_size); - current_buf->next(); working_buffer = current_buf->buffer(); absolute_position += working_buffer.size(); - return true; } - std::shared_ptr client_ptr; - const String & bucket; - Metadata metadata; - size_t buf_size; + /// If there is no available buffers - nothing to read. + if (current_buf_idx + 1 >= metadata.s3_objects.size()) + return false; - size_t absolute_position = 0; - size_t current_buf_idx = 0; - std::unique_ptr current_buf; - }; + ++current_buf_idx; + const auto & path = metadata.s3_objects[current_buf_idx].first; + current_buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, buf_size); + current_buf->next(); + working_buffer = current_buf->buffer(); + absolute_position += working_buffer.size(); - /// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS. - class WriteIndirectBufferFromS3 final : public WriteBufferFromFileBase + return true; + } + + std::shared_ptr client_ptr; + const String & bucket; + DiskS3::Metadata metadata; + size_t buf_size; + + size_t absolute_position = 0; + size_t current_buf_idx = 0; + std::unique_ptr current_buf; +}; + +/// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS. +class WriteIndirectBufferFromS3 final : public WriteBufferFromFileBase +{ +public: + WriteIndirectBufferFromS3( + std::shared_ptr & client_ptr_, + const String & bucket_, + DiskS3::Metadata metadata_, + const String & s3_path_, + std::optional object_metadata_, + bool is_multipart, + size_t min_upload_part_size, + size_t buf_size_) + : WriteBufferFromFileBase(buf_size_, nullptr, 0) + , impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, is_multipart,std::move(object_metadata_), buf_size_)) + , metadata(std::move(metadata_)) + , s3_path(s3_path_) { - public: - WriteIndirectBufferFromS3( - std::shared_ptr & client_ptr_, - const String & bucket_, - Metadata metadata_, - const String & s3_path_, - std::optional object_metadata_, - bool is_multipart, - size_t min_upload_part_size, - size_t buf_size_) - : WriteBufferFromFileBase(buf_size_, nullptr, 0) - , impl(WriteBufferFromS3( - client_ptr_, - bucket_, - metadata_.s3_root_path + s3_path_, - min_upload_part_size, - is_multipart, - std::move(object_metadata_), - buf_size_)) - , metadata(std::move(metadata_)) - , s3_path(s3_path_) + } + + ~WriteIndirectBufferFromS3() override + { + try { + finalize(); } - - ~WriteIndirectBufferFromS3() override + catch (...) { - try - { - finalize(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + tryLogCurrentException(__PRETTY_FUNCTION__); } + } - void finalize() override - { - if (finalized) - return; + void finalize() override + { + if (finalized) + return; - next(); - impl.finalize(); + next(); + impl.finalize(); - metadata.addObject(s3_path, count()); - metadata.save(); + metadata.addObject(s3_path, count()); + metadata.save(); - finalized = true; - } + finalized = true; + } - void sync() override - { - if (finalized) - metadata.save(true); - } + void sync() override + { + if (finalized) + metadata.save(true); + } - std::string getFileName() const override { return metadata.metadata_file_path; } + std::string getFileName() const override { return metadata.metadata_file_path; } - private: - void nextImpl() override - { - /// Transfer current working buffer to WriteBufferFromS3. - impl.swap(*this); +private: + void nextImpl() override + { + /// Transfer current working buffer to WriteBufferFromS3. + impl.swap(*this); - /// Write actual data to S3. - impl.next(); + /// Write actual data to S3. + impl.next(); - /// Return back working buffer. - impl.swap(*this); - } + /// Return back working buffer. + impl.swap(*this); + } - WriteBufferFromS3 impl; - bool finalized = false; - Metadata metadata; - String s3_path; - }; -} + WriteBufferFromS3 impl; + bool finalized = false; + DiskS3::Metadata metadata; + String s3_path; +}; class DiskS3DirectoryIterator final : public IDiskDirectoryIterator @@ -571,7 +581,7 @@ bool DiskS3::isDirectory(const String & path) const size_t DiskS3::getFileSize(const String & path) const { - Metadata metadata(s3_root_path, metadata_path, path); + auto metadata = readMeta(path); return metadata.total_size; } @@ -624,8 +634,8 @@ void DiskS3::copyFile(const String & from_path, const String & to_path) if (exists(to_path)) remove(to_path); - Metadata from(s3_root_path, metadata_path, from_path); - Metadata to(s3_root_path, metadata_path, to_path, true); + auto from = readMeta(from_path); + auto to = createMeta(to_path); for (const auto & [path, size] : from.s3_objects) { @@ -644,7 +654,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path) std::unique_ptr DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const { - Metadata metadata(s3_root_path, metadata_path, path); + auto metadata = readMeta(path); LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}", backQuote(metadata_path + path), metadata.s3_objects.size()); @@ -656,12 +666,9 @@ std::unique_ptr DiskS3::readFile(const String & path, si std::unique_ptr DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t) { bool exist = exists(path); - if (exist) - { - Metadata metadata(s3_root_path, metadata_path, path); - if (metadata.read_only) - throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED); - } + if (exist && readMeta(path).read_only) + throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED); + /// Path to store new S3 object. auto s3_path = getRandomName(); auto object_metadata = createObjectMetadata(path); @@ -672,7 +679,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, if (exist) remove(path); - Metadata metadata(s3_root_path, metadata_path, path, true); + auto metadata = createMeta(path); /// Save empty metadata to disk to have ability to get file size while buffer is not finalized. metadata.save(); @@ -682,7 +689,7 @@ std::unique_ptr DiskS3::writeFile(const String & path, } else { - Metadata metadata(s3_root_path, metadata_path, path); + auto metadata = readMeta(path); LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.", backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size()); @@ -696,9 +703,16 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys) LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Remove file by path: {}", backQuote(metadata_path + path)); Poco::File file(metadata_path + path); - if (file.isFile()) + + if (!file.isFile()) { - Metadata metadata(s3_root_path, metadata_path, path); + file.remove(); + return; + } + + try + { + auto metadata = readMeta(path); /// If there is no references - delete content from S3. if (metadata.ref_count == 0) @@ -715,9 +729,22 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys) file.remove(); } } - else - file.remove(); + catch (const Exception & e) + { + /// If it's impossible to read meta - just remove it from FS. + if (e.code() == ErrorCodes::UNKNOWN_FORMAT) + { + LOG_WARNING( + &Poco::Logger::get("DiskS3"), + "Metadata file {} can't be read by reason: {}. Removing it forcibly.", + backQuote(path), + e.nested() ? e.nested()->message() : e.message()); + file.remove(); + } + else + throw; + } } void DiskS3::removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys) @@ -811,7 +838,7 @@ Poco::Timestamp DiskS3::getLastModified(const String & path) void DiskS3::createHardLink(const String & src_path, const String & dst_path) { /// Increment number of references. - Metadata src(s3_root_path, metadata_path, src_path); + auto src = readMeta(src_path); ++src.ref_count; src.save(); @@ -822,7 +849,7 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path) void DiskS3::createFile(const String & path) { /// Create empty metadata file. - Metadata metadata(s3_root_path, metadata_path, path, true); + auto metadata = createMeta(path); metadata.save(); } @@ -830,7 +857,7 @@ void DiskS3::setReadOnly(const String & path) { /// We should store read only flag inside metadata file (instead of using FS flag), /// because we modify metadata file when create hard-links from it. - Metadata metadata(s3_root_path, metadata_path, path); + auto metadata = readMeta(path); metadata.read_only = true; metadata.save(); } diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h index f3d20da5ca0..5bc8d51abb2 100644 --- a/src/Disks/S3/DiskS3.h +++ b/src/Disks/S3/DiskS3.h @@ -24,6 +24,7 @@ public: friend class DiskS3Reservation; class AwsS3KeyKeeper; + struct Metadata; DiskS3( String name_, @@ -121,6 +122,9 @@ private: void removeAws(const AwsS3KeyKeeper & keys); std::optional createObjectMetadata(const String & path) const; + Metadata readMeta(const String & path) const; + Metadata createMeta(const String & path) const; + private: const String name; std::shared_ptr client;