From 4d0a6bad5e7ca3d0c3f617fac9a7d240ff22bdc2 Mon Sep 17 00:00:00 2001
From: Alexander Tokmakov
Date: Sun, 12 May 2019 06:15:08 +0300
Subject: [PATCH] PeekableReadBuffer

---
 dbms/src/IO/PeekableReadBuffer.cpp | 299 +++++++++++++++++++++++++++++
 dbms/src/IO/PeekableReadBuffer.h   | 129 +++++--------
 2 files changed, 344 insertions(+), 84 deletions(-)
 create mode 100644 dbms/src/IO/PeekableReadBuffer.cpp

diff --git a/dbms/src/IO/PeekableReadBuffer.cpp b/dbms/src/IO/PeekableReadBuffer.cpp
new file mode 100644
index 00000000000..03fc8462ee5
--- /dev/null
+++ b/dbms/src/IO/PeekableReadBuffer.cpp
@@ -0,0 +1,299 @@
+#include <IO/PeekableReadBuffer.h>
+
+namespace DB
+{
+
+/// Starts by reading directly from the sub-buffer: the working buffer is set to the sub-buffer's buffer
+/// at its current offset. Own memory is filled later by peekNext(); its size is limited by unread_limit_.
+PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t unread_limit_ /* = default_limit*/)
+    : sub_buf(sub_buf_), unread_limit(unread_limit_)
+{
+    /// Read from sub-buffer
+    Buffer & sub_working = sub_buf.buffer();
+    BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
+
+    checkStateCorrect();
+}
+
+/// Saves unread data to own memory, so it will be possible to read it later. Loads next data to sub-buffer.
+/// Neither the checkpoint nor the position in the stream changes, but afterwards pos and checkpoint
+/// may point into own memory. @returns the result of sub_buf.next().
+bool PeekableReadBuffer::peekNext()
+{
+    checkStateCorrect();
+
+    size_t bytes_to_copy = sub_buf.available();
+    if (useSubbufferOnly())
+    {
+        /// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer)
+        Position copy_from = pos;
+        if (checkpoint)
+            copy_from = checkpoint;
+        bytes += copy_from - sub_buf.buffer().begin();
+        sub_buf.position() = copy_from;
+        bytes_to_copy = sub_buf.available();
+        if (!bytes_to_copy)
+        {
+            /// Both checkpoint and pos are at the end of sub-buffer. Just load next part of data.
+            bool res = sub_buf.next();
+            BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
+            if (checkpoint)
+                checkpoint = pos;
+
+            checkStateCorrect();
+            return res;
+        }
+    }
+
+    resizeOwnMemoryIfNecessary(bytes_to_copy);
+
+    /// Save unread data from sub-buffer to own memory
+    memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_copy);
+
+    /// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary
+    /// Otherwise, checkpoint now at the beginning of own memory
+    if (checkpoint && useSubbufferOnly())
+    {
+        checkpoint = memory.data();
+        checkpoint_in_own_memory = true;
+    }
+    if (currentlyReadFromOwnMemory())
+    {
+        /// Update buffer size
+        BufferBase::set(memory.data(), peeked_size + bytes_to_copy, offset());
+    }
+    else
+    {
+        /// Switch to reading from own memory
+        size_t pos_offset = peeked_size + this->offset();
+        /// If own memory was empty, copying started at sub_buf.position() (the old checkpoint, or pos itself),
+        /// so pos keeps its distance from that point instead of jumping to the end of the copied data.
+        if (useSubbufferOnly())
+            pos_offset = pos - sub_buf.position();
+        BufferBase::set(memory.data(), peeked_size + bytes_to_copy, pos_offset);
+    }
+
+    peeked_size += bytes_to_copy;
+    sub_buf.position() += bytes_to_copy;
+
+    checkStateCorrect();
+    return sub_buf.next();
+}
+
+/// Remembers the current position as the checkpoint; own memory is emptied if pos is in the sub-buffer.
+void PeekableReadBuffer::setCheckpoint()
+{
+    checkpoint_in_own_memory = currentlyReadFromOwnMemory();
+    if (!checkpoint_in_own_memory)
+    {
+        /// Don't need to store unread data anymore
+        peeked_size = 0;
+    }
+    checkpoint = pos;
+}
+
+/// Forgets the checkpoint; own memory is emptied unless pos is still inside it.
+void PeekableReadBuffer::dropCheckpoint()
+{
+    if (!currentlyReadFromOwnMemory())
+    {
+        /// Don't need to store unread data anymore
+        peeked_size = 0;
+    }
+    checkpoint = nullptr;
+    checkpoint_in_own_memory = false;
+}
+
+/// Moves the position back to the checkpoint, switching to own memory if the checkpoint is there and pos is not.
+/// Throws LOGICAL_ERROR if there is no checkpoint.
+void PeekableReadBuffer::rollbackToCheckpoint()
+{
+    if (!checkpoint)
+        throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
+    else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory())
+        pos = checkpoint;
+    else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory
+        BufferBase::set(memory.data(), peeked_size, checkpoint - memory.data());
+}
+
+/// Without a checkpoint: empties own memory if it was in use, otherwise loads the next part into the sub-buffer.
+/// With a checkpoint: if pos is in the sub-buffer, peekNext() first copies the sub-buffer's data to own memory.
+/// In both cases reading continues from the beginning of the sub-buffer's buffer.
+/// NOTE(review): on the paths that call neither sub_buf.next() nor peekNext(), res stays true - confirm EOF is still detected.
+bool PeekableReadBuffer::nextImpl()
+{
+    /// FIXME wrong bytes count because it can read the same data again after rollbackToCheckpoint()
+    /// However, changing bytes count on every call of next() (even after rollback) allows to determine if some pointers were invalidated.
+    checkStateCorrect();
+    bool res = true;
+
+    if (!checkpoint)
+    {
+        if (!useSubbufferOnly())
+        {
+            /// All copied data have been read from own memory, continue reading from sub_buf
+            peeked_size = 0;
+        }
+        else
+        {
+            /// Load next data to sub_buf
+            sub_buf.position() = pos;
+            res = sub_buf.next();
+        }
+
+        Buffer & sub_working = sub_buf.buffer();
+        /// Switch to reading from sub_buf (or just update it if already switched)
+        BufferBase::set(sub_working.begin(), sub_working.size(), 0);
+    }
+    else
+    {
+        if (!currentlyReadFromOwnMemory())
+            res = peekNext();
+        Buffer & sub_working = sub_buf.buffer();
+        BufferBase::set(sub_working.begin(), sub_working.size(), 0);
+    }
+
+    checkStateCorrect();
+    return res;
+}
+
+/// True if own memory holds no data (peeked_size is zero).
+bool PeekableReadBuffer::useSubbufferOnly() const
+{
+    return !peeked_size;
+}
+
+/// Checks that checkpoint, pos and peeked_size are consistent with each other; throws LOGICAL_ERROR otherwise.
+void PeekableReadBuffer::checkStateCorrect() const
+{
+    if (checkpoint)
+    {
+        if (checkpointInOwnMemory())
+        {
+            if (!peeked_size)
+                throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR);
+            if (currentlyReadFromOwnMemory() && pos < checkpoint)
+                throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR);
+        }
+        else
+        {
+            if (peeked_size)
+                throw DB::Exception("Own buffer is not empty", ErrorCodes::LOGICAL_ERROR);
+            if (currentlyReadFromOwnMemory())
+                throw DB::Exception("Current position in own buffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
+            if (pos < checkpoint)
+                throw DB::Exception("Current position in subbuffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
+        }
+    }
+    else
+    {
+
+        if (!currentlyReadFromOwnMemory() && peeked_size)
+            throw DB::Exception("Own buffer is not empty", ErrorCodes::LOGICAL_ERROR);
+    }
+    if (currentlyReadFromOwnMemory() && !peeked_size)
+        throw DB::Exception("Pos in empty own buffer", ErrorCodes::LOGICAL_ERROR);
+}
+
+/// Makes room for bytes_to_append more bytes after the peeked_size bytes already in own memory: either moves
+/// the data starting at the checkpoint (or at pos) to the beginning of own memory, or resizes own memory.
+/// Throws MEMORY_LIMIT_EXCEEDED if resizing would exceed unread_limit. Updates checkpoint and pos if they are in own memory.
+/// @returns 0 if data was moved, otherwise the offset of the checkpoint (or pos) in own memory.
+size_t PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
+{
+    checkStateCorrect();
+    bool needUpdateCheckpoint = checkpointInOwnMemory();
+    bool needUpdatePos = currentlyReadFromOwnMemory();
+    size_t offset = 0;
+    if (needUpdateCheckpoint)
+        offset = checkpoint - memory.data();
+    else if (needUpdatePos)
+        offset = this->offset();
+
+    size_t new_size = peeked_size + bytes_to_append;
+    if (memory.size() < new_size)
+    {
+        if (bytes_to_append < offset && 2 * (peeked_size - offset) <= memory.size())
+        {
+            /// Move unread data to the beginning of own memory instead of resize own memory
+            peeked_size -= offset;
+            memmove(memory.data(), memory.data() + offset, peeked_size);
+            bytes += offset;
+
+            if (needUpdateCheckpoint)
+                checkpoint -= offset;
+            if (needUpdatePos)
+                pos -= offset;
+
+            checkStateCorrect();
+            return 0;
+        }
+        else
+        {
+            if (unread_limit < new_size)
+                throw DB::Exception("trying to peek too much data", ErrorCodes::MEMORY_LIMIT_EXCEEDED);
+
+            /// pos is inside own memory only if needUpdatePos; take its offset before own memory is resized
+            size_t pos_offset = needUpdatePos ? pos - memory.data() : 0;
+
+            // TODO amortization
+            memory.resize(new_size);
+
+            if (needUpdateCheckpoint)
+                checkpoint = memory.data() + offset;
+            if (needUpdatePos)
+            {
+                BufferBase::set(memory.data(), peeked_size, pos_offset);
+            }
+        }
+    }
+
+    checkStateCorrect();
+    return offset;
+}
+
+/// If reading from the sub-buffer, passes the current position back to it. Data remaining in own memory is not
+/// handed back: call takeUnreadData() (or check with assertCanBeDistructed()) before destruction.
+PeekableReadBuffer::~PeekableReadBuffer()
+{
+    if (!currentlyReadFromOwnMemory())
+        sub_buf.position() = pos;
+}
+
+/// If reading from own memory, copies its unread tail into a new buffer, forgets own memory and the checkpoint,
+/// and switches back to the sub-buffer. Otherwise returns an empty buffer.
+BufferWithOwnMemory<ReadBuffer> PeekableReadBuffer::takeUnreadData()
+{
+    if (!currentlyReadFromOwnMemory())
+        return BufferWithOwnMemory<ReadBuffer>(0);
+    size_t unread_size = memory.data() + peeked_size - pos;
+    BufferWithOwnMemory<ReadBuffer> unread(unread_size);
+    memcpy(unread.buffer().begin(), pos, unread_size);
+    peeked_size = 0;
+    checkpoint = nullptr;
+    checkpoint_in_own_memory = false;
+    BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
+    return unread;
+}
+
+/// True if the working buffer is not the sub-buffer's buffer (their begin pointers differ).
+bool PeekableReadBuffer::currentlyReadFromOwnMemory() const
+{
+    return working_buffer.begin() != sub_buf.buffer().begin();
+}
+
+/// True if the checkpoint points into own memory.
+bool PeekableReadBuffer::checkpointInOwnMemory() const
+{
+    return checkpoint_in_own_memory;
+}
+
+/// Throws LOGICAL_ERROR if own memory is not empty and pos is not exactly at the end of the data in it.
+void PeekableReadBuffer::assertCanBeDistructed() const
+{
+    if (peeked_size && pos != memory.data() + peeked_size)
+        throw DB::Exception("There are data, which were extracted from sub-buffer, but not from peekable buffer: "
+                            "Cannot destruct peekable buffer correctly because tha data will be lost", ErrorCodes::LOGICAL_ERROR);
+}
+
+}
diff --git a/dbms/src/IO/PeekableReadBuffer.h b/dbms/src/IO/PeekableReadBuffer.h
index c1c923db5fc..a7efc3134bb 100644
--- a/dbms/src/IO/PeekableReadBuffer.h
+++ b/dbms/src/IO/PeekableReadBuffer.h
@@ -7,110 +7,71 @@ namespace DB
 
 namespace ErrorCodes
 {
-extern const int MEMORY_LIMIT_EXCEEDED;
+    extern const int MEMORY_LIMIT_EXCEEDED;
+    extern const int LOGICAL_ERROR;
 }
 
-/// Allows to peek next part of data from sub-buffer without extracting it
+/// Allows to peek next part of data from sub-buffer without extracting it.
+/// Also allows to set checkpoint at some position in stream and come back to this position later,
+/// even if next() was called.
+/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer.
+/// If position() of peekable buffer is explicitly set to some position before checkpoint
+/// (e.g. by istr.position() = prev_pos), behavior is undefined.
 class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
 {
 public:
     constexpr static size_t default_limit = 32 * DBMS_DEFAULT_BUFFER_SIZE;
 
-    explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t unread_limit_ = default_limit)
-        : sub_buf(sub_buf_), unread_limit(unread_limit_), peeked_size(0)
-    {
-        /// Read from sub-buffer
-        Buffer & sub_working = sub_buf.buffer();
-        BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
-    }
+    explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t unread_limit_ = default_limit);
 
-    bool peekNext()
-    {
-        if (!readFromOwnMemory())
-        {
-            bytes += pos - sub_buf.buffer().begin();
-            sub_buf.position() = pos;
-        }
-        size_t available = sub_buf.available();
-        if (!available)
-        {
-            bool res = sub_buf.next();
-            if (!readFromOwnMemory())
-                BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
-            return res;
-        }
+    /// Use takeUnreadData() to extract unread data before destroying the object
+    ~PeekableReadBuffer() override;
 
-        size_t offset = resizeOwnMemoryIfNecessary(available);
+    /// Saves unread data to own memory, so it will be possible to read it later. Loads next data to sub-buffer.
+    /// Doesn't change checkpoint and position in stream,
+    /// but all pointers (such as this->buffer().end() and this->position()) may be invalidated
+    /// @returns false in case of EOF in sub-buffer, otherwise returns true
+    bool peekNext();
 
-        /// Save unread data from sub-buffer to own memory
-        memcpy(memory.data() + peeked_size, sub_buf.position(), available);
-        peeked_size += available;
-        /// Switch to reading from own memory (or just update size if already switched)
-        BufferBase::set(memory.data(), peeked_size, offset);
+    Buffer & lastPeeked() { return sub_buf.buffer(); }
 
-        sub_buf.position() += available;
-        return sub_buf.next();
-    }
+    /// Sets checkpoint at current position
+    void setCheckpoint();
 
-    Buffer & lastPeeked()
-    {
-        return sub_buf.buffer();
-    }
+    /// Forget checkpoint and all data between checkpoint and position
+    void dropCheckpoint();
+
+    /// Sets position at checkpoint.
+    /// All pointers (such as this->buffer().end()) may be invalidated
+    void rollbackToCheckpoint();
+
+    /// If position is in own memory, returns buffer with data, which were extracted from sub-buffer,
+    /// but not from this buffer, so the data will not be lost after destruction of this buffer.
+    /// If position is in sub-buffer, returns empty buffer.
+    BufferWithOwnMemory<ReadBuffer> takeUnreadData();
+    void assertCanBeDistructed() const;
 
 private:
-    bool nextImpl() override
-    {
-        bool res = true;
-        if (readFromOwnMemory())
-        {
-            /// All copied data have been read from own memory, continue reading from sub_buf
-            peeked_size = 0;
-        }
-        else
-        {
-            /// Load next data to sub_buf
-            sub_buf.position() = pos;
-            res = sub_buf.next();
-        }
-        Buffer & sub_working = sub_buf.buffer();
-        /// Switch to reading from sub_buf (or just update it if already switched)
-        BufferBase::set(sub_working.begin(), sub_working.size(), 0);
-        return res;
-    }
+    bool nextImpl() override;
 
-    inline bool readFromOwnMemory() const
-    {
-        return peeked_size;
-    }
+    bool useSubbufferOnly() const;
+    bool currentlyReadFromOwnMemory() const;
+    bool checkpointInOwnMemory() const;
+
+    void checkStateCorrect() const;
+
+    /// Makes it possible to append `bytes_to_append` bytes to data in own memory.
+    /// Updates all invalidated pointers and sizes.
+    /// @returns new offset of unread data in own memory
+    size_t resizeOwnMemoryIfNecessary(size_t bytes_to_append);
 
-    size_t resizeOwnMemoryIfNecessary(size_t bytes_to_append)
-    {
-        size_t offset = readFromOwnMemory() ? this->offset() : 0;
-        size_t new_size = peeked_size + bytes_to_append;
-        if (memory.size() < new_size)
-        {
-            if (bytes_to_append < offset && 2 * (peeked_size - offset) <= memory.size())
-            {
-                /// Move unread data to the beginning of own memory instead of resize own memory
-                peeked_size -= offset;
-                memmove(memory.data(), memory.data() + offset, peeked_size);
-                bytes += offset;
-                return 0;
-            }
-            else
-            {
-                if (unread_limit < new_size)
-                    throw DB::Exception("trying to peek too much data", ErrorCodes::MEMORY_LIMIT_EXCEEDED);
-                memory.resize(new_size);
-            }
-        }
-        return offset;
-    }
 
 
     ReadBuffer & sub_buf;
     const size_t unread_limit;
-    size_t peeked_size;
+    size_t peeked_size = 0;
+    Position checkpoint = nullptr;
+    bool checkpoint_in_own_memory = false;
 };
 
 }