This commit is contained in:
kssenii 2021-10-15 13:13:38 +03:00
parent 3995506d37
commit c1c574e9ca
4 changed files with 98 additions and 70 deletions

View File

@ -163,7 +163,6 @@ void ReadBufferFromRemoteFSGather::seek(off_t offset)
{
current_buf.reset();
absolute_position = offset;
// initialize();
}

View File

@ -36,13 +36,13 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
Int32 priority_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t buf_size_,
size_t /* min_bytes_for_seek_ */)
size_t min_bytes_for_seek_)
: ReadBufferFromFileBase(buf_size_, nullptr, 0)
, reader(reader_)
, priority(priority_)
, impl(impl_)
, prefetch_buffer(buf_size_)
// , min_bytes_for_seek(min_bytes_for_seek_)
, min_bytes_for_seek(min_bytes_for_seek_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBuffers);
buffer_events += impl->getFileName() + " : ";
@ -69,15 +69,23 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
{
if (hasPendingData())
return;
if (prefetch_future.valid())
return;
if (prefetch_future.valid())
return;
/// Everything is already read.
if (absolute_position == last_offset)
return;
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
buffer_events += "-- Prefetch (" + toString(absolute_position) + ") --";
if (absolute_position > last_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
absolute_position, last_offset);
/// Prefetch even in case hasPendingData() == true.
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
buffer_events += fmt::format("-- PREFETCH from offset: {}, upper bound: {} --",
toString(absolute_position), toString(last_offset));
}
@ -86,10 +94,15 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset)
buffer_events += "-- Set last offset " + toString(offset) + "--";
if (prefetch_future.valid())
{
buffer_events += "-- Cancelling because of offset update --";
ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches);
prefetch_future.wait();
prefetch_future = {};
std::cerr << buffer_events << std::endl;
/// TODO: Planning to put logical error here after more testing,
// because seems like future is never supposed to be valid at this point.
std::terminate();
// buffer_events += "-- Cancelling because of offset update --";
// ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches);
// prefetch_future.wait();
// prefetch_future = {};
}
last_offset = offset;
@ -99,6 +112,14 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset)
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{
/// Everything is already read.
if (absolute_position == last_offset)
return false;
if (absolute_position > last_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
absolute_position, last_offset);
ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBufferReads);
size_t size = 0;
@ -138,6 +159,13 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
prefetch_future = {};
/// TODO: it does not really seem to improve anything to call prefecth() here,
/// but it does not make any worse at the same time.
/// Need to test, it might be useful because in fact sometimes (minority of cases though)
/// we can read without prefetching several times in a row.
prefetch();
return size;
}
@ -192,16 +220,17 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
pos = working_buffer.end();
// if (static_cast<off_t>(absolute_position) >= getPosition()
// && static_cast<off_t>(absolute_position) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
// {
// /**
// * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
// */
// // bytes_to_ignore = absolute_position - getPosition();
// impl->seek(absolute_position); /// SEEK_SET.
// }
// else
/// Note: we read in range [absolute_position, last_offset).
if (absolute_position < last_offset
&& static_cast<off_t>(absolute_position) >= getPosition()
&& static_cast<off_t>(absolute_position) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
{
/**
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
*/
bytes_to_ignore = absolute_position - getPosition();
}
else
{
buffer_events += "-- Impl seek --";
impl->seek(absolute_position); /// SEEK_SET.

View File

@ -70,10 +70,11 @@ private:
String buffer_events;
// size_t min_bytes_for_seek;
size_t min_bytes_for_seek;
size_t bytes_to_ignore = 0;
Int64 last_offset = 0;
size_t last_offset = 0;
};
}

View File

@ -2,15 +2,17 @@
#if USE_AWS_S3
# include <IO/ReadBufferFromIStream.h>
# include <IO/ReadBufferFromS3.h>
# include <Common/Stopwatch.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
#include <Common/Stopwatch.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/GetObjectRequest.h>
# include <base/logger_useful.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
# include <utility>
#include <base/logger_useful.h>
#include <base/sleep.h>
#include <utility>
namespace ProfileEvents
@ -27,6 +29,7 @@ namespace ErrorCodes
extern const int S3_ERROR;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
@ -49,19 +52,29 @@ bool ReadBufferFromS3::nextImpl()
if (last_offset)
{
if (static_cast<off_t>(last_offset) == offset)
{
impl.reset();
working_buffer.resize(0);
return false;
}
if (static_cast<off_t>(last_offset) < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1);
}
bool next_result = false;
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
if (impl)
{
if (!use_external_buffer)
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
{
/**
* use_external_buffer -- means we read into the buffer which
@ -74,32 +87,25 @@ bool ReadBufferFromS3::nextImpl()
assert(!impl->hasPendingData());
}
}
else
{
/// `impl` is not initialized and we're about to read the first portion of data.
impl = initialize();
next_result = impl->hasPendingData();
}
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
{
Stopwatch watch;
try
{
if (!impl)
{
impl = initialize();
if (use_external_buffer)
{
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
}
/// Try to read a next portion of data.
next_result = impl->next();
watch.stop();
@ -119,19 +125,11 @@ bool ReadBufferFromS3::nextImpl()
throw;
/// Pause before next attempt.
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
/// Try to reinitialize `impl`.
impl.reset();
impl = initialize();
if (use_external_buffer)
{
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
next_result = impl->hasPendingData();
}
}
@ -173,10 +171,11 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
req.SetBucket(bucket);
req.SetKey(key);
// auto right_offset = read_settings.remote_read_right_offset;
if (last_offset)
{
if (offset >= static_cast<off_t>(last_offset))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1);
req.SetRange(fmt::format("bytes={}-{}", offset, last_offset - 1));
LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, last_offset - 1);
}