From abfacdaadc737f3655f4d9d2dd96f8cf407996d1 Mon Sep 17 00:00:00 2001 From: Pavel Kovalenko Date: Mon, 27 Jan 2020 21:44:30 +0300 Subject: [PATCH] SeekableReadBuffer refactoring. Store size and multiple references for S3 metadata file. Log engine support for S3. --- dbms/programs/obfuscator/Obfuscator.cpp | 2 +- .../CachedCompressedReadBuffer.cpp | 2 +- .../CompressedReadBufferFromFile.cpp | 2 +- dbms/src/Disks/DiskS3.cpp | 315 ++++++++++++++---- dbms/src/Disks/DiskS3.h | 2 - .../IO/MMapReadBufferFromFileDescriptor.cpp | 2 +- .../src/IO/MMapReadBufferFromFileDescriptor.h | 6 +- dbms/src/IO/ReadBufferAIO.cpp | 2 +- dbms/src/IO/ReadBufferAIO.h | 4 +- dbms/src/IO/ReadBufferFromFileDescriptor.cpp | 2 +- dbms/src/IO/ReadBufferFromFileDescriptor.h | 4 +- dbms/src/IO/ReadBufferFromMemory.h | 3 +- dbms/src/IO/ReadBufferFromS3.cpp | 7 +- dbms/src/IO/ReadBufferFromS3.h | 5 +- dbms/src/IO/SeekableReadBuffer.h | 8 +- dbms/src/IO/WriteBufferFromS3.cpp | 44 ++- dbms/src/IO/WriteBufferFromS3.h | 7 +- dbms/src/Storages/StorageLog.cpp | 4 +- .../__init__.py | 0 .../configs/config.d/log_conf.xml | 0 .../configs/config.xml | 0 .../configs/users.xml | 0 .../test.py | 28 +- 23 files changed, 318 insertions(+), 131 deletions(-) rename dbms/tests/integration/{test_tinylog_s3 => test_log_family_s3}/__init__.py (100%) rename dbms/tests/integration/{test_tinylog_s3 => test_log_family_s3}/configs/config.d/log_conf.xml (100%) rename dbms/tests/integration/{test_tinylog_s3 => test_log_family_s3}/configs/config.xml (100%) rename dbms/tests/integration/{test_tinylog_s3 => test_log_family_s3}/configs/users.xml (100%) rename dbms/tests/integration/{test_tinylog_s3 => test_log_family_s3}/test.py (61%) diff --git a/dbms/programs/obfuscator/Obfuscator.cpp b/dbms/programs/obfuscator/Obfuscator.cpp index 49c6fabd435..83aa43ab2d2 100644 --- a/dbms/programs/obfuscator/Obfuscator.cpp +++ b/dbms/programs/obfuscator/Obfuscator.cpp @@ -1070,7 +1070,7 @@ try if (!silent) std::cerr << "Generating data\n"; - file_in.seek(0); + file_in.seek(0, SEEK_SET); BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size); BlockOutputStreamPtr output = context.getOutputFormat(output_format, file_out, header); diff --git a/dbms/src/Compression/CachedCompressedReadBuffer.cpp b/dbms/src/Compression/CachedCompressedReadBuffer.cpp index a1bcb8a7d66..878c980d53f 100644 --- a/dbms/src/Compression/CachedCompressedReadBuffer.cpp +++ b/dbms/src/Compression/CachedCompressedReadBuffer.cpp @@ -39,7 +39,7 @@ bool CachedCompressedReadBuffer::nextImpl() { /// If not, read it from the file. 
initInput(); - file_in->seek(file_pos); + file_in->seek(file_pos, SEEK_SET); owned_cell = std::make_shared(); diff --git a/dbms/src/Compression/CompressedReadBufferFromFile.cpp b/dbms/src/Compression/CompressedReadBufferFromFile.cpp index 22eaf9e15e8..63bacde6d78 100644 --- a/dbms/src/Compression/CompressedReadBufferFromFile.cpp +++ b/dbms/src/Compression/CompressedReadBufferFromFile.cpp @@ -55,7 +55,7 @@ void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t } else { - file_in.seek(offset_in_compressed_file); + file_in.seek(offset_in_compressed_file, SEEK_SET); bytes += offset(); nextImpl(); diff --git a/dbms/src/Disks/DiskS3.cpp b/dbms/src/Disks/DiskS3.cpp index 94caf2d018e..b1d4fd3dde6 100644 --- a/dbms/src/Disks/DiskS3.cpp +++ b/dbms/src/Disks/DiskS3.cpp @@ -4,6 +4,7 @@ # include "DiskFactory.h" # include +# include # include # include # include @@ -27,6 +28,7 @@ namespace ErrorCodes extern const int FILE_ALREADY_EXISTS; extern const int FILE_DOESNT_EXIST; extern const int PATH_ACCESS_DENIED; + extern const int SEEK_POSITION_OUT_OF_BOUND; } namespace @@ -41,41 +43,192 @@ namespace } } - String readKeyFromFile(const String & path) + /** + * S3 metadata file layout: + * Number of references to S3 objects, Total size of all S3 objects. + * Each reference to S3 object and size. + */ + struct Metadata { - String key; - ReadBufferFromFile buf(path, 1024); /* reasonable buffer size for small file */ - readStringUntilEOF(key, buf); - return key; - } + // Path to metadata file on local FS. + String local_path; + // S3 object references count. + UInt32 ref_count; + // Total size of all S3 objects. + size_t total_size; + // References to S3 objects and their sizes. + std::vector> references; - void writeKeyToFile(const String & key, const String & path) + explicit Metadata(const Poco::File & file) : Metadata(file.path(), false) { } + + // Load metadata by path or create empty if `create` flag is set. + explicit Metadata(const String & path, bool create = false) : + local_path(path), ref_count(0), total_size(0), references(0) + { + if (create) + return; + + char x; // To skip separators. + ReadBufferFromFile buf(path, 1024); /* reasonable buffer size for small file */ + readIntText(ref_count, buf); + readChar(x, buf); + readIntText(total_size, buf); + readChar(x, buf); + references = std::vector> (ref_count); + for (UInt32 i = 0; i < ref_count; ++i) + { + String ref; + size_t size; + readIntText(size, buf); + readChar(x, buf); + readEscapedString(ref, buf); + readChar(x, buf); + references[i] = std::make_pair(ref, size); + } + } + + void addReference(const String & ref, size_t size) + { + ref_count++; + total_size += size; + references.emplace_back(ref, size); + } + + void save() { + WriteBufferFromFile buf(local_path, 1024); + writeIntText(ref_count, buf); + writeChar('\t', buf); + writeIntText(total_size, buf); + writeChar('\n', buf); + for (UInt32 i = 0; i < ref_count; ++i) + { + auto ref_and_size = references[i]; + writeIntText(ref_and_size.second, buf); + writeChar('\t', buf); + writeEscapedString(ref_and_size.first, buf); + writeChar('\n', buf); + } + buf.finalize(); + } + }; + + // Reads data from S3. + // It supports multiple S3 references and reads them one by one. 
+ class ReadIndirectBufferFromS3 : public BufferWithOwnMemory { - WriteBufferFromFile buf(path, 1024); - writeString(key, buf); - buf.next(); - } + public: + ReadIndirectBufferFromS3( + std::shared_ptr client_ptr_, + const String & bucket_, + Metadata metadata_, + size_t buf_size_ + ) : BufferWithOwnMemory(buf_size_) + , client_ptr(std::move(client_ptr_)) + , bucket(bucket_) + , metadata(std::move(metadata_)) + , buf_size(buf_size_) + , offset(0) + , initialized(false) + , current_buf_idx(0) + , current_buf(nullptr) + { + } - /// Stores data in S3 and the object key in file in local filesystem. + off_t seek(off_t off, int) override { + if (!initialized) + { + if (off < 0 || metadata.total_size <= static_cast(off)) + throw Exception("Seek position is out of bounds. " + "Offset: " + std::to_string(off) + ", Max: " + std::to_string(metadata.total_size), + ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); + + offset = off; + } + return offset; + } + + private: + std::unique_ptr initialize() + { + for (UInt32 i = 0; i < metadata.ref_count; ++i) + { + current_buf_idx = i; + auto ref = metadata.references[i].first; + auto size = metadata.references[i].second; + if (size > offset) + { + auto buf = std::make_unique(client_ptr, bucket, ref, buf_size); + buf->seek(offset, SEEK_SET); + return buf; + } + offset -= size; + } + return nullptr; + } + + bool nextImpl() override + { + // Find first available buffer according to offset. + if (!initialized) + { + current_buf = initialize(); + + initialized = true; + } + + // If current buffer has remaining data - use it. + if (current_buf && current_buf->next()) + { + working_buffer = current_buf->buffer(); + return true; + } + + // If there is no available buffers - nothing to read. + if (current_buf_idx + 1 >= metadata.ref_count) + return false; + + current_buf_idx++; + auto ref = metadata.references[current_buf_idx].first; + current_buf = std::make_unique(client_ptr, bucket, ref, buf_size); + current_buf->next(); + working_buffer = current_buf->buffer(); + + return true; + } + + private: + std::shared_ptr client_ptr; + const String & bucket; + Metadata metadata; + size_t buf_size; + + size_t offset; + bool initialized; + UInt32 current_buf_idx; + std::unique_ptr current_buf; + }; + + /// Stores data in S3 and appends the object key (reference) to metadata file on local FS. 
class WriteIndirectBufferFromS3 : public WriteBufferFromS3 { public: WriteIndirectBufferFromS3( std::shared_ptr & client_ptr_, const String & bucket_, - const String & metadata_path_, - const String & s3_path_, + Metadata metadata_, + const String & s3_ref_, size_t buf_size_) - : WriteBufferFromS3(client_ptr_, bucket_, s3_path_, DEFAULT_BLOCK_SIZE, buf_size_) - , metadata_path(metadata_path_) - , s3_path(s3_path_) + : WriteBufferFromS3(client_ptr_, bucket_, s3_ref_, DEFAULT_BLOCK_SIZE, buf_size_) + , metadata(std::move(metadata_)) + , s3_ref(s3_ref_) { } void finalize() override { WriteBufferFromS3::finalize(); - writeKeyToFile(s3_path, metadata_path); + metadata.addReference(s3_ref, total_size); + metadata.save(); finalized = true; } @@ -96,8 +249,8 @@ namespace private: bool finalized = false; - const String metadata_path; - const String s3_path; + Metadata metadata; + String s3_ref; }; } @@ -179,7 +332,7 @@ bool DiskS3::exists(const String & path) const bool DiskS3::isFile(const String & path) const { - return Poco::File(metadata_path + path).isFile(); + return Poco::File(metadata_path + path).isFile(); } bool DiskS3::isDirectory(const String & path) const @@ -189,20 +342,8 @@ bool DiskS3::isDirectory(const String & path) const size_t DiskS3::getFileSize(const String & path) const { - // TODO: Consider storing actual file size in meta file. - Aws::S3::Model::GetObjectRequest request; - request.SetBucket(bucket); - request.SetKey(getS3Path(path)); - auto outcome = client->GetObject(request); - if (!outcome.IsSuccess()) - { - auto & err = outcome.GetError(); - throw Exception(err.GetMessage(), static_cast(err.GetErrorType())); - } - else - { - return outcome.GetResult().GetContentLength(); - } + Metadata metadata(metadata_path + path); + return metadata.total_size; } void DiskS3::createDirectory(const String & path) @@ -230,7 +371,7 @@ void DiskS3::clearDirectory(const String & path) void DiskS3::moveFile(const String & from_path, const String & to_path) { if (exists(to_path)) - throw Exception("File already exists " + to_path, ErrorCodes::FILE_ALREADY_EXISTS); + throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS); Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path); } @@ -254,51 +395,90 @@ void DiskS3::copyFile(const String & from_path, const String & to_path) if (exists(to_path)) remove(to_path); - String s3_from_path = readKeyFromFile(metadata_path + from_path); - String s3_to_path = s3_root_path + getRandomName(); + Metadata from(metadata_path + from_path); + Metadata to(metadata_path + to_path, true); - Aws::S3::Model::CopyObjectRequest req; - req.SetBucket(bucket); - req.SetCopySource(s3_from_path); - req.SetKey(s3_to_path); - throwIfError(client->CopyObject(req)); - writeKeyToFile(s3_to_path, metadata_path + to_path); + for (UInt32 i = 0; i < from.ref_count; ++i) + { + auto ref = from.references[i].first; + auto size = from.references[i].second; + auto new_ref = s3_root_path + getRandomName(); + Aws::S3::Model::CopyObjectRequest req; + req.SetBucket(bucket); + req.SetCopySource(ref); + req.SetKey(new_ref); + throwIfError(client->CopyObject(req)); + + to.addReference(new_ref, size); + } + + to.save(); } std::unique_ptr DiskS3::readFile(const String & path, size_t buf_size) const { - return std::make_unique(client, bucket, getS3Path(path), buf_size); + Metadata metadata(metadata_path + path); + + LOG_DEBUG( + &Logger::get("DiskS3"), + "Read from file by path: " << backQuote(metadata_path + path) + << " Existing S3 references: " << 
metadata.ref_count); + + return std::make_unique(client, bucket, metadata, buf_size); } std::unique_ptr DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode) { - // TODO: Optimize append mode. Consider storing several S3 references in one meta file. - if (!exists(path) || mode == WriteMode::Rewrite) + bool exist = exists(path); + // Reference to store new S3 object. + auto s3_ref = s3_root_path + getRandomName(); + if (!exist || mode == WriteMode::Rewrite) { - String new_s3_path = s3_root_path + getRandomName(); - return std::make_unique(client, bucket, metadata_path + path, new_s3_path, buf_size); + // If metadata file exists - remove and create new. + if (exist) + remove(path); + + Metadata metadata(metadata_path + path, true); + // Save empty metadata to disk to have ability to get file size while buffer is not finalized. + metadata.save(); + + LOG_DEBUG( + &Logger::get("DiskS3"), + "Write to file by path: " << backQuote(metadata_path + path) << " New S3 reference: " << s3_ref); + + return std::make_unique(client, bucket, metadata, s3_ref, buf_size); } else { - auto old_s3_path = getS3Path(path); - ReadBufferFromS3 read_buffer(client, bucket, old_s3_path, buf_size); - auto writeBuffer = std::make_unique(client, bucket, metadata_path + path, old_s3_path, buf_size); - std::vector buffer(buf_size); - while (!read_buffer.eof()) - writeBuffer->write(buffer.data(), read_buffer.read(buffer.data(), buf_size)); - return writeBuffer; + Metadata metadata(metadata_path + path); + + LOG_DEBUG( + &Logger::get("DiskS3"), + "Append to file by path: " << backQuote(metadata_path + path) << " New S3 reference: " << s3_ref + << " Existing S3 references: " << metadata.ref_count); + + return std::make_unique(client, bucket, metadata, s3_ref, buf_size); } } void DiskS3::remove(const String & path) { + LOG_DEBUG(&Logger::get("DiskS3"), "Remove file by path: " << backQuote(metadata_path + path)); + Poco::File file(metadata_path + path); if (file.isFile()) { - Aws::S3::Model::DeleteObjectRequest request; - request.SetBucket(bucket); - request.SetKey(getS3Path(path)); - throwIfError(client->DeleteObject(request)); + Metadata metadata(file); + for (UInt32 i = 0; i < metadata.ref_count; ++i) + { + auto ref = metadata.references[i].first; + + // TODO: Make operation idempotent. Do not throw exception if key is already deleted. 
+ Aws::S3::Model::DeleteObjectRequest request; + request.SetBucket(bucket); + request.SetKey(ref); + throwIfError(client->DeleteObject(request)); + } } file.remove(); } @@ -310,25 +490,14 @@ void DiskS3::removeRecursive(const String & path) Poco::File file(metadata_path + path); if (file.isFile()) { - Aws::S3::Model::DeleteObjectRequest request; - request.SetBucket(bucket); - request.SetKey(getS3Path(path)); - throwIfError(client->DeleteObject(request)); + remove(metadata_path + path); } else { for (auto it{iterateDirectory(path)}; it->isValid(); it->next()) removeRecursive(it->path()); + file.remove(); } - file.remove(); -} - -String DiskS3::getS3Path(const String & path) const -{ - if (!exists(path)) - throw Exception("File not found: " + path, ErrorCodes::FILE_DOESNT_EXIST); - - return readKeyFromFile(metadata_path + path); } String DiskS3::getRandomName() const diff --git a/dbms/src/Disks/DiskS3.h b/dbms/src/Disks/DiskS3.h index 42c782e6ed2..9660d1eb2b1 100644 --- a/dbms/src/Disks/DiskS3.h +++ b/dbms/src/Disks/DiskS3.h @@ -71,8 +71,6 @@ public: void removeRecursive(const String & path) override; private: - String getS3Path(const String & path) const; - String getRandomName() const; bool tryReserve(UInt64 bytes); diff --git a/dbms/src/IO/MMapReadBufferFromFileDescriptor.cpp b/dbms/src/IO/MMapReadBufferFromFileDescriptor.cpp index 034c8524f83..2d1ddba5f58 100644 --- a/dbms/src/IO/MMapReadBufferFromFileDescriptor.cpp +++ b/dbms/src/IO/MMapReadBufferFromFileDescriptor.cpp @@ -105,7 +105,7 @@ off_t MMapReadBufferFromFileDescriptor::getPositionInFile() return count(); } -off_t MMapReadBufferFromFileDescriptor::doSeek(off_t offset, int whence) +off_t MMapReadBufferFromFileDescriptor::seek(off_t offset, int whence) { off_t new_pos; if (whence == SEEK_SET) diff --git a/dbms/src/IO/MMapReadBufferFromFileDescriptor.h b/dbms/src/IO/MMapReadBufferFromFileDescriptor.h index 3cf5e89de7a..fb6d3651b41 100644 --- a/dbms/src/IO/MMapReadBufferFromFileDescriptor.h +++ b/dbms/src/IO/MMapReadBufferFromFileDescriptor.h @@ -13,14 +13,14 @@ namespace DB */ class MMapReadBufferFromFileDescriptor : public ReadBufferFromFileBase { +public: + off_t seek(off_t off, int whence) override; + protected: MMapReadBufferFromFileDescriptor() {} - void init(int fd_, size_t offset, size_t length_); void init(int fd_, size_t offset); - off_t doSeek(off_t off, int whence) override; - public: MMapReadBufferFromFileDescriptor(int fd_, size_t offset_, size_t length_); diff --git a/dbms/src/IO/ReadBufferAIO.cpp b/dbms/src/IO/ReadBufferAIO.cpp index c3dd04b0027..f7d3cd475af 100644 --- a/dbms/src/IO/ReadBufferAIO.cpp +++ b/dbms/src/IO/ReadBufferAIO.cpp @@ -149,7 +149,7 @@ bool ReadBufferAIO::nextImpl() return true; } -off_t ReadBufferAIO::doSeek(off_t off, int whence) +off_t ReadBufferAIO::seek(off_t off, int whence) { off_t new_pos_in_file; diff --git a/dbms/src/IO/ReadBufferAIO.h b/dbms/src/IO/ReadBufferAIO.h index e8d7265f69f..446034e4733 100644 --- a/dbms/src/IO/ReadBufferAIO.h +++ b/dbms/src/IO/ReadBufferAIO.h @@ -40,11 +40,11 @@ public: std::string getFileName() const override { return filename; } int getFD() const override { return fd; } + off_t seek(off_t off, int whence) override; + private: /// bool nextImpl() override; - /// - off_t doSeek(off_t off, int whence) override; /// Synchronously read the data. void synchronousRead(); /// Get data from an asynchronous request. 
diff --git a/dbms/src/IO/ReadBufferFromFileDescriptor.cpp b/dbms/src/IO/ReadBufferFromFileDescriptor.cpp index 776d5fc828d..b127f1e6128 100644 --- a/dbms/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/dbms/src/IO/ReadBufferFromFileDescriptor.cpp @@ -99,7 +99,7 @@ bool ReadBufferFromFileDescriptor::nextImpl() /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. -off_t ReadBufferFromFileDescriptor::doSeek(off_t offset, int whence) +off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence) { off_t new_pos; if (whence == SEEK_SET) diff --git a/dbms/src/IO/ReadBufferFromFileDescriptor.h b/dbms/src/IO/ReadBufferFromFileDescriptor.h index 55065931148..b19a04aef18 100644 --- a/dbms/src/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/src/IO/ReadBufferFromFileDescriptor.h @@ -37,10 +37,10 @@ public: return pos_in_file - (working_buffer.end() - pos); } -private: /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. - off_t doSeek(off_t offset, int whence) override; + off_t seek(off_t off, int whence) override; +private: /// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout. bool poll(size_t timeout_microseconds); }; diff --git a/dbms/src/IO/ReadBufferFromMemory.h b/dbms/src/IO/ReadBufferFromMemory.h index ce8791e4873..72b1b182c00 100644 --- a/dbms/src/IO/ReadBufferFromMemory.h +++ b/dbms/src/IO/ReadBufferFromMemory.h @@ -23,8 +23,7 @@ public: ReadBufferFromMemory(const signed char * buf, size_t size) : SeekableReadBuffer(const_cast(reinterpret_cast(buf)), size, 0) {} -protected: - off_t doSeek(off_t, int) override { return 0; } + off_t seek(off_t, int) override { return 0; } }; } diff --git a/dbms/src/IO/ReadBufferFromS3.cpp b/dbms/src/IO/ReadBufferFromS3.cpp index 9fc7434e8d1..564e076c611 100644 --- a/dbms/src/IO/ReadBufferFromS3.cpp +++ b/dbms/src/IO/ReadBufferFromS3.cpp @@ -49,7 +49,7 @@ bool ReadBufferFromS3::nextImpl() return true; } -off_t ReadBufferFromS3::doSeek(off_t offset_, int) { +off_t ReadBufferFromS3::seek(off_t offset_, int) { if (!initialized && offset_) offset = offset_; @@ -57,11 +57,14 @@ off_t ReadBufferFromS3::doSeek(off_t offset_, int) { } std::unique_ptr ReadBufferFromS3::initialize() { + LOG_DEBUG(log, "Read S3 object. 
" + "Bucket: " + bucket + ", Key: " + key + ", Offset: " + std::to_string(offset)); + Aws::S3::Model::GetObjectRequest req; req.SetBucket(bucket); req.SetKey(key); if (offset != 0) - req.SetRange(std::to_string(offset) + "-"); + req.SetRange("bytes=" + std::to_string(offset) + "-"); Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req); diff --git a/dbms/src/IO/ReadBufferFromS3.h b/dbms/src/IO/ReadBufferFromS3.h index fc81efb3b91..45e338f6af0 100644 --- a/dbms/src/IO/ReadBufferFromS3.h +++ b/dbms/src/IO/ReadBufferFromS3.h @@ -35,9 +35,6 @@ private: Logger * log = &Logger::get("ReadBufferFromS3"); -protected: - off_t doSeek(off_t off, int whence) override; - public: explicit ReadBufferFromS3(std::shared_ptr client_ptr_, const String & bucket_, @@ -46,6 +43,8 @@ public: bool nextImpl() override; + off_t seek(off_t off, int whence) override; + private: std::unique_ptr initialize(); }; diff --git a/dbms/src/IO/SeekableReadBuffer.h b/dbms/src/IO/SeekableReadBuffer.h index 0ecbf2f9c8b..0d3e85ec4e4 100644 --- a/dbms/src/IO/SeekableReadBuffer.h +++ b/dbms/src/IO/SeekableReadBuffer.h @@ -17,13 +17,7 @@ public: SeekableReadBuffer(Position ptr, size_t size, size_t offset) : ReadBuffer(ptr, size, offset) {} - off_t seek(off_t off, int whence = SEEK_SET) { - return doSeek(off, whence); - }; - -protected: - /// Children implementation should be able to seek backwards - virtual off_t doSeek(off_t off, int whence) = 0; + virtual off_t seek(off_t off, int whence) = 0; }; } diff --git a/dbms/src/IO/WriteBufferFromS3.cpp b/dbms/src/IO/WriteBufferFromS3.cpp index ee18ad719d4..074ea4a93d0 100644 --- a/dbms/src/IO/WriteBufferFromS3.cpp +++ b/dbms/src/IO/WriteBufferFromS3.cpp @@ -41,7 +41,7 @@ WriteBufferFromS3::WriteBufferFromS3( , key(key_) , client_ptr(std::move(client_ptr_)) , minimum_upload_part_size {minimum_upload_part_size_} - , temporary_buffer {std::make_unique(buffer_string)} + , temporary_buffer {std::make_unique()} , last_part_size {0} { initiate(); @@ -60,9 +60,9 @@ void WriteBufferFromS3::nextImpl() if (last_part_size > minimum_upload_part_size) { temporary_buffer->finalize(); - writePart(buffer_string); + writePart(temporary_buffer->str()); last_part_size = 0; - temporary_buffer = std::make_unique(buffer_string); + temporary_buffer = std::make_unique(); } } @@ -70,11 +70,9 @@ void WriteBufferFromS3::nextImpl() void WriteBufferFromS3::finalize() { next(); + temporary_buffer->finalize(); - if (!buffer_string.empty()) - { - writePart(buffer_string); - } + writePart(temporary_buffer->str()); complete(); } @@ -104,7 +102,7 @@ void WriteBufferFromS3::initiate() if (outcome.IsSuccess()) { upload_id = outcome.GetResult().GetUploadId(); - LOG_DEBUG(log, "Multipart upload initiated. Upload id = " + upload_id); + LOG_DEBUG(log, "Multipart upload initiated. Upload id: " << upload_id); } else throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); @@ -113,6 +111,9 @@ void WriteBufferFromS3::initiate() void WriteBufferFromS3::writePart(const String & data) { + if (data.empty()) + return; + if (part_tags.size() == S3_WARN_MAX_PARTS) { // Don't throw exception here by ourselves but leave the decision to take by S3 server. @@ -130,11 +131,16 @@ void WriteBufferFromS3::writePart(const String & data) auto outcome = client_ptr->UploadPart(req); + LOG_TRACE(log, "Writing part. 
" + "Bucket: " << bucket << ", Key: " << key << ", Upload_id: " << upload_id << ", Data size: " << data.size()); + if (outcome.IsSuccess()) { auto etag = outcome.GetResult().GetETag(); part_tags.push_back(etag); - LOG_DEBUG(log, "Write part " + std::to_string(part_tags.size()) + " finished. Upload id = " + upload_id + ". Etag = " + etag); + total_size += data.size(); + LOG_DEBUG(log, "Writing part finished. " + "Total parts: " << part_tags.size() << ", Upload_id: " << upload_id << ", Etag: " << etag); } else throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); @@ -143,24 +149,30 @@ void WriteBufferFromS3::writePart(const String & data) void WriteBufferFromS3::complete() { + LOG_DEBUG(log, "Completing multipart upload. " + "Bucket: " + bucket + ", Key: " + key + ", Upload_id: " + upload_id); + Aws::S3::Model::CompleteMultipartUploadRequest req; req.SetBucket(bucket); req.SetKey(key); req.SetUploadId(upload_id); - Aws::S3::Model::CompletedMultipartUpload multipart_upload; - for (size_t i = 0; i < part_tags.size(); ++i) + if (!part_tags.empty()) { - Aws::S3::Model::CompletedPart part; - multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1)); - } + Aws::S3::Model::CompletedMultipartUpload multipart_upload; + for (size_t i = 0; i < part_tags.size(); ++i) + { + Aws::S3::Model::CompletedPart part; + multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1)); + } - req.SetMultipartUpload(multipart_upload); + req.SetMultipartUpload(multipart_upload); + } auto outcome = client_ptr->CompleteMultipartUpload(req); if (outcome.IsSuccess()) - LOG_DEBUG(log, "Multipart upload completed. Upload_id = " + upload_id); + LOG_DEBUG(log, "Multipart upload completed. Upload_id: " << upload_id); else throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); } diff --git a/dbms/src/IO/WriteBufferFromS3.h b/dbms/src/IO/WriteBufferFromS3.h index 85f2e3b651c..017a655a4b4 100644 --- a/dbms/src/IO/WriteBufferFromS3.h +++ b/dbms/src/IO/WriteBufferFromS3.h @@ -28,8 +28,7 @@ private: String key; std::shared_ptr client_ptr; size_t minimum_upload_part_size; - String buffer_string; - std::unique_ptr temporary_buffer; + std::unique_ptr temporary_buffer; size_t last_part_size; /// Upload in S3 is made in parts. @@ -39,6 +38,10 @@ private: Logger * log = &Logger::get("WriteBufferFromS3"); +protected: + // Total size of all uploaded parts. 
+ size_t total_size = 0; + public: explicit WriteBufferFromS3(std::shared_ptr client_ptr_, const String & bucket_, diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index d411f155422..549dd352a48 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -85,7 +85,7 @@ private: compressed(*plain) { if (offset) - plain->seek(offset); + plain->seek(offset, SEEK_SET); } std::unique_ptr plain; @@ -109,7 +109,7 @@ public: explicit LogBlockOutputStream(StorageLog & storage_) : storage(storage_), lock(storage.rwlock), - marks_stream(storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Append)) + marks_stream(storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite)) { } diff --git a/dbms/tests/integration/test_tinylog_s3/__init__.py b/dbms/tests/integration/test_log_family_s3/__init__.py similarity index 100% rename from dbms/tests/integration/test_tinylog_s3/__init__.py rename to dbms/tests/integration/test_log_family_s3/__init__.py diff --git a/dbms/tests/integration/test_tinylog_s3/configs/config.d/log_conf.xml b/dbms/tests/integration/test_log_family_s3/configs/config.d/log_conf.xml similarity index 100% rename from dbms/tests/integration/test_tinylog_s3/configs/config.d/log_conf.xml rename to dbms/tests/integration/test_log_family_s3/configs/config.d/log_conf.xml diff --git a/dbms/tests/integration/test_tinylog_s3/configs/config.xml b/dbms/tests/integration/test_log_family_s3/configs/config.xml similarity index 100% rename from dbms/tests/integration/test_tinylog_s3/configs/config.xml rename to dbms/tests/integration/test_log_family_s3/configs/config.xml diff --git a/dbms/tests/integration/test_tinylog_s3/configs/users.xml b/dbms/tests/integration/test_log_family_s3/configs/users.xml similarity index 100% rename from dbms/tests/integration/test_tinylog_s3/configs/users.xml rename to dbms/tests/integration/test_log_family_s3/configs/users.xml diff --git a/dbms/tests/integration/test_tinylog_s3/test.py b/dbms/tests/integration/test_log_family_s3/test.py similarity index 61% rename from dbms/tests/integration/test_tinylog_s3/test.py rename to dbms/tests/integration/test_log_family_s3/test.py index 9060508c06c..09002a95f2c 100644 --- a/dbms/tests/integration/test_tinylog_s3/test.py +++ b/dbms/tests/integration/test_log_family_s3/test.py @@ -34,16 +34,26 @@ def cluster(): cluster.shutdown() -def test_tinylog_s3(cluster): +@pytest.mark.parametrize("log_engine,files_overhead", [("TinyLog", 1), ("Log", 2)]) +def test_log_family_s3(cluster, log_engine, files_overhead): node = cluster.instances["node"] minio = cluster.minio_client - node.query("CREATE TABLE s3_test (id UInt64) Engine=TinyLog") - node.query("INSERT INTO s3_test SELECT number FROM numbers(3)") - assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n" - assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2 - node.query("INSERT INTO s3_test SELECT number + 3 FROM numbers(3)") - assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n5\n" - assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2 - node.query("DROP TABLE s3_test") + node.query("CREATE TABLE s3_test (id UInt64) Engine={}".format(log_engine)) + + node.query("INSERT INTO s3_test SELECT number FROM numbers(5)") + assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n" + assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 1 + files_overhead + + node.query("INSERT INTO s3_test SELECT number + 5 FROM numbers(3)") + assert 
node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n" + assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 2 + files_overhead + + node.query("INSERT INTO s3_test SELECT number + 8 FROM numbers(1)") + assert node.query("SELECT * FROM s3_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n" + assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 3 + files_overhead + + node.query("TRUNCATE TABLE s3_test") assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0 + + node.query("DROP TABLE s3_test")