2019-04-12 00:45:18 +00:00
|
|
|
#pragma once
|
|
|
|
#include <IO/ReadBuffer.h>
|
|
|
|
#include <IO/BufferWithOwnMemory.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2019-10-01 10:44:28 +00:00
|
|
|
/// Error codes used by the inline checks in this header.
/// Only declared here; the definition lives in another translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2019-09-30 18:21:58 +00:00
|
|
|
/// Also allows to set checkpoint at some position in stream and come back to this position later.
|
|
|
|
/// When next() is called, saves data between checkpoint and current position to own memory and loads next data to sub-buffer
|
2020-06-11 00:51:27 +00:00
|
|
|
/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer (unless
|
|
|
|
/// you reset() the state of peekable buffer after each change of underlying buffer)
|
2019-05-12 03:15:08 +00:00
|
|
|
/// If position() of peekable buffer is explicitly set to some position before checkpoint
|
|
|
|
/// (e.g. by istr.position() = prev_pos), behavior is undefined.
|
2019-04-12 00:45:18 +00:00
|
|
|
class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
|
|
|
|
{
|
2019-08-29 13:30:43 +00:00
|
|
|
friend class PeekableReadBufferCheckpoint;
|
2019-04-12 00:45:18 +00:00
|
|
|
public:
|
2021-01-12 18:55:12 +00:00
|
|
|
explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE);
|
2019-05-12 03:15:08 +00:00
|
|
|
|
|
|
|
~PeekableReadBuffer() override;
|
|
|
|
|
|
|
|
/// Sets checkpoint at current position
|
2019-09-26 10:49:22 +00:00
|
|
|
ALWAYS_INLINE inline void setCheckpoint()
|
2019-09-25 16:08:58 +00:00
|
|
|
{
|
|
|
|
#ifndef NDEBUG
|
|
|
|
if (checkpoint)
|
|
|
|
throw DB::Exception("Does not support recursive checkpoints.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
#endif
|
|
|
|
checkpoint_in_own_memory = currentlyReadFromOwnMemory();
|
|
|
|
if (!checkpoint_in_own_memory)
|
|
|
|
{
|
|
|
|
/// Don't need to store unread data anymore
|
|
|
|
peeked_size = 0;
|
|
|
|
}
|
2021-01-14 14:31:35 +00:00
|
|
|
checkpoint.emplace(pos);
|
2019-09-25 16:08:58 +00:00
|
|
|
}
|
2019-05-12 03:15:08 +00:00
|
|
|
|
|
|
|
/// Forget checkpoint and all data between checkpoint and position
|
2019-09-26 10:49:22 +00:00
|
|
|
ALWAYS_INLINE inline void dropCheckpoint()
|
2019-09-25 16:08:58 +00:00
|
|
|
{
|
|
|
|
#ifndef NDEBUG
|
|
|
|
if (!checkpoint)
|
|
|
|
throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
#endif
|
|
|
|
if (!currentlyReadFromOwnMemory())
|
|
|
|
{
|
|
|
|
/// Don't need to store unread data anymore
|
|
|
|
peeked_size = 0;
|
|
|
|
}
|
2021-01-14 14:31:35 +00:00
|
|
|
checkpoint = std::nullopt;
|
2019-09-25 16:08:58 +00:00
|
|
|
checkpoint_in_own_memory = false;
|
|
|
|
}
|
2019-05-12 03:15:08 +00:00
|
|
|
|
|
|
|
/// Sets position at checkpoint.
|
|
|
|
/// All pointers (such as this->buffer().end()) may be invalidated
|
2021-02-19 12:51:26 +00:00
|
|
|
void rollbackToCheckpoint(bool drop = false);
|
2019-05-12 03:15:08 +00:00
|
|
|
|
2019-09-30 18:21:58 +00:00
|
|
|
/// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory,
|
|
|
|
/// so data between checkpoint and position will be in continuous memory.
|
2019-09-09 17:06:22 +00:00
|
|
|
void makeContinuousMemoryFromCheckpointToPos();
|
|
|
|
|
2019-09-30 18:21:58 +00:00
|
|
|
/// Returns true if there unread data extracted from sub-buffer in own memory.
|
|
|
|
/// This data will be lost after destruction of peekable buffer.
|
|
|
|
bool hasUnreadData() const;
|
2019-04-12 00:45:18 +00:00
|
|
|
|
2020-06-11 00:51:27 +00:00
|
|
|
// for streaming reading (like in Kafka) we need to restore initial state of the buffer
|
|
|
|
// w/o recreating the buffer.
|
|
|
|
void reset();
|
|
|
|
|
2019-04-12 00:45:18 +00:00
|
|
|
private:
|
2019-05-12 03:15:08 +00:00
|
|
|
bool nextImpl() override;
|
|
|
|
|
2019-09-30 18:21:58 +00:00
|
|
|
bool peekNext();
|
|
|
|
|
2019-09-25 16:08:58 +00:00
|
|
|
inline bool useSubbufferOnly() const { return !peeked_size; }
|
|
|
|
inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf.buffer().begin(); }
|
|
|
|
inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; }
|
2019-05-12 03:15:08 +00:00
|
|
|
|
|
|
|
void checkStateCorrect() const;
|
|
|
|
|
|
|
|
/// Makes possible to append `bytes_to_append` bytes to data in own memory.
|
|
|
|
/// Updates all invalidated pointers and sizes.
|
2019-09-30 18:21:58 +00:00
|
|
|
void resizeOwnMemoryIfNecessary(size_t bytes_to_append);
|
2019-05-12 03:15:08 +00:00
|
|
|
|
2019-04-21 15:37:04 +00:00
|
|
|
|
2019-04-12 00:45:18 +00:00
|
|
|
ReadBuffer & sub_buf;
|
2019-05-12 03:15:08 +00:00
|
|
|
size_t peeked_size = 0;
|
2021-01-14 14:31:35 +00:00
|
|
|
std::optional<Position> checkpoint = std::nullopt;
|
2019-05-12 03:15:08 +00:00
|
|
|
bool checkpoint_in_own_memory = false;
|
2019-04-12 00:45:18 +00:00
|
|
|
};
|
|
|
|
|
2019-05-17 01:12:32 +00:00
|
|
|
|
|
|
|
class PeekableReadBufferCheckpoint : boost::noncopyable
|
|
|
|
{
|
|
|
|
PeekableReadBuffer & buf;
|
2019-08-29 13:30:43 +00:00
|
|
|
bool auto_rollback;
|
2019-05-17 01:12:32 +00:00
|
|
|
public:
|
2019-08-29 13:30:43 +00:00
|
|
|
explicit PeekableReadBufferCheckpoint(PeekableReadBuffer & buf_, bool auto_rollback_ = false)
|
|
|
|
: buf(buf_), auto_rollback(auto_rollback_) { buf.setCheckpoint(); }
|
|
|
|
~PeekableReadBufferCheckpoint()
|
|
|
|
{
|
|
|
|
if (!buf.checkpoint)
|
|
|
|
return;
|
|
|
|
if (auto_rollback)
|
|
|
|
buf.rollbackToCheckpoint();
|
|
|
|
buf.dropCheckpoint();
|
|
|
|
}
|
2019-05-17 01:12:32 +00:00
|
|
|
|
|
|
|
};
|
|
|
|
|
2019-04-12 00:45:18 +00:00
|
|
|
}
|