mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Fix error "Read beyond last offset" for AsynchronousBoundedReadBuffer.
This commit is contained in:
parent
f663f13a35
commit
eb3836ac5a
@ -69,12 +69,7 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
|
||||
return false;
|
||||
|
||||
if (file_offset_of_buffer_end > *read_until_position)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Read beyond last offset ({} > {}, info: {})",
|
||||
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
|
||||
}
|
||||
throwReadBeyondLastOffset();
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -103,6 +98,18 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data,
|
||||
return reader.execute(request);
|
||||
}
|
||||
|
||||
size_t AsynchronousBoundedReadBuffer::getBufferSizeForReading() const
|
||||
{
|
||||
size_t buffer_size = chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize());
|
||||
if (read_until_position)
|
||||
{
|
||||
if (file_offset_of_buffer_end > *read_until_position)
|
||||
throwReadBeyondLastOffset();
|
||||
buffer_size = std::min(buffer_size, *read_until_position - file_offset_of_buffer_end);
|
||||
}
|
||||
return buffer_size;
|
||||
}
|
||||
|
||||
void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
|
||||
{
|
||||
if (prefetch_future.valid())
|
||||
@ -114,7 +121,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
|
||||
last_prefetch_info.submit_time = std::chrono::system_clock::now();
|
||||
last_prefetch_info.priority = priority;
|
||||
|
||||
prefetch_buffer.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
|
||||
prefetch_buffer.resize(getBufferSizeForReading());
|
||||
prefetch_future = readAsync(prefetch_buffer.data(), prefetch_buffer.size(), priority);
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
|
||||
}
|
||||
@ -126,14 +133,15 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
|
||||
if (position < file_offset_of_buffer_end)
|
||||
{
|
||||
/// file has been read beyond new read until position already
|
||||
if (working_buffer.size() >= file_offset_of_buffer_end - position)
|
||||
if (available() >= file_offset_of_buffer_end - position)
|
||||
{
|
||||
/// new read until position is inside working buffer
|
||||
/// new read until position is after the current position in the working buffer
|
||||
file_offset_of_buffer_end = position;
|
||||
working_buffer.resize(working_buffer.size() - (file_offset_of_buffer_end - position));
|
||||
}
|
||||
else
|
||||
{
|
||||
/// new read until position is before working buffer begin
|
||||
/// new read until position is before the current position in the working buffer
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Attempt to set read until position before already read data ({} > {}, info: {})",
|
||||
@ -155,6 +163,16 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
|
||||
}
|
||||
}
|
||||
|
||||
void AsynchronousBoundedReadBuffer::throwReadBeyondLastOffset() const
|
||||
{
|
||||
size_t file_size = impl->getFileSize();
|
||||
size_t read_end_position = read_until_position ? *read_until_position : file_size;
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Read beyond last offset ({} > {}): file size = {}, info: {}",
|
||||
file_offset_of_buffer_end, read_end_position, file_size, impl->getInfoForLog());
|
||||
}
|
||||
|
||||
void AsynchronousBoundedReadBuffer::appendToPrefetchLog(
|
||||
FilesystemPrefetchState state,
|
||||
int64_t size,
|
||||
@ -210,7 +228,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
|
||||
}
|
||||
else
|
||||
{
|
||||
memory.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
|
||||
memory.resize(getBufferSizeForReading());
|
||||
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
|
||||
@ -238,6 +256,9 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
|
||||
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
|
||||
chassert(file_offset_of_buffer_end <= impl->getFileSize());
|
||||
|
||||
if (read_until_position && (file_offset_of_buffer_end > *read_until_position))
|
||||
throwReadBeyondLastOffset();
|
||||
|
||||
return bytes_read;
|
||||
}
|
||||
|
||||
|
@ -94,8 +94,11 @@ private:
|
||||
|
||||
IAsynchronousReader::Result readSync(char * data, size_t size);
|
||||
|
||||
size_t getBufferSizeForReading() const;
|
||||
|
||||
void resetPrefetch(FilesystemPrefetchState state);
|
||||
|
||||
[[noreturn]] void throwReadBeyondLastOffset() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user