diff --git a/src/Backups/ArchiveBackup.cpp b/src/Backups/ArchiveBackup.cpp index 0c4b0c3cd40..69194622e5a 100644 --- a/src/Backups/ArchiveBackup.cpp +++ b/src/Backups/ArchiveBackup.cpp @@ -86,7 +86,7 @@ void ArchiveBackup::closeImpl(const Strings &, bool writing_finalized_) fs::remove(path); } -std::unique_ptr ArchiveBackup::readFileImpl(const String & file_name) const +std::unique_ptr ArchiveBackup::readFileImpl(const String & file_name) const { /// mutex is already locked return reader->readFile(file_name); diff --git a/src/Backups/ArchiveBackup.h b/src/Backups/ArchiveBackup.h index d947fa16beb..4e935efbddc 100644 --- a/src/Backups/ArchiveBackup.h +++ b/src/Backups/ArchiveBackup.h @@ -37,7 +37,7 @@ private: void openImpl(OpenMode open_mode_) override; void closeImpl(const Strings & written_files_, bool writing_finalized_) override; bool supportsWritingInMultipleThreads() const override { return false; } - std::unique_ptr readFileImpl(const String & file_name) const override; + std::unique_ptr readFileImpl(const String & file_name) const override; std::unique_ptr writeFileImpl(const String & file_name) override; const DiskPtr disk; diff --git a/src/Backups/BackupEntryFromAppendOnlyFile.cpp b/src/Backups/BackupEntryFromAppendOnlyFile.cpp index d7f9d5624c8..fa816091bdf 100644 --- a/src/Backups/BackupEntryFromAppendOnlyFile.cpp +++ b/src/Backups/BackupEntryFromAppendOnlyFile.cpp @@ -1,5 +1,5 @@ #include -#include +#include namespace DB @@ -26,10 +26,10 @@ BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile( { } -std::unique_ptr BackupEntryFromAppendOnlyFile::getReadBuffer() const +std::unique_ptr BackupEntryFromAppendOnlyFile::getReadBuffer() const { auto buf = BackupEntryFromImmutableFile::getReadBuffer(); - return std::make_unique(std::move(buf), limit, false); + return std::make_unique(std::move(buf), limit); } } diff --git a/src/Backups/BackupEntryFromAppendOnlyFile.h b/src/Backups/BackupEntryFromAppendOnlyFile.h index c1de6930483..d868f82d45f 100644 --- a/src/Backups/BackupEntryFromAppendOnlyFile.h +++ b/src/Backups/BackupEntryFromAppendOnlyFile.h @@ -26,7 +26,7 @@ public: const std::shared_ptr & temporary_file_ = {}); UInt64 getSize() const override { return limit; } - std::unique_ptr getReadBuffer() const override; + std::unique_ptr getReadBuffer() const override; private: const UInt64 limit; diff --git a/src/Backups/BackupEntryFromImmutableFile.cpp b/src/Backups/BackupEntryFromImmutableFile.cpp index 4323682950d..088324f364a 100644 --- a/src/Backups/BackupEntryFromImmutableFile.cpp +++ b/src/Backups/BackupEntryFromImmutableFile.cpp @@ -36,7 +36,7 @@ UInt64 BackupEntryFromImmutableFile::getSize() const return *file_size; } -std::unique_ptr BackupEntryFromImmutableFile::getReadBuffer() const +std::unique_ptr BackupEntryFromImmutableFile::getReadBuffer() const { if (disk) return disk->readFile(file_path); diff --git a/src/Backups/BackupEntryFromImmutableFile.h b/src/Backups/BackupEntryFromImmutableFile.h index bc1d08aa180..4d5f47b4f61 100644 --- a/src/Backups/BackupEntryFromImmutableFile.h +++ b/src/Backups/BackupEntryFromImmutableFile.h @@ -33,7 +33,7 @@ public: UInt64 getSize() const override; std::optional getChecksum() const override { return checksum; } - std::unique_ptr getReadBuffer() const override; + std::unique_ptr getReadBuffer() const override; String getFilePath() const { return file_path; } DiskPtr getDisk() const { return disk; } diff --git a/src/Backups/BackupEntryFromMemory.cpp b/src/Backups/BackupEntryFromMemory.cpp index 96493e7962e..f59eadc2d7f 100644 --- a/src/Backups/BackupEntryFromMemory.cpp +++ b/src/Backups/BackupEntryFromMemory.cpp @@ -15,7 +15,7 @@ BackupEntryFromMemory::BackupEntryFromMemory(String data_, const std::optional BackupEntryFromMemory::getReadBuffer() const +std::unique_ptr BackupEntryFromMemory::getReadBuffer() const { return std::make_unique(data); } diff --git a/src/Backups/BackupEntryFromMemory.h b/src/Backups/BackupEntryFromMemory.h index d497ff1b439..2226112c9c3 100644 --- a/src/Backups/BackupEntryFromMemory.h +++ b/src/Backups/BackupEntryFromMemory.h @@ -17,7 +17,7 @@ public: UInt64 getSize() const override { return data.size(); } std::optional getChecksum() const override { return checksum; } - std::unique_ptr getReadBuffer() const override; + std::unique_ptr getReadBuffer() const override; private: const String data; diff --git a/src/Backups/BackupImpl.cpp b/src/Backups/BackupImpl.cpp index f1bb8830a68..7b8c6418367 100644 --- a/src/Backups/BackupImpl.cpp +++ b/src/Backups/BackupImpl.cpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include #include @@ -63,13 +63,14 @@ public: { } - std::unique_ptr getReadBuffer() const override + std::unique_ptr getReadBuffer() const override { auto read_buffer = backup->readFileImpl(data_file_name); if (base_backup_entry) { - auto base_backup_read_buffer = base_backup_entry->getReadBuffer(); - read_buffer = std::make_unique(std::move(base_backup_read_buffer), std::move(read_buffer)); + size_t base_size = base_backup_entry->getSize(); + read_buffer = std::make_unique( + base_backup_entry->getReadBuffer(), base_size, std::move(read_buffer), size - base_size); } return read_buffer; } @@ -522,7 +523,7 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry) base_checksum = base_backup->getFileChecksum(file_name); } - std::unique_ptr read_buffer; /// We'll set that later. + std::unique_ptr read_buffer; /// We'll set that later. std::optional hashing_read_buffer; UInt64 hashing_pos = 0; /// Current position in `hashing_read_buffer`. @@ -608,16 +609,9 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry) auto copy_pos = use_base ? base_size : 0; /// Move the current read position to the start position to copy data. - /// If `read_buffer` is seekable it's easier, otherwise we can use ignore(). - if (auto * seekable_buffer = dynamic_cast(read_buffer.get())) - { - seekable_buffer->seek(copy_pos, SEEK_SET); - } - else - { + if (!read_buffer) read_buffer = entry->getReadBuffer(); - read_buffer->ignore(copy_pos); - } + read_buffer->seek(copy_pos, SEEK_SET); /// Copy the entry's data after `copy_pos`. auto out = writeFileImpl(getHexUIntLowercase(*checksum)); diff --git a/src/Backups/BackupImpl.h b/src/Backups/BackupImpl.h index 3fba3762b35..38fd7e7e7d3 100644 --- a/src/Backups/BackupImpl.h +++ b/src/Backups/BackupImpl.h @@ -11,6 +11,7 @@ namespace DB { class Context; using ContextPtr = std::shared_ptr; +class SeekableReadBuffer; /// Base implementation of IBackup. /// Along with passed files it also stores backup metadata - a single file named ".backup" in XML format @@ -53,7 +54,7 @@ protected: /// Read a file from the backup. /// Low level: the function doesn't check base backup or checksums. - virtual std::unique_ptr readFileImpl(const String & file_name) const = 0; + virtual std::unique_ptr readFileImpl(const String & file_name) const = 0; /// Add a file to the backup. /// Low level: the function doesn't check base backup or checksums. diff --git a/src/Backups/DirectoryBackup.cpp b/src/Backups/DirectoryBackup.cpp index 0deb41c200d..6a60cbdd1ef 100644 --- a/src/Backups/DirectoryBackup.cpp +++ b/src/Backups/DirectoryBackup.cpp @@ -56,7 +56,7 @@ void DirectoryBackup::closeImpl(const Strings & written_files_, bool writing_fin } } -std::unique_ptr DirectoryBackup::readFileImpl(const String & file_name) const +std::unique_ptr DirectoryBackup::readFileImpl(const String & file_name) const { auto file_path = path / file_name; return disk->readFile(file_path); diff --git a/src/Backups/DirectoryBackup.h b/src/Backups/DirectoryBackup.h index 499a1893dca..d9dbc81fa78 100644 --- a/src/Backups/DirectoryBackup.h +++ b/src/Backups/DirectoryBackup.h @@ -27,7 +27,7 @@ private: bool backupExists() const override; void openImpl(OpenMode open_mode_) override; void closeImpl(const Strings & written_files_, bool writing_finalized_) override; - std::unique_ptr readFileImpl(const String & file_name) const override; + std::unique_ptr readFileImpl(const String & file_name) const override; std::unique_ptr writeFileImpl(const String & file_name) override; DiskPtr disk; diff --git a/src/Backups/IBackupEntriesBatch.cpp b/src/Backups/IBackupEntriesBatch.cpp index bf6bc6cce83..34a91668023 100644 --- a/src/Backups/IBackupEntriesBatch.cpp +++ b/src/Backups/IBackupEntriesBatch.cpp @@ -1,5 +1,5 @@ #include -#include +#include namespace DB @@ -15,7 +15,7 @@ public: UInt64 getSize() const override { return batch->getSize(index); } std::optional getChecksum() const override { return batch->getChecksum(index); } - std::unique_ptr getReadBuffer() const override { return batch->getReadBuffer(index); } + std::unique_ptr getReadBuffer() const override { return batch->getReadBuffer(index); } private: const std::shared_ptr batch; diff --git a/src/Backups/IBackupEntriesBatch.h b/src/Backups/IBackupEntriesBatch.h index 0d8c8d5aa26..7fceb793c00 100644 --- a/src/Backups/IBackupEntriesBatch.h +++ b/src/Backups/IBackupEntriesBatch.h @@ -17,7 +17,7 @@ public: protected: IBackupEntriesBatch(const Strings & entry_names_) : entry_names(entry_names_) {} - virtual std::unique_ptr getReadBuffer(size_t index) = 0; + virtual std::unique_ptr getReadBuffer(size_t index) = 0; virtual UInt64 getSize(size_t index) = 0; virtual std::optional getChecksum(size_t) { return {}; } diff --git a/src/Backups/IBackupEntry.h b/src/Backups/IBackupEntry.h index 719e03ae6f5..55f03f1a710 100644 --- a/src/Backups/IBackupEntry.h +++ b/src/Backups/IBackupEntry.h @@ -7,7 +7,7 @@ namespace DB { -class ReadBuffer; +class SeekableReadBuffer; /// A backup entry represents some data which should be written to the backup or has been read from the backup. class IBackupEntry @@ -23,7 +23,7 @@ public: virtual std::optional getChecksum() const { return {}; } /// Returns a read buffer for reading the data. - virtual std::unique_ptr getReadBuffer() const = 0; + virtual std::unique_ptr getReadBuffer() const = 0; }; using BackupEntryPtr = std::unique_ptr; diff --git a/src/IO/ConcatSeekableReadBuffer.cpp b/src/IO/ConcatSeekableReadBuffer.cpp new file mode 100644 index 00000000000..c5d48376e2f --- /dev/null +++ b/src/IO/ConcatSeekableReadBuffer.cpp @@ -0,0 +1,139 @@ +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ARGUMENT_OUT_OF_BOUND; +} + +ConcatSeekableReadBuffer::BufferInfo::~BufferInfo() +{ + if (own_in) + delete in; +} + +ConcatSeekableReadBuffer::ConcatSeekableReadBuffer(std::unique_ptr buf1, size_t size1, std::unique_ptr buf2, size_t size2) : ConcatSeekableReadBuffer() +{ + appendBuffer(std::move(buf1), size1); + appendBuffer(std::move(buf2), size2); +} + +ConcatSeekableReadBuffer::ConcatSeekableReadBuffer(SeekableReadBuffer & buf1, size_t size1, SeekableReadBuffer & buf2, size_t size2) : ConcatSeekableReadBuffer() +{ + appendBuffer(buf1, size1); + appendBuffer(buf2, size2); +} + +void ConcatSeekableReadBuffer::appendBuffer(std::unique_ptr buffer, size_t size) +{ + appendBuffer(buffer.release(), true, size); +} + +void ConcatSeekableReadBuffer::appendBuffer(SeekableReadBuffer & buffer, size_t size) +{ + appendBuffer(&buffer, false, size); +} + +void ConcatSeekableReadBuffer::appendBuffer(SeekableReadBuffer * buffer, bool own, size_t size) +{ + BufferInfo info; + info.in = buffer; + info.own_in = own; + info.size = size; + + if (!size) + return; + + buffers.emplace_back(std::move(info)); + total_size += size; + + if (current == buffers.size() - 1) + { + working_buffer = buffers[current].in->buffer(); + pos = buffers[current].in->position(); + } +} + +bool ConcatSeekableReadBuffer::nextImpl() +{ + if (current < buffers.size()) + { + buffers[current].in->position() = pos; + while ((current < buffers.size()) && buffers[current].in->eof()) + { + current_start_pos += buffers[current++].size; + if (current < buffers.size()) + buffers[current].in->seek(0, SEEK_SET); + } + } + + if (current >= buffers.size()) + { + current_start_pos = total_size; + set(nullptr, 0); + return false; + } + + working_buffer = buffers[current].in->buffer(); + pos = buffers[current].in->position(); + return true; +} + +off_t ConcatSeekableReadBuffer::getPosition() +{ + size_t current_pos = current_start_pos; + if (current < buffers.size()) + current_pos += buffers[current].in->getPosition() + offset(); + return current_pos; +} + +off_t ConcatSeekableReadBuffer::seek(off_t off, int whence) +{ + off_t new_position; + off_t current_position = getPosition(); + if (whence == SEEK_SET) + new_position = off; + else if (whence == SEEK_CUR) + new_position = current_position + off; + else + throw Exception("ConcatSeekableReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + + if (new_position < 0) + throw Exception("SEEK_SET underflow: off = " + std::to_string(off), ErrorCodes::ARGUMENT_OUT_OF_BOUND); + if (static_cast(new_position) > total_size) + throw Exception("SEEK_CUR shift out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + + if (static_cast(new_position) == total_size) + { + current = buffers.size(); + current_start_pos = total_size; + set(nullptr, 0); + return new_position; + } + + off_t change_position = new_position - current_position; + if ((working_buffer.begin() <= pos + change_position) && (pos + change_position <= working_buffer.end())) + { + /// Position is still inside the same working buffer. + pos += change_position; + assert(pos >= working_buffer.begin()); + assert(pos <= working_buffer.end()); + return new_position; + } + + while (new_position < static_cast(current_start_pos)) + current_start_pos -= buffers[--current].size; + + while (new_position >= static_cast(current_start_pos + buffers[current].size)) + current_start_pos += buffers[current++].size; + + buffers[current].in->seek(new_position - current_start_pos, SEEK_SET); + working_buffer = buffers[current].in->buffer(); + pos = buffers[current].in->position(); + return new_position; +} + +} diff --git a/src/IO/ConcatSeekableReadBuffer.h b/src/IO/ConcatSeekableReadBuffer.h new file mode 100644 index 00000000000..26314a218ea --- /dev/null +++ b/src/IO/ConcatSeekableReadBuffer.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/// Reads from the concatenation of multiple SeekableReadBuffer's +class ConcatSeekableReadBuffer : public SeekableReadBufferWithSize +{ +public: + ConcatSeekableReadBuffer() : SeekableReadBufferWithSize(nullptr, 0) { } + ConcatSeekableReadBuffer(std::unique_ptr buf1, size_t size1, std::unique_ptr buf2, size_t size2); + ConcatSeekableReadBuffer(SeekableReadBuffer & buf1, size_t size1, SeekableReadBuffer & buf2, size_t size2); + + void appendBuffer(std::unique_ptr buffer, size_t size); + void appendBuffer(SeekableReadBuffer & buffer, size_t size); + + off_t seek(off_t off, int whence) override; + off_t getPosition() override; + + std::optional getTotalSize() override { return total_size; } + +private: + bool nextImpl() override; + void appendBuffer(SeekableReadBuffer * buffer, bool own, size_t size); + + struct BufferInfo + { + BufferInfo() = default; + BufferInfo(BufferInfo &&) = default; + ~BufferInfo(); + SeekableReadBuffer * in = nullptr; + bool own_in = false; + size_t size = 0; + }; + + std::vector buffers; + size_t total_size = 0; + size_t current = 0; + size_t current_start_pos = 0; /// Position of the current buffer's begin. +}; + +} diff --git a/src/IO/LimitSeekableReadBuffer.cpp b/src/IO/LimitSeekableReadBuffer.cpp new file mode 100644 index 00000000000..fc3300e71ca --- /dev/null +++ b/src/IO/LimitSeekableReadBuffer.cpp @@ -0,0 +1,131 @@ +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ARGUMENT_OUT_OF_BOUND; + extern const int LIMIT_EXCEEDED; +} + +bool LimitSeekableReadBuffer::nextImpl() +{ + if (end_position >= static_cast(limit)) + { + /// Limit reached. + set(in->position(), 0); + return false; + } + + assert(position() >= in->position()); + in->position() = position(); + + if (!in->next()) + { + /// EOF reached. + set(in->position(), 0); + return false; + } + + working_buffer = in->buffer(); + pos = in->position(); + end_position = in->getPosition() + in->available(); + + if (end_position > static_cast(limit)) + { + working_buffer.resize(working_buffer.size() - end_position + limit); + end_position = limit; + } + + return true; +} + + +off_t LimitSeekableReadBuffer::seek(off_t off, int whence) +{ + off_t new_position; + off_t current_position = getPosition(); + if (whence == SEEK_SET) + new_position = off; + else if (whence == SEEK_CUR) + new_position = current_position + off; + else + throw Exception("LimitSeekableReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + + if (new_position < 0) + throw Exception("SEEK_SET underflow: off = " + std::to_string(off), ErrorCodes::ARGUMENT_OUT_OF_BOUND); + if (static_cast(new_position) > limit) + throw Exception("SEEK_CUR shift out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + + off_t change_position = new_position - current_position; + if ((working_buffer.begin() <= pos + change_position) && (pos + change_position <= working_buffer.end())) + { + /// Position is still inside buffer. + pos += change_position; + assert(pos >= working_buffer.begin()); + assert(pos <= working_buffer.end()); + return new_position; + } + + in->seek(new_position, SEEK_SET); + working_buffer = in->buffer(); + pos = in->position(); + end_position = in->getPosition() + in->available(); + + if (end_position > static_cast(limit)) + { + working_buffer.resize(working_buffer.size() - end_position + limit); + end_position = limit; + } + + return new_position; +} + + +LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer * in_, bool owns, UInt64 limit_) + : SeekableReadBuffer(in_ ? in_->position() : nullptr, 0) + , in(in_) + , owns_in(owns) + , limit(limit_) +{ + assert(in); + + off_t current_position = in->getPosition(); + if (current_position > static_cast(limit)) + throw Exception("Limit for LimitSeekableReadBuffer exceeded", ErrorCodes::LIMIT_EXCEEDED); + + working_buffer = in->buffer(); + pos = in->position(); + end_position = current_position + in->available(); + + if (end_position > static_cast(limit)) + { + working_buffer.resize(working_buffer.size() - end_position + limit); + end_position = limit; + } +} + + +LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 limit_) + : LimitSeekableReadBuffer(&in_, false, limit_) +{ +} + + +LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr in_, UInt64 limit_) + : LimitSeekableReadBuffer(in_.release(), true, limit_) +{ +} + + +LimitSeekableReadBuffer::~LimitSeekableReadBuffer() +{ + /// Update underlying buffer's position in case when limit wasn't reached. + in->position() = position(); + if (owns_in) + delete in; +} + +} diff --git a/src/IO/LimitSeekableReadBuffer.h b/src/IO/LimitSeekableReadBuffer.h new file mode 100644 index 00000000000..c6399f574c1 --- /dev/null +++ b/src/IO/LimitSeekableReadBuffer.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/** Allows to read from another SeekableReadBuffer no far than the specified offset. + * Note that the nested SeekableReadBuffer may read slightly more data internally to fill its buffer. + */ +class LimitSeekableReadBuffer : public SeekableReadBuffer +{ +public: + LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 limit_); + LimitSeekableReadBuffer(std::unique_ptr in_, UInt64 limit_); + ~LimitSeekableReadBuffer() override; + + off_t seek(off_t off, int whence) override; + off_t getPosition() override { return end_position - available(); } + +private: + SeekableReadBuffer * in; + bool owns_in; + UInt64 limit; + off_t end_position; /// Offset of the end of working_buffer. + + LimitSeekableReadBuffer(SeekableReadBuffer * in_, bool owns, UInt64 limit_); + bool nextImpl() override; +}; + +} diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 3e2fe996fe8..26b435f98a0 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -448,7 +448,7 @@ private: }); } - std::unique_ptr getReadBuffer(size_t index) override + std::unique_ptr getReadBuffer(size_t index) override { initialize(); return createReadBufferFromFileBase(file_paths[index], {});