mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-19 21:03:51 +00:00
126 lines
3.8 KiB
C++
126 lines
3.8 KiB
C++
#include <IO/LimitSeekableReadBuffer.h>
|
|
|
|
|
|
namespace DB
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
{
|
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
|
}
|
|
|
|
LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 start_offset_, UInt64 limit_size_)
|
|
: LimitSeekableReadBuffer(wrapSeekableReadBufferReference(in_), start_offset_, limit_size_)
|
|
{
|
|
}
|
|
|
|
LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 start_offset_, UInt64 limit_size_)
|
|
: SeekableReadBuffer(in_->position(), 0)
|
|
, in(std::move(in_))
|
|
, min_offset(start_offset_)
|
|
, max_offset(start_offset_ + limit_size_)
|
|
, need_seek(min_offset) /// We always start reading from `min_offset`.
|
|
{
|
|
}
|
|
|
|
bool LimitSeekableReadBuffer::nextImpl()
|
|
{
|
|
/// First let the nested buffer know the current position in the buffer (otherwise `in->eof()` or `in->seek()` below can work incorrectly).
|
|
in->position() = position();
|
|
|
|
if (need_seek)
|
|
{
|
|
/// Do actual seek.
|
|
if (in->seek(*need_seek, SEEK_SET) != static_cast<off_t>(*need_seek))
|
|
{
|
|
/// Failed to seek, maybe because the new seek position is located after EOF.
|
|
set(in->position(), 0);
|
|
return false;
|
|
}
|
|
need_seek.reset();
|
|
}
|
|
|
|
off_t seek_pos = in->getPosition();
|
|
off_t offset_after_min = seek_pos - min_offset;
|
|
off_t available_before_max = max_offset - seek_pos;
|
|
|
|
if (offset_after_min < 0 || available_before_max <= 0)
|
|
{
|
|
/// Limit reached.
|
|
set(in->position(), 0);
|
|
return false;
|
|
}
|
|
|
|
if (in->eof()) /// `in->eof()` can call `in->next()`
|
|
{
|
|
/// EOF reached.
|
|
set(in->position(), 0);
|
|
return false;
|
|
}
|
|
|
|
/// in->eof() shouldn't change the seek position.
|
|
chassert(seek_pos == in->getPosition());
|
|
|
|
/// Adjust the beginning and the end of the working buffer.
|
|
/// Because we don't want to read before `min_offset` or after `max_offset`.
|
|
auto * ptr = in->position();
|
|
auto * begin = in->buffer().begin();
|
|
auto * end = in->buffer().end();
|
|
|
|
if (ptr - begin > offset_after_min)
|
|
begin = ptr - offset_after_min;
|
|
if (end - ptr > available_before_max)
|
|
end = ptr + available_before_max;
|
|
|
|
BufferBase::set(begin, end - begin, ptr - begin);
|
|
chassert(position() == ptr && available());
|
|
|
|
return true;
|
|
}
|
|
|
|
off_t LimitSeekableReadBuffer::seek(off_t off, int whence)
|
|
{
|
|
off_t new_position;
|
|
off_t current_position = getPosition();
|
|
if (whence == SEEK_SET)
|
|
new_position = off;
|
|
else if (whence == SEEK_CUR)
|
|
new_position = current_position + off;
|
|
else
|
|
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Seek expects SEEK_SET or SEEK_CUR as whence");
|
|
|
|
if (new_position < 0 || new_position + min_offset > max_offset)
|
|
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Seek shift out of bounds");
|
|
|
|
off_t position_change = new_position - current_position;
|
|
if ((buffer().begin() <= pos + position_change) && (pos + position_change <= buffer().end()))
|
|
{
|
|
/// Position is still inside the buffer.
|
|
pos += position_change;
|
|
chassert(pos >= working_buffer.begin());
|
|
chassert(pos <= working_buffer.end());
|
|
return new_position;
|
|
}
|
|
|
|
/// Actual seek in the nested buffer will be performed in nextImpl().
|
|
need_seek = new_position + min_offset;
|
|
|
|
/// Set the size of the working buffer to zero so next call next() would call nextImpl().
|
|
set(in->position(), 0);
|
|
|
|
return new_position;
|
|
}
|
|
|
|
off_t LimitSeekableReadBuffer::getPosition()
|
|
{
|
|
if (need_seek)
|
|
return *need_seek - min_offset;
|
|
|
|
/// We have to do that because `in->getPosition()` below most likely needs to know the current position in the buffer.
|
|
in->position() = position();
|
|
|
|
return in->getPosition() - min_offset;
|
|
}
|
|
|
|
}
|