This commit is contained in:
kssenii 2021-10-21 13:01:04 +03:00
parent e4e157688d
commit 120b01e089
4 changed files with 18 additions and 32 deletions

View File

@ -56,31 +56,20 @@ String AsynchronousReadIndirectBufferFromRemoteFS::getFileName() const
}
size_t AsynchronousReadIndirectBufferFromRemoteFS::getNumBytesToRead()
bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
{
size_t num_bytes_to_read;
/// Position is set only for MergeTree tables.
if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == *read_until_position)
return 0;
if (file_offset_of_buffer_end == read_until_position)
return false;
if (file_offset_of_buffer_end > *read_until_position)
if (file_offset_of_buffer_end > read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
file_offset_of_buffer_end, *read_until_position);
/// Read range [file_offset_of_buffer_end, read_until_position).
num_bytes_to_read = *read_until_position - file_offset_of_buffer_end;
num_bytes_to_read = std::min(num_bytes_to_read, internal_buffer.size());
file_offset_of_buffer_end, read_until_position);
}
else
{
num_bytes_to_read = internal_buffer.size();
}
return num_bytes_to_read;
return true;
}
@ -107,12 +96,10 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
if (prefetch_future.valid())
return;
auto num_bytes_to_read = getNumBytesToRead();
if (!num_bytes_to_read)
if (!hasPendingDataToRead())
return;
/// Prefetch even in case hasPendingData() == true.
prefetch_buffer.resize(num_bytes_to_read);
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -130,6 +117,9 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{
if (!hasPendingDataToRead())
return false;
size_t size = 0;
if (prefetch_future.valid())
@ -154,12 +144,8 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
else
{
auto num_bytes_to_read = getNumBytesToRead();
if (!num_bytes_to_read) /// Nothing to read.
return false;
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
size = readInto(memory.data(), num_bytes_to_read).get();
size = readInto(memory.data(), memory.size()).get();
if (size)
{

View File

@ -54,7 +54,7 @@ private:
void finalize();
size_t getNumBytesToRead();
bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
@ -74,7 +74,7 @@ private:
size_t bytes_to_ignore = 0;
std::optional<size_t> read_until_position = 0;
size_t read_until_position = 0;
};
}

View File

@ -57,15 +57,15 @@ size_t AsynchronousReadBufferFromFileDescriptor::getNumBytesToRead()
if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == *read_until_position)
if (file_offset_of_buffer_end == read_until_position)
return 0;
if (file_offset_of_buffer_end > *read_until_position)
if (file_offset_of_buffer_end > read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
file_offset_of_buffer_end, *read_until_position);
file_offset_of_buffer_end, read_until_position);
/// Read range [file_offset_of_buffer_end, read_until_position).
num_bytes_to_read = *read_until_position - file_offset_of_buffer_end;
num_bytes_to_read = read_until_position - file_offset_of_buffer_end;
num_bytes_to_read = std::min(num_bytes_to_read, internal_buffer.size());
}
else

View File

@ -24,7 +24,7 @@ protected:
const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
std::optional<size_t> read_until_position;
size_t read_until_position = 0;
int fd;
bool nextImpl() override;