Fix DiskS3 read error due to broken getPosition() method in ReadBufferFromS3.cpp

This commit is contained in:
Pavel Kovalenko 2021-06-02 18:03:25 +03:00
parent c673eb2040
commit 2bd12a7612
6 changed files with 43 additions and 30 deletions

View File

@ -87,15 +87,14 @@ bool ReadIndirectBufferFromRemoteFS<T>::nextImpl()
{
/// Find first available buffer that fits to given offset.
if (!current_buf)
{
current_buf = initialize();
pos = working_buffer.begin();
}
/// If current buffer has remaining data - use it.
if (current_buf && current_buf->next())
{
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
return true;
}
if (current_buf)
return nextAndShiftPosition();
/// If there are no more buffers available - nothing to read.
if (current_buf_idx + 1 >= metadata.remote_fs_objects.size())
@ -105,12 +104,21 @@ bool ReadIndirectBufferFromRemoteFS<T>::nextImpl()
const auto & path = metadata.remote_fs_objects[current_buf_idx].first;
current_buf = createReadBuffer(path);
current_buf->next();
working_buffer = current_buf->buffer();
return nextAndShiftPosition();
}
/// Reads the next chunk from the current underlying buffer and accounts for the bytes read.
///
/// `swap(*current_buf)` is called right before and right after `current_buf->next()`.
/// NOTE(review): presumably this lets the nested buffer do the read on this object's buffer
/// state and then hands the state back - swap() is defined outside this view, confirm.
///
/// Returns the result of `current_buf->next()`. `absolute_position` is increased by the size
/// of `working_buffer` only when that call succeeded.
template <typename T>
bool ReadIndirectBufferFromRemoteFS<T>::nextAndShiftPosition()
{
    swap(*current_buf);
    auto result = current_buf->next();
    swap(*current_buf);

    /// Shift the position only after a successful read.
    if (result)
        absolute_position += working_buffer.size();

    /// Propagate the real outcome: returning `true` unconditionally would hide
    /// a failed next() from the caller.
    return result;
}

View File

@ -16,7 +16,7 @@ template <typename T>
class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_);
explicit ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_);
off_t seek(off_t offset_, int whence) override;
@ -32,6 +32,8 @@ protected:
private:
std::unique_ptr<T> initialize();
bool nextAndShiftPosition();
bool nextImpl() override;
size_t absolute_position = 0;

View File

@ -320,6 +320,8 @@ void DiskS3::startup()
{
auto settings = current_settings.get();
settings->client->EnableRequestProcessing();
if (!settings->send_metadata)
return;

View File

@ -4,7 +4,7 @@
namespace DB
{
ReadBufferFromFileDecorator::ReadBufferFromFileDecorator(std::unique_ptr<ReadBufferFromFileBase> impl_)
ReadBufferFromFileDecorator::ReadBufferFromFileDecorator(std::unique_ptr<SeekableReadBuffer> impl_)
: impl(std::move(impl_))
{
swap(*impl);
@ -13,7 +13,9 @@ ReadBufferFromFileDecorator::ReadBufferFromFileDecorator(std::unique_ptr<ReadBuf
/// Returns the file name reported by the wrapped buffer.
///
/// The wrapped buffer is asked for its name only if it actually is a ReadBufferFromFileBase;
/// for any other wrapped buffer type an empty string is returned.
std::string ReadBufferFromFileDecorator::getFileName() const
{
    /// The check is done at runtime because `impl` is not guaranteed to provide getFileName().
    if (ReadBufferFromFileBase * buffer = dynamic_cast<ReadBufferFromFileBase *>(impl.get()))
        return buffer->getFileName();

    return std::string();
}

View File

@ -10,7 +10,7 @@ namespace DB
class ReadBufferFromFileDecorator : public ReadBufferFromFileBase
{
public:
explicit ReadBufferFromFileDecorator(std::unique_ptr<ReadBufferFromFileBase> impl_);
explicit ReadBufferFromFileDecorator(std::unique_ptr<SeekableReadBuffer> impl_);
std::string getFileName() const override;
@ -21,7 +21,7 @@ public:
bool nextImpl() override;
protected:
std::unique_ptr<ReadBufferFromFileBase> impl;
std::unique_ptr<SeekableReadBuffer> impl;
};
}

View File

@ -43,12 +43,6 @@ ReadBufferFromS3::ReadBufferFromS3(
bool ReadBufferFromS3::nextImpl()
{
/// Restoring valid value of `count()` during `nextImpl()`. See `ReadBuffer::next()`.
pos = working_buffer.begin();
if (!impl)
impl = initialize();
Stopwatch watch;
bool next_result = false;
@ -84,25 +78,31 @@ bool ReadBufferFromS3::nextImpl()
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
if (!next_result)
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer = impl->buffer();
pos = working_buffer.begin();
ProfileEvents::increment(ProfileEvents::S3ReadBytes, internal_buffer.size());
working_buffer = internal_buffer;
offset += working_buffer.size();
return true;
}
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (impl)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (impl)
{
impl.reset();
pos = working_buffer.end();
}
offset = offset_;
return offset;
@ -110,18 +110,17 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
/// Returns the current read position inside the S3 object.
///
/// nextImpl() adds the size of every freshly loaded working buffer to `offset`, so `offset`
/// corresponds to the end of the data currently held in the working buffer. The position is
/// therefore `offset` minus `available()`.
/// NOTE(review): assumes available() is the number of unread bytes left in the working
/// buffer - it is defined outside this view, confirm.
off_t ReadBufferFromS3::getPosition()
{
    return offset - available();
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, getPosition());
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset);
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (getPosition())
req.SetRange("bytes=" + std::to_string(getPosition()) + "-");
req.SetRange(fmt::format("bytes={}-", offset));
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);