diff --git a/dbms/src/Common/CounterInFile.h b/dbms/src/Common/CounterInFile.h index fc723d37367..5ef91b10412 100644 --- a/dbms/src/Common/CounterInFile.h +++ b/dbms/src/Common/CounterInFile.h @@ -99,8 +99,8 @@ public: res += delta; DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); - wb.seek(0); - wb.truncate(); + wb.seek(0, SEEK_SET); + wb.truncate(0); DB::writeIntText(res, wb); DB::writeChar('\n', wb); wb.sync(); @@ -169,8 +169,8 @@ public: if (broken) { DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); - wb.seek(0); - wb.truncate(); + wb.seek(0, SEEK_SET); + wb.truncate(0); DB::writeIntText(value, wb); DB::writeChar('\n', wb); wb.sync(); diff --git a/dbms/src/Disks/DiskLocal.cpp b/dbms/src/Disks/DiskLocal.cpp index 74300cf61b5..9ffe525637d 100644 --- a/dbms/src/Disks/DiskLocal.cpp +++ b/dbms/src/Disks/DiskLocal.cpp @@ -5,6 +5,9 @@ #include #include +#include +#include + namespace DB { @@ -46,8 +49,10 @@ private: class DiskLocalDirectoryIterator : public IDiskDirectoryIterator { public: - explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_) : - dir_path(dir_path_), iter(disk_path_ + dir_path_) {} + explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_) + : dir_path(dir_path_), iter(disk_path_ + dir_path_) + { + } void next() override { ++iter; } @@ -200,15 +205,17 @@ void DiskLocal::copyFile(const String & from_path, const String & to_path) Poco::File(disk_path + from_path).copyTo(disk_path + to_path); } -std::unique_ptr DiskLocal::readFile(const String & path, size_t buf_size) const +std::unique_ptr +DiskLocal::readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const { - return std::make_unique(disk_path + path, buf_size); + return createReadBufferFromFileBase(disk_path + path, estimated_size, aio_threshold, mmap_threshold, buf_size); } -std::unique_ptr DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode) +std::unique_ptr +DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t aio_threshold) { int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; - return std::make_unique(disk_path + path, buf_size, flags); + return createWriteBufferFromFileBase(disk_path + path, estimated_size, aio_threshold, buf_size, flags); } void DiskLocal::remove(const String & path) diff --git a/dbms/src/Disks/DiskLocal.h b/dbms/src/Disks/DiskLocal.h index 8d9d51fc384..abccd3db232 100644 --- a/dbms/src/Disks/DiskLocal.h +++ b/dbms/src/Disks/DiskLocal.h @@ -1,8 +1,8 @@ #pragma once #include -#include #include +#include #include #include @@ -67,9 +67,19 @@ public: void copyFile(const String & from_path, const String & to_path) override; - std::unique_ptr readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override; + std::unique_ptr readFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + size_t estimated_size = 0, + size_t aio_threshold = 0, + size_t mmap_threshold = 0) const override; - std::unique_ptr writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override; + std::unique_ptr writeFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + WriteMode mode = WriteMode::Rewrite, + size_t estimated_size = 0, + size_t aio_threshold = 0) override; void remove(const String & path) override; diff --git a/dbms/src/Disks/DiskMemory.cpp b/dbms/src/Disks/DiskMemory.cpp index ca79cab79af..ba487535464 100644 --- a/dbms/src/Disks/DiskMemory.cpp +++ b/dbms/src/Disks/DiskMemory.cpp @@ -1,8 +1,9 @@ #include "DiskMemory.h" #include "DiskFactory.h" +#include #include -#include +#include #include #include @@ -39,87 +40,100 @@ private: std::vector::iterator iter; }; -ReadIndirectBuffer::ReadIndirectBuffer(String path_, const String & data_) - : ReadBufferFromFileBase(), buf(ReadBufferFromString(data_)), path(std::move(path_)) +/// Adapter with actual behaviour as ReadBufferFromString. +class ReadIndirectBuffer : public ReadBufferFromFileBase { - internal_buffer = buf.buffer(); - working_buffer = internal_buffer; - pos = working_buffer.begin(); -} - -off_t ReadIndirectBuffer::seek(off_t offset, int whence) -{ - if (whence == SEEK_SET) +public: + ReadIndirectBuffer(String path_, const String & data_) + : ReadBufferFromFileBase(), impl(ReadBufferFromString(data_)), path(std::move(path_)) { - if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end()) + internal_buffer = impl.buffer(); + working_buffer = internal_buffer; + pos = working_buffer.begin(); + } + + std::string getFileName() const override { return path; } + + off_t seek(off_t off, int whence) override + { + impl.swap(*this); + off_t result = impl.seek(off, whence); + impl.swap(*this); + return result; + } + + off_t getPosition() override { return pos - working_buffer.begin(); } + +private: + ReadBufferFromString impl; + const String path; +}; + +/// This class is responsible to update files metadata after buffer is finalized. +class WriteIndirectBuffer : public WriteBufferFromFileBase +{ +public: + WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_, size_t buf_size) + : WriteBufferFromFileBase(buf_size, nullptr, 0), impl(), disk(disk_), path(std::move(path_)), mode(mode_) + { + } + + ~WriteIndirectBuffer() override + { + try { - pos = working_buffer.begin() + offset; - return size_t(pos - working_buffer.begin()); + finalize(); } - else - throw Exception( - "Seek position is out of bounds. " - "Offset: " - + std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())), - ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); - } - else if (whence == SEEK_CUR) - { - Position new_pos = pos + offset; - if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end()) + catch (...) { - pos = new_pos; - return size_t(pos - working_buffer.begin()); + tryLogCurrentException(__PRETTY_FUNCTION__); } - else - throw Exception( - "Seek position is out of bounds. " - "Offset: " - + std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())), - ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); } - else - throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); -} -off_t ReadIndirectBuffer::getPosition() -{ - return pos - working_buffer.begin(); -} - -void WriteIndirectBuffer::finalize() -{ - if (isFinished()) - return; - - next(); - WriteBufferFromVector::finalize(); - - auto iter = disk->files.find(path); - - if (iter == disk->files.end()) - throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST); - - // Resize to the actual number of bytes written to string. - value.resize(count()); - - if (mode == WriteMode::Rewrite) - disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value}); - else if (mode == WriteMode::Append) - disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value}); -} - -WriteIndirectBuffer::~WriteIndirectBuffer() -{ - try + void finalize() override { - finalize(); + if (impl.isFinished()) + return; + + next(); + + /// str() finalizes buffer. + String value = impl.str(); + + auto iter = disk->files.find(path); + + if (iter == disk->files.end()) + throw Exception("File '" + path + "' does not exist", ErrorCodes::FILE_DOESNT_EXIST); + + /// Resize to the actual number of bytes written to string. + value.resize(count()); + + if (mode == WriteMode::Rewrite) + disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, value}); + else if (mode == WriteMode::Append) + disk->files.insert_or_assign(path, DiskMemory::FileData{iter->second.type, iter->second.data + value}); } - catch (...) + + std::string getFileName() const override { return path; } + + void sync() override {} + +private: + void nextImpl() override { - tryLogCurrentException(__PRETTY_FUNCTION__); + if (!offset()) + return; + + impl.write(working_buffer.begin(), offset()); } -} + +private: + WriteBufferFromOwnString impl; + DiskMemory * disk; + const String path; + const WriteMode mode; +}; + ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/) { @@ -302,7 +316,7 @@ void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); } -std::unique_ptr DiskMemory::readFile(const String & path, size_t /*buf_size*/) const +std::unique_ptr DiskMemory::readFile(const String & path, size_t /*buf_size*/, size_t, size_t, size_t) const { std::lock_guard lock(mutex); @@ -313,7 +327,7 @@ std::unique_ptr DiskMemory::readFile(const String & path return std::make_unique(path, iter->second.data); } -std::unique_ptr DiskMemory::writeFile(const String & path, size_t /*buf_size*/, WriteMode mode) +std::unique_ptr DiskMemory::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t) { std::lock_guard lock(mutex); @@ -328,7 +342,7 @@ std::unique_ptr DiskMemory::writeFile(const String & path, size_t / files.emplace(path, FileData{FileType::File}); } - return std::make_unique(this, path, mode); + return std::make_unique(this, path, mode, buf_size); } void DiskMemory::remove(const String & path) diff --git a/dbms/src/Disks/DiskMemory.h b/dbms/src/Disks/DiskMemory.h index 2dd93e58549..82a96bba5c8 100644 --- a/dbms/src/Disks/DiskMemory.h +++ b/dbms/src/Disks/DiskMemory.h @@ -5,46 +5,13 @@ #include #include #include -#include -#include -#include namespace DB { class DiskMemory; -class ReadBuffer; -class WriteBuffer; +class ReadBufferFromFileBase; +class WriteBufferFromFileBase; -/// Adapter with actual behaviour as ReadBufferFromString. -class ReadIndirectBuffer : public ReadBufferFromFileBase -{ -public: - ReadIndirectBuffer(String path_, const String & data_); - - std::string getFileName() const override { return path; } - off_t seek(off_t off, int whence) override; - off_t getPosition() override; - -private: - ReadBufferFromString buf; - String path; -}; - -/// This class is responsible to update files metadata after buffer is finalized. -class WriteIndirectBuffer : public WriteBufferFromOwnString -{ -public: - WriteIndirectBuffer(DiskMemory * disk_, String path_, WriteMode mode_) : disk(disk_), path(std::move(path_)), mode(mode_) {} - - ~WriteIndirectBuffer() override; - - void finalize() override; - -private: - DiskMemory * disk; - String path; - WriteMode mode; -}; /** Implementation of Disk intended only for testing purposes. * All filesystem objects are stored in memory and lost on server restart. @@ -93,10 +60,19 @@ public: void copyFile(const String & from_path, const String & to_path) override; - std::unique_ptr readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override; + std::unique_ptr readFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + size_t estimated_size = 0, + size_t aio_threshold = 0, + size_t mmap_threshold = 0) const override; - std::unique_ptr - writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) override; + std::unique_ptr writeFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + WriteMode mode = WriteMode::Rewrite, + size_t estimated_size = 0, + size_t aio_threshold = 0) override; void remove(const String & path) override; diff --git a/dbms/src/Disks/DiskS3.cpp b/dbms/src/Disks/DiskS3.cpp index 7142b6a3643..4884f0fdf4a 100644 --- a/dbms/src/Disks/DiskS3.cpp +++ b/dbms/src/Disks/DiskS3.cpp @@ -52,25 +52,28 @@ namespace */ struct Metadata { - // Metadata file version. + /// Metadata file version. const UInt32 VERSION = 1; using PathAndSize = std::pair; - // Path to metadata file on local FS. + /// Path to metadata file on local FS. String metadata_file_path; - // S3 object references count. + /// S3 object references count. UInt32 s3_objects_count; - // Total size of all S3 objects. + /// Total size of all S3 objects. size_t total_size; - // S3 objects paths and their sizes. + /// S3 objects paths and their sizes. std::vector s3_objects; explicit Metadata(const Poco::File & file) : Metadata(file.path(), false) {} - // Load metadata by path or create empty if `create` flag is set. + /// Load metadata by path or create empty if `create` flag is set. explicit Metadata(const String & file_path, bool create = false) - : metadata_file_path(file_path), s3_objects_count(0), total_size(0), s3_objects(0) + : metadata_file_path(file_path) + , s3_objects_count(0) + , total_size(0) + , s3_objects(0) { if (create) return; @@ -112,10 +115,11 @@ namespace s3_objects.emplace_back(path, size); } - void save() + /// Fsync metadata file if 'sync' flag is set. + void save(bool sync = false) { WriteBufferFromFile buf(metadata_file_path, 1024); - + writeIntText(VERSION, buf); writeChar('\n', buf); @@ -132,11 +136,12 @@ namespace writeChar('\n', buf); } buf.finalize(); + if (sync) + buf.sync(); } }; - // Reads data from S3. - // It supports reading from multiple S3 paths that resides in Metadata. + /// Reads data from S3 using stored paths in metadata. class ReadIndirectBufferFromS3 : public ReadBufferFromFileBase { public: @@ -148,7 +153,6 @@ namespace , metadata(std::move(metadata_)) , buf_size(buf_size_) , absolute_position(0) - , initialized(false) , current_buf_idx(0) , current_buf(nullptr) { @@ -196,19 +200,16 @@ namespace } offset -= size; } - initialized = true; return nullptr; } bool nextImpl() override { - // Find first available buffer that fits to given offset. - if (!initialized) - { + /// Find first available buffer that fits to given offset. + if (!current_buf) current_buf = initialize(); - } - // If current buffer has remaining data - use it. + /// If current buffer has remaining data - use it. if (current_buf && current_buf->next()) { working_buffer = current_buf->buffer(); @@ -216,7 +217,7 @@ namespace return true; } - // If there is no available buffers - nothing to read. + /// If there is no available buffers - nothing to read. if (current_buf_idx + 1 >= metadata.s3_objects_count) return false; @@ -237,13 +238,12 @@ namespace size_t buf_size; size_t absolute_position = 0; - bool initialized; UInt32 current_buf_idx; std::unique_ptr current_buf; }; /// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS. - class WriteIndirectBufferFromS3 : public WriteBufferFromS3 + class WriteIndirectBufferFromS3 : public WriteBufferFromFileBase { public: WriteIndirectBufferFromS3( @@ -253,25 +253,15 @@ namespace const String & s3_path_, size_t min_upload_part_size, size_t buf_size_) - : WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_) + : WriteBufferFromFileBase(buf_size_, nullptr, 0) + , impl(WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_)) , metadata(std::move(metadata_)) , s3_path(s3_path_) { } - void finalize() override - { - WriteBufferFromS3::finalize(); - metadata.addObject(s3_path, total_size); - metadata.save(); - finalized = true; - } - ~WriteIndirectBufferFromS3() override { - if (finalized) - return; - try { finalize(); @@ -282,7 +272,38 @@ namespace } } + void finalize() override + { + if (finalized) + return; + + next(); + impl.finalize(); + + metadata.addObject(s3_path, count()); + metadata.save(); + + finalized = true; + } + + void sync() override { metadata.save(true); } + std::string getFileName() const override { return metadata.metadata_file_path; } + private: + void nextImpl() override + { + /// Transfer current working buffer to WriteBufferFromS3. + impl.swap(*this); + + /// Write actual data to S3. + impl.next(); + + /// Return back working buffer. + impl.swap(*this); + } + + private: + WriteBufferFromS3 impl; bool finalized = false; Metadata metadata; String s3_path; @@ -457,7 +478,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path) to.save(); } -std::unique_ptr DiskS3::readFile(const String & path, size_t buf_size) const +std::unique_ptr DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const { Metadata metadata(metadata_path + path); @@ -468,19 +489,19 @@ std::unique_ptr DiskS3::readFile(const String & path, si return std::make_unique(client, bucket, metadata, buf_size); } -std::unique_ptr DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode) +std::unique_ptr DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t) { bool exist = exists(path); - // Reference to store new S3 object. + /// Path to store new S3 object. auto s3_path = s3_root_path + getRandomName(); if (!exist || mode == WriteMode::Rewrite) { - // If metadata file exists - remove and create new. + /// If metadata file exists - remove and create new. if (exist) remove(path); Metadata metadata(metadata_path + path, true); - // Save empty metadata to disk to have ability to get file size while buffer is not finalized. + /// Save empty metadata to disk to have ability to get file size while buffer is not finalized. metadata.save(); LOG_DEBUG(&Logger::get("DiskS3"), "Write to file by path: " << backQuote(metadata_path + path) << " New S3 path: " << s3_path); @@ -512,7 +533,7 @@ void DiskS3::remove(const String & path) { auto s3_path = metadata.s3_objects[i].first; - // TODO: Make operation idempotent. Do not throw exception if key is already deleted. + /// TODO: Make operation idempotent. Do not throw exception if key is already deleted. Aws::S3::Model::DeleteObjectRequest request; request.SetBucket(bucket); request.SetKey(s3_path); diff --git a/dbms/src/Disks/DiskS3.h b/dbms/src/Disks/DiskS3.h index 759e2f347d9..85746ddd6bb 100644 --- a/dbms/src/Disks/DiskS3.h +++ b/dbms/src/Disks/DiskS3.h @@ -21,8 +21,13 @@ class DiskS3 : public IDisk public: friend class DiskS3Reservation; - DiskS3(String name_, std::shared_ptr client_, String bucket_, String s3_root_path_, - String metadata_path_, size_t min_upload_part_size_); + DiskS3( + String name_, + std::shared_ptr client_, + String bucket_, + String s3_root_path_, + String metadata_path_, + size_t min_upload_part_size_); const String & getName() const override { return name; } @@ -62,9 +67,19 @@ public: void copyFile(const String & from_path, const String & to_path) override; - std::unique_ptr readFile(const String & path, size_t buf_size) const override; + std::unique_ptr readFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + size_t estimated_size = 0, + size_t aio_threshold = 0, + size_t mmap_threshold = 0) const override; - std::unique_ptr writeFile(const String & path, size_t buf_size, WriteMode mode) override; + std::unique_ptr writeFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + WriteMode mode = WriteMode::Rewrite, + size_t estimated_size = 0, + size_t aio_threshold = 0) override; void remove(const String & path) override; diff --git a/dbms/src/Disks/IDisk.h b/dbms/src/Disks/IDisk.h index ff920897ebf..b737c7a53df 100644 --- a/dbms/src/Disks/IDisk.h +++ b/dbms/src/Disks/IDisk.h @@ -26,7 +26,7 @@ class IReservation; using ReservationPtr = std::unique_ptr; class ReadBufferFromFileBase; -class WriteBuffer; +class WriteBufferFromFileBase; /** * Mode of opening a file for write. @@ -121,11 +121,21 @@ public: /// Copy the file from `from_path` to `to_path`. virtual void copyFile(const String & from_path, const String & to_path) = 0; - /// Open the file for read and return SeekableReadBuffer object. - virtual std::unique_ptr readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const = 0; + /// Open the file for read and return ReadBufferFromFileBase object. + virtual std::unique_ptr readFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + size_t estimated_size = 0, + size_t aio_threshold = 0, + size_t mmap_threshold = 0) const = 0; - /// Open the file for write and return WriteBuffer object. - virtual std::unique_ptr writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0; + /// Open the file for write and return WriteBufferFromFileBase object. + virtual std::unique_ptr writeFile( + const String & path, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + WriteMode mode = WriteMode::Rewrite, + size_t estimated_size = 0, + size_t aio_threshold = 0) = 0; /// Remove file or directory. Throws exception if file doesn't exists or if directory is not empty. virtual void remove(const String & path) = 0; diff --git a/dbms/src/IO/WriteBufferAIO.cpp b/dbms/src/IO/WriteBufferAIO.cpp index 532bc6b8078..c542bed16c4 100644 --- a/dbms/src/IO/WriteBufferAIO.cpp +++ b/dbms/src/IO/WriteBufferAIO.cpp @@ -139,7 +139,7 @@ void WriteBufferAIO::nextImpl() is_pending_write = true; } -off_t WriteBufferAIO::doSeek(off_t off, int whence) +off_t WriteBufferAIO::seek(off_t off, int whence) { flush(); @@ -169,7 +169,7 @@ off_t WriteBufferAIO::doSeek(off_t off, int whence) return pos_in_file; } -void WriteBufferAIO::doTruncate(off_t length) +void WriteBufferAIO::truncate(off_t length) { flush(); diff --git a/dbms/src/IO/WriteBufferAIO.h b/dbms/src/IO/WriteBufferAIO.h index d51da73f906..57d035a8ef3 100644 --- a/dbms/src/IO/WriteBufferAIO.h +++ b/dbms/src/IO/WriteBufferAIO.h @@ -34,15 +34,15 @@ public: WriteBufferAIO(const WriteBufferAIO &) = delete; WriteBufferAIO & operator=(const WriteBufferAIO &) = delete; - off_t getPositionInFile() override; + off_t getPositionInFile(); + off_t seek(off_t off, int whence); + void truncate(off_t length); void sync() override; std::string getFileName() const override { return filename; } - int getFD() const override { return fd; } + int getFD() const { return fd; } private: void nextImpl() override; - off_t doSeek(off_t off, int whence) override; - void doTruncate(off_t length) override; /// If there's still data in the buffer, we'll write them. void flush(); diff --git a/dbms/src/IO/WriteBufferFromFileBase.cpp b/dbms/src/IO/WriteBufferFromFileBase.cpp index 4b989d7ac72..2b9cbb88cd8 100644 --- a/dbms/src/IO/WriteBufferFromFileBase.cpp +++ b/dbms/src/IO/WriteBufferFromFileBase.cpp @@ -8,14 +8,4 @@ WriteBufferFromFileBase::WriteBufferFromFileBase(size_t buf_size, char * existin { } -off_t WriteBufferFromFileBase::seek(off_t off, int whence) -{ - return doSeek(off, whence); -} - -void WriteBufferFromFileBase::truncate(off_t length) -{ - return doTruncate(length); -} - } diff --git a/dbms/src/IO/WriteBufferFromFileBase.h b/dbms/src/IO/WriteBufferFromFileBase.h index e846045016c..d35b69a7df7 100644 --- a/dbms/src/IO/WriteBufferFromFileBase.h +++ b/dbms/src/IO/WriteBufferFromFileBase.h @@ -15,16 +15,8 @@ public: WriteBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment); ~WriteBufferFromFileBase() override = default; - off_t seek(off_t off, int whence = SEEK_SET); - void truncate(off_t length = 0); - virtual off_t getPositionInFile() = 0; void sync() override = 0; virtual std::string getFileName() const = 0; - virtual int getFD() const = 0; - -protected: - virtual off_t doSeek(off_t off, int whence) = 0; - virtual void doTruncate(off_t length) = 0; }; } diff --git a/dbms/src/IO/WriteBufferFromFileDescriptor.cpp b/dbms/src/IO/WriteBufferFromFileDescriptor.cpp index 2891c1b7076..fe528bdf810 100644 --- a/dbms/src/IO/WriteBufferFromFileDescriptor.cpp +++ b/dbms/src/IO/WriteBufferFromFileDescriptor.cpp @@ -98,12 +98,6 @@ WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor() } -off_t WriteBufferFromFileDescriptor::getPositionInFile() -{ - return seek(0, SEEK_CUR); -} - - void WriteBufferFromFileDescriptor::sync() { /// If buffer has pending data - write it. @@ -116,7 +110,7 @@ void WriteBufferFromFileDescriptor::sync() } -off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence) +off_t WriteBufferFromFileDescriptor::seek(off_t offset, int whence) { off_t res = lseek(fd, offset, whence); if (-1 == res) @@ -126,7 +120,7 @@ off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence) } -void WriteBufferFromFileDescriptor::doTruncate(off_t length) +void WriteBufferFromFileDescriptor::truncate(off_t length) { int res = ftruncate(fd, length); if (-1 == res) diff --git a/dbms/src/IO/WriteBufferFromFileDescriptor.h b/dbms/src/IO/WriteBufferFromFileDescriptor.h index da608b9739a..d7df04ee940 100644 --- a/dbms/src/IO/WriteBufferFromFileDescriptor.h +++ b/dbms/src/IO/WriteBufferFromFileDescriptor.h @@ -35,19 +35,15 @@ public: ~WriteBufferFromFileDescriptor() override; - int getFD() const override + int getFD() const { return fd; } - off_t getPositionInFile() override; - void sync() override; -private: - off_t doSeek(off_t offset, int whence) override; - - void doTruncate(off_t length) override; + off_t seek(off_t offset, int whence); + void truncate(off_t length); }; } diff --git a/dbms/src/IO/WriteBufferFromS3.cpp b/dbms/src/IO/WriteBufferFromS3.cpp index b3e2568b55e..38456627db2 100644 --- a/dbms/src/IO/WriteBufferFromS3.cpp +++ b/dbms/src/IO/WriteBufferFromS3.cpp @@ -136,7 +136,6 @@ void WriteBufferFromS3::writePart(const String & data) { auto etag = outcome.GetResult().GetETag(); part_tags.push_back(etag); - total_size += data.size(); LOG_DEBUG( log, "Writing part finished. " diff --git a/dbms/src/IO/WriteBufferFromS3.h b/dbms/src/IO/WriteBufferFromS3.h index 8d575a4fd5e..f928941e482 100644 --- a/dbms/src/IO/WriteBufferFromS3.h +++ b/dbms/src/IO/WriteBufferFromS3.h @@ -38,10 +38,6 @@ private: Logger * log = &Logger::get("WriteBufferFromS3"); -protected: - // Total size of all uploaded parts. - size_t total_size = 0; - public: explicit WriteBufferFromS3( std::shared_ptr client_ptr_, diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 84c9a3c71d3..98972f84ba1 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp index 84e625c89cc..17a851309fe 100644 --- a/dbms/src/Storages/StorageStripeLog.cpp +++ b/dbms/src/Storages/StorageStripeLog.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index f3d7750fd10..043ece086e9 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include