Merge pull request #31112 from kssenii/fix-async-reads

Fix threadpool read for remote disks
This commit is contained in:
Kseniia Sumarokova 2021-11-08 22:10:30 +03:00 committed by GitHub
commit 1167e5aae8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 12 deletions

View File

@ -24,6 +24,8 @@ public:
void setReadUntilPosition(size_t position) override { impl->setReadUntilPosition(position); }
void setReadUntilEnd() override { impl->setReadUntilEnd(); }
private:
ReadLock lock;
};

View File

@ -59,16 +59,23 @@ String AsynchronousReadIndirectBufferFromRemoteFS::getFileName() const
bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
{
/// Position is set only for MergeTree tables.
/**
* Note: read_until_position here can be zero only for non-MergeTree tables.
* For mergeTree tables it must be guaranteed that setReadUntilPosition() or
* setReadUntilEnd() is called before any read or prefetch.
* setReadUntilEnd() always sets read_until_position to file size.
* setReadUntilPosition(pos) always has pos > 0, because if
* right_offset_in_compressed_file is 0, then setReadUntilEnd() is used.
*/
if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == read_until_position)
if (file_offset_of_buffer_end == *read_until_position)
return false;
if (file_offset_of_buffer_end > read_until_position)
if (file_offset_of_buffer_end > *read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
file_offset_of_buffer_end, read_until_position);
file_offset_of_buffer_end, *read_until_position);
}
else if (must_read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR,
@ -117,7 +124,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition");
read_until_position = position;
impl->setReadUntilPosition(read_until_position);
impl->setReadUntilPosition(*read_until_position);
}
@ -127,7 +134,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilEnd");
read_until_position = impl->getFileSize();
impl->setReadUntilPosition(read_until_position);
impl->setReadUntilPosition(*read_until_position);
}
@ -225,7 +232,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
pos = working_buffer.end();
/// Note: we read in range [file_offset_of_buffer_end, read_until_position).
if (file_offset_of_buffer_end < read_until_position
if (read_until_position && file_offset_of_buffer_end < *read_until_position
&& static_cast<off_t>(file_offset_of_buffer_end) >= getPosition()
&& static_cast<off_t>(file_offset_of_buffer_end) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
{

View File

@ -76,7 +76,7 @@ private:
size_t bytes_to_ignore = 0;
size_t read_until_position = 0;
std::optional<size_t> read_until_position = 0;
bool must_read_until_position;
};

View File

@ -192,21 +192,33 @@ void MergeTreeReaderStream::seekToStart()
void MergeTreeReaderStream::adjustForRange(MarkRange range)
{
/**
* Note: this method is called multiple times for the same range of marks -- each time we
* read from stream, but we must update last_right_offset only if it is bigger than
* the last one to avoid redundantly cancelling prefetches.
*/
auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(range.begin, range.end);
if (!right_offset)
{
if (last_right_offset && *last_right_offset == 0)
return;
last_right_offset = 0; // Zero value means the end of file.
if (cached_buffer)
cached_buffer->setReadUntilEnd();
if (non_cached_buffer)
non_cached_buffer->setReadUntilEnd();
}
else if (right_offset > last_right_offset)
else
{
if (last_right_offset && right_offset <= last_right_offset.value())
return;
last_right_offset = right_offset;
if (cached_buffer)
cached_buffer->setReadUntilPosition(last_right_offset);
cached_buffer->setReadUntilPosition(right_offset);
if (non_cached_buffer)
non_cached_buffer->setReadUntilPosition(last_right_offset);
non_cached_buffer->setReadUntilPosition(right_offset);
}
}

View File

@ -51,7 +51,7 @@ private:
MarkCache * mark_cache;
bool save_marks_in_cache;
size_t last_right_offset = 0;
std::optional<size_t> last_right_offset;
const MergeTreeIndexGranularityInfo * index_granularity_info;