mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
Use SeekableReadBuffer instead of ReadBuffer in IBackupEntry.
This commit is contained in:
parent
f14613f433
commit
3966ee1e30
@ -86,7 +86,7 @@ void ArchiveBackup::closeImpl(const Strings &, bool writing_finalized_)
|
|||||||
fs::remove(path);
|
fs::remove(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> ArchiveBackup::readFileImpl(const String & file_name) const
|
std::unique_ptr<SeekableReadBuffer> ArchiveBackup::readFileImpl(const String & file_name) const
|
||||||
{
|
{
|
||||||
/// mutex is already locked
|
/// mutex is already locked
|
||||||
return reader->readFile(file_name);
|
return reader->readFile(file_name);
|
||||||
|
@ -37,7 +37,7 @@ private:
|
|||||||
void openImpl(OpenMode open_mode_) override;
|
void openImpl(OpenMode open_mode_) override;
|
||||||
void closeImpl(const Strings & written_files_, bool writing_finalized_) override;
|
void closeImpl(const Strings & written_files_, bool writing_finalized_) override;
|
||||||
bool supportsWritingInMultipleThreads() const override { return false; }
|
bool supportsWritingInMultipleThreads() const override { return false; }
|
||||||
std::unique_ptr<ReadBuffer> readFileImpl(const String & file_name) const override;
|
std::unique_ptr<SeekableReadBuffer> readFileImpl(const String & file_name) const override;
|
||||||
std::unique_ptr<WriteBuffer> writeFileImpl(const String & file_name) override;
|
std::unique_ptr<WriteBuffer> writeFileImpl(const String & file_name) override;
|
||||||
|
|
||||||
const DiskPtr disk;
|
const DiskPtr disk;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
#include <Backups/BackupEntryFromAppendOnlyFile.h>
|
#include <Backups/BackupEntryFromAppendOnlyFile.h>
|
||||||
#include <IO/LimitReadBuffer.h>
|
#include <IO/LimitSeekableReadBuffer.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -26,10 +26,10 @@ BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer() const
|
std::unique_ptr<SeekableReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer() const
|
||||||
{
|
{
|
||||||
auto buf = BackupEntryFromImmutableFile::getReadBuffer();
|
auto buf = BackupEntryFromImmutableFile::getReadBuffer();
|
||||||
return std::make_unique<LimitReadBuffer>(std::move(buf), limit, false);
|
return std::make_unique<LimitSeekableReadBuffer>(std::move(buf), limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ public:
|
|||||||
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_ = {});
|
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_ = {});
|
||||||
|
|
||||||
UInt64 getSize() const override { return limit; }
|
UInt64 getSize() const override { return limit; }
|
||||||
std::unique_ptr<ReadBuffer> getReadBuffer() const override;
|
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const UInt64 limit;
|
const UInt64 limit;
|
||||||
|
@ -36,7 +36,7 @@ UInt64 BackupEntryFromImmutableFile::getSize() const
|
|||||||
return *file_size;
|
return *file_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
|
std::unique_ptr<SeekableReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
|
||||||
{
|
{
|
||||||
if (disk)
|
if (disk)
|
||||||
return disk->readFile(file_path);
|
return disk->readFile(file_path);
|
||||||
|
@ -33,7 +33,7 @@ public:
|
|||||||
|
|
||||||
UInt64 getSize() const override;
|
UInt64 getSize() const override;
|
||||||
std::optional<UInt128> getChecksum() const override { return checksum; }
|
std::optional<UInt128> getChecksum() const override { return checksum; }
|
||||||
std::unique_ptr<ReadBuffer> getReadBuffer() const override;
|
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
|
||||||
|
|
||||||
String getFilePath() const { return file_path; }
|
String getFilePath() const { return file_path; }
|
||||||
DiskPtr getDisk() const { return disk; }
|
DiskPtr getDisk() const { return disk; }
|
||||||
|
@ -15,7 +15,7 @@ BackupEntryFromMemory::BackupEntryFromMemory(String data_, const std::optional<U
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> BackupEntryFromMemory::getReadBuffer() const
|
std::unique_ptr<SeekableReadBuffer> BackupEntryFromMemory::getReadBuffer() const
|
||||||
{
|
{
|
||||||
return std::make_unique<ReadBufferFromString>(data);
|
return std::make_unique<ReadBufferFromString>(data);
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@ public:
|
|||||||
|
|
||||||
UInt64 getSize() const override { return data.size(); }
|
UInt64 getSize() const override { return data.size(); }
|
||||||
std::optional<UInt128> getChecksum() const override { return checksum; }
|
std::optional<UInt128> getChecksum() const override { return checksum; }
|
||||||
std::unique_ptr<ReadBuffer> getReadBuffer() const override;
|
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const String data;
|
const String data;
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
#include <Common/StringUtils/StringUtils.h>
|
#include <Common/StringUtils/StringUtils.h>
|
||||||
#include <Common/hex.h>
|
#include <Common/hex.h>
|
||||||
#include <Common/quoteString.h>
|
#include <Common/quoteString.h>
|
||||||
#include <IO/ConcatReadBuffer.h>
|
#include <IO/ConcatSeekableReadBuffer.h>
|
||||||
#include <IO/HashingReadBuffer.h>
|
#include <IO/HashingReadBuffer.h>
|
||||||
#include <IO/ReadBufferFromFileBase.h>
|
#include <IO/ReadBufferFromFileBase.h>
|
||||||
#include <IO/ReadHelpers.h>
|
#include <IO/ReadHelpers.h>
|
||||||
@ -63,13 +63,14 @@ public:
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> getReadBuffer() const override
|
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override
|
||||||
{
|
{
|
||||||
auto read_buffer = backup->readFileImpl(data_file_name);
|
auto read_buffer = backup->readFileImpl(data_file_name);
|
||||||
if (base_backup_entry)
|
if (base_backup_entry)
|
||||||
{
|
{
|
||||||
auto base_backup_read_buffer = base_backup_entry->getReadBuffer();
|
size_t base_size = base_backup_entry->getSize();
|
||||||
read_buffer = std::make_unique<ConcatReadBuffer>(std::move(base_backup_read_buffer), std::move(read_buffer));
|
read_buffer = std::make_unique<ConcatSeekableReadBuffer>(
|
||||||
|
base_backup_entry->getReadBuffer(), base_size, std::move(read_buffer), size - base_size);
|
||||||
}
|
}
|
||||||
return read_buffer;
|
return read_buffer;
|
||||||
}
|
}
|
||||||
@ -522,7 +523,7 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
|
|||||||
base_checksum = base_backup->getFileChecksum(file_name);
|
base_checksum = base_backup->getFileChecksum(file_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> read_buffer; /// We'll set that later.
|
std::unique_ptr<SeekableReadBuffer> read_buffer; /// We'll set that later.
|
||||||
std::optional<HashingReadBuffer> hashing_read_buffer;
|
std::optional<HashingReadBuffer> hashing_read_buffer;
|
||||||
UInt64 hashing_pos = 0; /// Current position in `hashing_read_buffer`.
|
UInt64 hashing_pos = 0; /// Current position in `hashing_read_buffer`.
|
||||||
|
|
||||||
@ -608,16 +609,9 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
|
|||||||
auto copy_pos = use_base ? base_size : 0;
|
auto copy_pos = use_base ? base_size : 0;
|
||||||
|
|
||||||
/// Move the current read position to the start position to copy data.
|
/// Move the current read position to the start position to copy data.
|
||||||
/// If `read_buffer` is seekable it's easier, otherwise we can use ignore().
|
if (!read_buffer)
|
||||||
if (auto * seekable_buffer = dynamic_cast<SeekableReadBuffer *>(read_buffer.get()))
|
|
||||||
{
|
|
||||||
seekable_buffer->seek(copy_pos, SEEK_SET);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
read_buffer = entry->getReadBuffer();
|
read_buffer = entry->getReadBuffer();
|
||||||
read_buffer->ignore(copy_pos);
|
read_buffer->seek(copy_pos, SEEK_SET);
|
||||||
}
|
|
||||||
|
|
||||||
/// Copy the entry's data after `copy_pos`.
|
/// Copy the entry's data after `copy_pos`.
|
||||||
auto out = writeFileImpl(getHexUIntLowercase(*checksum));
|
auto out = writeFileImpl(getHexUIntLowercase(*checksum));
|
||||||
|
@ -11,6 +11,7 @@ namespace DB
|
|||||||
{
|
{
|
||||||
class Context;
|
class Context;
|
||||||
using ContextPtr = std::shared_ptr<const Context>;
|
using ContextPtr = std::shared_ptr<const Context>;
|
||||||
|
class SeekableReadBuffer;
|
||||||
|
|
||||||
/// Base implementation of IBackup.
|
/// Base implementation of IBackup.
|
||||||
/// Along with passed files it also stores backup metadata - a single file named ".backup" in XML format
|
/// Along with passed files it also stores backup metadata - a single file named ".backup" in XML format
|
||||||
@ -53,7 +54,7 @@ protected:
|
|||||||
|
|
||||||
/// Read a file from the backup.
|
/// Read a file from the backup.
|
||||||
/// Low level: the function doesn't check base backup or checksums.
|
/// Low level: the function doesn't check base backup or checksums.
|
||||||
virtual std::unique_ptr<ReadBuffer> readFileImpl(const String & file_name) const = 0;
|
virtual std::unique_ptr<SeekableReadBuffer> readFileImpl(const String & file_name) const = 0;
|
||||||
|
|
||||||
/// Add a file to the backup.
|
/// Add a file to the backup.
|
||||||
/// Low level: the function doesn't check base backup or checksums.
|
/// Low level: the function doesn't check base backup or checksums.
|
||||||
|
@ -56,7 +56,7 @@ void DirectoryBackup::closeImpl(const Strings & written_files_, bool writing_fin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> DirectoryBackup::readFileImpl(const String & file_name) const
|
std::unique_ptr<SeekableReadBuffer> DirectoryBackup::readFileImpl(const String & file_name) const
|
||||||
{
|
{
|
||||||
auto file_path = path / file_name;
|
auto file_path = path / file_name;
|
||||||
return disk->readFile(file_path);
|
return disk->readFile(file_path);
|
||||||
|
@ -27,7 +27,7 @@ private:
|
|||||||
bool backupExists() const override;
|
bool backupExists() const override;
|
||||||
void openImpl(OpenMode open_mode_) override;
|
void openImpl(OpenMode open_mode_) override;
|
||||||
void closeImpl(const Strings & written_files_, bool writing_finalized_) override;
|
void closeImpl(const Strings & written_files_, bool writing_finalized_) override;
|
||||||
std::unique_ptr<ReadBuffer> readFileImpl(const String & file_name) const override;
|
std::unique_ptr<SeekableReadBuffer> readFileImpl(const String & file_name) const override;
|
||||||
std::unique_ptr<WriteBuffer> writeFileImpl(const String & file_name) override;
|
std::unique_ptr<WriteBuffer> writeFileImpl(const String & file_name) override;
|
||||||
|
|
||||||
DiskPtr disk;
|
DiskPtr disk;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
#include <Backups/IBackupEntriesBatch.h>
|
#include <Backups/IBackupEntriesBatch.h>
|
||||||
#include <IO/ReadBuffer.h>
|
#include <IO/SeekableReadBuffer.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -15,7 +15,7 @@ public:
|
|||||||
|
|
||||||
UInt64 getSize() const override { return batch->getSize(index); }
|
UInt64 getSize() const override { return batch->getSize(index); }
|
||||||
std::optional<UInt128> getChecksum() const override { return batch->getChecksum(index); }
|
std::optional<UInt128> getChecksum() const override { return batch->getChecksum(index); }
|
||||||
std::unique_ptr<ReadBuffer> getReadBuffer() const override { return batch->getReadBuffer(index); }
|
std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return batch->getReadBuffer(index); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const std::shared_ptr<IBackupEntriesBatch> batch;
|
const std::shared_ptr<IBackupEntriesBatch> batch;
|
||||||
|
@ -17,7 +17,7 @@ public:
|
|||||||
protected:
|
protected:
|
||||||
IBackupEntriesBatch(const Strings & entry_names_) : entry_names(entry_names_) {}
|
IBackupEntriesBatch(const Strings & entry_names_) : entry_names(entry_names_) {}
|
||||||
|
|
||||||
virtual std::unique_ptr<ReadBuffer> getReadBuffer(size_t index) = 0;
|
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer(size_t index) = 0;
|
||||||
virtual UInt64 getSize(size_t index) = 0;
|
virtual UInt64 getSize(size_t index) = 0;
|
||||||
virtual std::optional<UInt128> getChecksum(size_t) { return {}; }
|
virtual std::optional<UInt128> getChecksum(size_t) { return {}; }
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
class ReadBuffer;
|
class SeekableReadBuffer;
|
||||||
|
|
||||||
/// A backup entry represents some data which should be written to the backup or has been read from the backup.
|
/// A backup entry represents some data which should be written to the backup or has been read from the backup.
|
||||||
class IBackupEntry
|
class IBackupEntry
|
||||||
@ -23,7 +23,7 @@ public:
|
|||||||
virtual std::optional<UInt128> getChecksum() const { return {}; }
|
virtual std::optional<UInt128> getChecksum() const { return {}; }
|
||||||
|
|
||||||
/// Returns a read buffer for reading the data.
|
/// Returns a read buffer for reading the data.
|
||||||
virtual std::unique_ptr<ReadBuffer> getReadBuffer() const = 0;
|
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer() const = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
|
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
|
||||||
|
139
src/IO/ConcatSeekableReadBuffer.cpp
Normal file
139
src/IO/ConcatSeekableReadBuffer.cpp
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
#include <IO/ConcatSeekableReadBuffer.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Releases the nested buffer, but only when this entry was marked as its owner.
ConcatSeekableReadBuffer::BufferInfo::~BufferInfo()
{
    if (!own_in)
        return;

    delete in;
}
|
||||||
|
|
||||||
|
/// Builds a concatenation of two buffers and takes ownership of both.
/// `size1` and `size2` are the lengths attributed to `buf1` and `buf2` inside the concatenation.
ConcatSeekableReadBuffer::ConcatSeekableReadBuffer(
    std::unique_ptr<SeekableReadBuffer> buf1,
    size_t size1,
    std::unique_ptr<SeekableReadBuffer> buf2,
    size_t size2)
    : ConcatSeekableReadBuffer()
{
    appendBuffer(buf1.release(), /* own = */ true, size1);
    appendBuffer(buf2.release(), /* own = */ true, size2);
}
|
||||||
|
|
||||||
|
/// Builds a concatenation of two buffers without taking ownership of them.
/// `size1` and `size2` are the lengths attributed to `buf1` and `buf2` inside the concatenation.
ConcatSeekableReadBuffer::ConcatSeekableReadBuffer(
    SeekableReadBuffer & buf1,
    size_t size1,
    SeekableReadBuffer & buf2,
    size_t size2)
    : ConcatSeekableReadBuffer()
{
    appendBuffer(&buf1, /* own = */ false, size1);
    appendBuffer(&buf2, /* own = */ false, size2);
}
|
||||||
|
|
||||||
|
void ConcatSeekableReadBuffer::appendBuffer(std::unique_ptr<SeekableReadBuffer> buffer, size_t size)
|
||||||
|
{
|
||||||
|
appendBuffer(buffer.release(), true, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Appends `buffer` to the end of the concatenation; the buffer stays owned by the caller.
void ConcatSeekableReadBuffer::appendBuffer(SeekableReadBuffer & buffer, size_t size)
{
    SeekableReadBuffer * borrowed_buffer = &buffer;
    appendBuffer(borrowed_buffer, /* own = */ false, size);
}
|
||||||
|
|
||||||
|
/// Registers `buffer` as the next part of the concatenation.
///  - `own`  : whether this object becomes responsible for deleting `buffer`;
///  - `size` : number of bytes this part contributes to the total size.
/// A part with zero size is not stored at all (an owned buffer is deleted right away).
void ConcatSeekableReadBuffer::appendBuffer(SeekableReadBuffer * buffer, bool own, size_t size)
{
    /// Holds an owned buffer until it is safely stored in `buffers`, so it is deleted
    /// both when the part is skipped and when growing the vector throws.
    std::unique_ptr<SeekableReadBuffer> pending_owner(own ? buffer : nullptr);

    if (!size)
        return;

    /// The entry is filled in place instead of being moved from a local BufferInfo:
    /// a local's destructor would run after the move and could delete the buffer just stored.
    /// NOTE(review): growing the vector still relocates existing entries through
    /// BufferInfo's move constructor - confirm that it transfers ownership.
    buffers.emplace_back();
    BufferInfo & info = buffers.back();
    info.in = buffer;
    info.own_in = own;
    info.size = size;
    static_cast<void>(pending_owner.release());

    total_size += size;

    /// If the new part is the one we are currently positioned at, expose its data.
    if (current == buffers.size() - 1)
    {
        working_buffer = buffers[current].in->buffer();
        pos = buffers[current].in->position();
    }
}
|
||||||
|
|
||||||
|
bool ConcatSeekableReadBuffer::nextImpl()
|
||||||
|
{
|
||||||
|
if (current < buffers.size())
|
||||||
|
{
|
||||||
|
buffers[current].in->position() = pos;
|
||||||
|
while ((current < buffers.size()) && buffers[current].in->eof())
|
||||||
|
{
|
||||||
|
current_start_pos += buffers[current++].size;
|
||||||
|
if (current < buffers.size())
|
||||||
|
buffers[current].in->seek(0, SEEK_SET);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (current >= buffers.size())
|
||||||
|
{
|
||||||
|
current_start_pos = total_size;
|
||||||
|
set(nullptr, 0);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
working_buffer = buffers[current].in->buffer();
|
||||||
|
pos = buffers[current].in->position();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
off_t ConcatSeekableReadBuffer::getPosition()
|
||||||
|
{
|
||||||
|
size_t current_pos = current_start_pos;
|
||||||
|
if (current < buffers.size())
|
||||||
|
current_pos += buffers[current].in->getPosition() + offset();
|
||||||
|
return current_pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
off_t ConcatSeekableReadBuffer::seek(off_t off, int whence)
|
||||||
|
{
|
||||||
|
off_t new_position;
|
||||||
|
off_t current_position = getPosition();
|
||||||
|
if (whence == SEEK_SET)
|
||||||
|
new_position = off;
|
||||||
|
else if (whence == SEEK_CUR)
|
||||||
|
new_position = current_position + off;
|
||||||
|
else
|
||||||
|
throw Exception("ConcatSeekableReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||||
|
|
||||||
|
if (new_position < 0)
|
||||||
|
throw Exception("SEEK_SET underflow: off = " + std::to_string(off), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||||
|
if (static_cast<UInt64>(new_position) > total_size)
|
||||||
|
throw Exception("SEEK_CUR shift out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||||
|
|
||||||
|
if (static_cast<UInt64>(new_position) == total_size)
|
||||||
|
{
|
||||||
|
current = buffers.size();
|
||||||
|
current_start_pos = total_size;
|
||||||
|
set(nullptr, 0);
|
||||||
|
return new_position;
|
||||||
|
}
|
||||||
|
|
||||||
|
off_t change_position = new_position - current_position;
|
||||||
|
if ((working_buffer.begin() <= pos + change_position) && (pos + change_position <= working_buffer.end()))
|
||||||
|
{
|
||||||
|
/// Position is still inside the same working buffer.
|
||||||
|
pos += change_position;
|
||||||
|
assert(pos >= working_buffer.begin());
|
||||||
|
assert(pos <= working_buffer.end());
|
||||||
|
return new_position;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (new_position < static_cast<off_t>(current_start_pos))
|
||||||
|
current_start_pos -= buffers[--current].size;
|
||||||
|
|
||||||
|
while (new_position >= static_cast<off_t>(current_start_pos + buffers[current].size))
|
||||||
|
current_start_pos += buffers[current++].size;
|
||||||
|
|
||||||
|
buffers[current].in->seek(new_position - current_start_pos, SEEK_SET);
|
||||||
|
working_buffer = buffers[current].in->buffer();
|
||||||
|
pos = buffers[current].in->position();
|
||||||
|
return new_position;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
46
src/IO/ConcatSeekableReadBuffer.h
Normal file
46
src/IO/ConcatSeekableReadBuffer.h
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <IO/SeekableReadBuffer.h>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/// Reads from the concatenation of multiple SeekableReadBuffer's
|
||||||
|
class ConcatSeekableReadBuffer : public SeekableReadBufferWithSize
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ConcatSeekableReadBuffer() : SeekableReadBufferWithSize(nullptr, 0) { }
|
||||||
|
ConcatSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> buf1, size_t size1, std::unique_ptr<SeekableReadBuffer> buf2, size_t size2);
|
||||||
|
ConcatSeekableReadBuffer(SeekableReadBuffer & buf1, size_t size1, SeekableReadBuffer & buf2, size_t size2);
|
||||||
|
|
||||||
|
void appendBuffer(std::unique_ptr<SeekableReadBuffer> buffer, size_t size);
|
||||||
|
void appendBuffer(SeekableReadBuffer & buffer, size_t size);
|
||||||
|
|
||||||
|
off_t seek(off_t off, int whence) override;
|
||||||
|
off_t getPosition() override;
|
||||||
|
|
||||||
|
std::optional<size_t> getTotalSize() override { return total_size; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool nextImpl() override;
|
||||||
|
void appendBuffer(SeekableReadBuffer * buffer, bool own, size_t size);
|
||||||
|
|
||||||
|
struct BufferInfo
|
||||||
|
{
|
||||||
|
BufferInfo() = default;
|
||||||
|
BufferInfo(BufferInfo &&) = default;
|
||||||
|
~BufferInfo();
|
||||||
|
SeekableReadBuffer * in = nullptr;
|
||||||
|
bool own_in = false;
|
||||||
|
size_t size = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<BufferInfo> buffers;
|
||||||
|
size_t total_size = 0;
|
||||||
|
size_t current = 0;
|
||||||
|
size_t current_start_pos = 0; /// Position of the current buffer's begin.
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
131
src/IO/LimitSeekableReadBuffer.cpp
Normal file
131
src/IO/LimitSeekableReadBuffer.cpp
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
#include <IO/LimitSeekableReadBuffer.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||||
|
extern const int LIMIT_EXCEEDED;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LimitSeekableReadBuffer::nextImpl()
|
||||||
|
{
|
||||||
|
if (end_position >= static_cast<off_t>(limit))
|
||||||
|
{
|
||||||
|
/// Limit reached.
|
||||||
|
set(in->position(), 0);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(position() >= in->position());
|
||||||
|
in->position() = position();
|
||||||
|
|
||||||
|
if (!in->next())
|
||||||
|
{
|
||||||
|
/// EOF reached.
|
||||||
|
set(in->position(), 0);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
working_buffer = in->buffer();
|
||||||
|
pos = in->position();
|
||||||
|
end_position = in->getPosition() + in->available();
|
||||||
|
|
||||||
|
if (end_position > static_cast<off_t>(limit))
|
||||||
|
{
|
||||||
|
working_buffer.resize(working_buffer.size() - end_position + limit);
|
||||||
|
end_position = limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
off_t LimitSeekableReadBuffer::seek(off_t off, int whence)
|
||||||
|
{
|
||||||
|
off_t new_position;
|
||||||
|
off_t current_position = getPosition();
|
||||||
|
if (whence == SEEK_SET)
|
||||||
|
new_position = off;
|
||||||
|
else if (whence == SEEK_CUR)
|
||||||
|
new_position = current_position + off;
|
||||||
|
else
|
||||||
|
throw Exception("LimitSeekableReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||||
|
|
||||||
|
if (new_position < 0)
|
||||||
|
throw Exception("SEEK_SET underflow: off = " + std::to_string(off), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||||
|
if (static_cast<UInt64>(new_position) > limit)
|
||||||
|
throw Exception("SEEK_CUR shift out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||||
|
|
||||||
|
off_t change_position = new_position - current_position;
|
||||||
|
if ((working_buffer.begin() <= pos + change_position) && (pos + change_position <= working_buffer.end()))
|
||||||
|
{
|
||||||
|
/// Position is still inside buffer.
|
||||||
|
pos += change_position;
|
||||||
|
assert(pos >= working_buffer.begin());
|
||||||
|
assert(pos <= working_buffer.end());
|
||||||
|
return new_position;
|
||||||
|
}
|
||||||
|
|
||||||
|
in->seek(new_position, SEEK_SET);
|
||||||
|
working_buffer = in->buffer();
|
||||||
|
pos = in->position();
|
||||||
|
end_position = in->getPosition() + in->available();
|
||||||
|
|
||||||
|
if (end_position > static_cast<off_t>(limit))
|
||||||
|
{
|
||||||
|
working_buffer.resize(working_buffer.size() - end_position + limit);
|
||||||
|
end_position = limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new_position;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Common constructor used by the public ones.
///  - `in_`    : nested buffer to read from, must not be null;
///  - `owns`   : whether this object becomes responsible for deleting `in_`;
///  - `limit_` : offset in the nested buffer which reading must not go beyond.
/// Throws LIMIT_EXCEEDED if the nested buffer is already positioned beyond the limit.
LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer * in_, bool owns, UInt64 limit_)
    : SeekableReadBuffer(in_ ? in_->position() : nullptr, 0)
    , in(in_)
    , owns_in(owns)
    , limit(limit_)
{
    assert(in);

    /// If this constructor throws, the destructor is not called, so an owned nested buffer
    /// has to be deleted here; otherwise a buffer passed as std::unique_ptr would leak.
    std::unique_ptr<SeekableReadBuffer> delete_on_failure(owns_in ? in : nullptr);

    off_t current_position = in->getPosition();
    if (current_position > static_cast<off_t>(limit))
        throw Exception("Limit for LimitSeekableReadBuffer exceeded", ErrorCodes::LIMIT_EXCEEDED);

    /// Start with whatever the nested buffer has already loaded.
    working_buffer = in->buffer();
    pos = in->position();
    end_position = current_position + in->available();

    if (end_position > static_cast<off_t>(limit))
    {
        /// Hide the part of the chunk which lies beyond the limit.
        working_buffer.resize(working_buffer.size() - end_position + limit);
        end_position = limit;
    }

    /// Construction succeeded: from now on the destructor is in charge of `in`.
    static_cast<void>(delete_on_failure.release());
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Wraps `in_` without taking ownership of it.
LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 limit_)
    : LimitSeekableReadBuffer(&in_, /* owns = */ false, limit_)
{
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Wraps `in_` and takes ownership of it.
LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 limit_)
    : LimitSeekableReadBuffer(in_.release(), /* owns = */ true, limit_)
{
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Passes the final read position back to the nested buffer and deletes it if it is owned.
LimitSeekableReadBuffer::~LimitSeekableReadBuffer()
{
    /// Update underlying buffer's position in case when limit wasn't reached.
    in->position() = position();

    if (!owns_in)
        return;

    delete in;
}
|
||||||
|
|
||||||
|
}
|
33
src/IO/LimitSeekableReadBuffer.h
Normal file
33
src/IO/LimitSeekableReadBuffer.h
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <base/types.h>
|
||||||
|
#include <IO/SeekableReadBuffer.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/** Allows to read from another SeekableReadBuffer no far than the specified offset.
|
||||||
|
* Note that the nested SeekableReadBuffer may read slightly more data internally to fill its buffer.
|
||||||
|
*/
|
||||||
|
class LimitSeekableReadBuffer : public SeekableReadBuffer
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 limit_);
|
||||||
|
LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 limit_);
|
||||||
|
~LimitSeekableReadBuffer() override;
|
||||||
|
|
||||||
|
off_t seek(off_t off, int whence) override;
|
||||||
|
off_t getPosition() override { return end_position - available(); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
SeekableReadBuffer * in;
|
||||||
|
bool owns_in;
|
||||||
|
UInt64 limit;
|
||||||
|
off_t end_position; /// Offset of the end of working_buffer.
|
||||||
|
|
||||||
|
LimitSeekableReadBuffer(SeekableReadBuffer * in_, bool owns, UInt64 limit_);
|
||||||
|
bool nextImpl() override;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -448,7 +448,7 @@ private:
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> getReadBuffer(size_t index) override
|
std::unique_ptr<SeekableReadBuffer> getReadBuffer(size_t index) override
|
||||||
{
|
{
|
||||||
initialize();
|
initialize();
|
||||||
return createReadBufferFromFileBase(file_paths[index], {});
|
return createReadBufferFromFileBase(file_paths[index], {});
|
||||||
|
Loading…
Reference in New Issue
Block a user