Finally fixed

This commit is contained in:
kssenii 2021-09-24 15:12:11 +03:00
parent c1ea44b9c2
commit 6219d541a5
4 changed files with 32 additions and 24 deletions

View File

@ -36,14 +36,10 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
{ {
IAsynchronousReader::Request request; IAsynchronousReader::Request request;
impl->set(impl->buffer().begin(), impl->buffer().size());
if (impl->initialized())
{
impl->position() = impl->buffer().end();
assert(!impl->hasPendingData());
}
auto remote_fd = std::make_shared<ThreadPoolRemoteFSReader::RemoteFSFileDescriptor>(); auto remote_fd = std::make_shared<ThreadPoolRemoteFSReader::RemoteFSFileDescriptor>();
/// Resize buffer to 0 and move pos to start.
impl->set(impl->buffer().begin(), impl->buffer().size());
remote_fd->impl = impl; remote_fd->impl = impl;
request.descriptor = std::move(remote_fd); request.descriptor = std::move(remote_fd);
@ -76,20 +72,16 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
} }
else else
{ {
// if (impl->initialized())
// {
// impl->position() = position();
// assert(!impl->hasPendingData());
// }
size = readNext().get(); size = readNext().get();
} }
if (size) if (size)
{ {
//set(working_buffer.begin(), working_buffer.size()); size_t offset = pos - working_buffer.begin();
assert(offset >= 0);
swap(*impl); swap(*impl);
// BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
impl->absolute_position += working_buffer.size(); impl->absolute_position += working_buffer.size();
position() = working_buffer.begin() + offset;
} }
prefetch_future = {}; prefetch_future = {};
@ -142,7 +134,6 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
prefetch_future = {}; prefetch_future = {};
} }
// impl->seek(impl->absolute_position, SEEK_SET);
pos = working_buffer.end(); pos = working_buffer.end();
impl->reset(); impl->reset();

View File

@ -137,18 +137,20 @@ public:
const String & bucket_, const String & bucket_,
DiskS3::Metadata metadata_, DiskS3::Metadata metadata_,
size_t max_single_read_retries_, size_t max_single_read_retries_,
size_t buf_size_) size_t buf_size_,
bool threadpool_read_ = false)
: ReadBufferFromRemoteFS(metadata_) : ReadBufferFromRemoteFS(metadata_)
, client_ptr(std::move(client_ptr_)) , client_ptr(std::move(client_ptr_))
, bucket(bucket_) , bucket(bucket_)
, max_single_read_retries(max_single_read_retries_) , max_single_read_retries(max_single_read_retries_)
, buf_size(buf_size_) , buf_size(buf_size_)
, threadpool_read(threadpool_read_)
{ {
} }
SeekableReadBufferPtr createReadBuffer(const String & path) const override SeekableReadBufferPtr createReadBuffer(const String & path) const override
{ {
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, buf_size); return std::make_unique<ReadBufferFromS3>(client_ptr, bucket, fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, buf_size, threadpool_read);
} }
private: private:
@ -156,6 +158,7 @@ private:
const String & bucket; const String & bucket;
UInt64 max_single_read_retries; UInt64 max_single_read_retries;
size_t buf_size; size_t buf_size;
bool threadpool_read;
}; };
DiskS3::DiskS3( DiskS3::DiskS3(
@ -231,11 +234,13 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
LOG_TRACE(log, "Read from file by path: {}. Existing S3 objects: {}", LOG_TRACE(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size()); backQuote(metadata_path + path), metadata.remote_fs_objects.size());
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
auto s3_impl = std::make_unique<ReadIndirectBufferFromS3>( auto s3_impl = std::make_unique<ReadIndirectBufferFromS3>(
settings->client, bucket, metadata, settings->client, bucket, metadata,
settings->s3_max_single_read_retries, read_settings.remote_fs_buffer_size); settings->s3_max_single_read_retries, read_settings.remote_fs_buffer_size, threadpool_read);
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool) if (threadpool_read)
{ {
auto reader = getThreadPoolReader(); auto reader = getThreadPoolReader();
auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(s3_impl)); auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(s3_impl));

View File

@ -31,13 +31,15 @@ namespace ErrorCodes
ReadBufferFromS3::ReadBufferFromS3( ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, UInt64 max_single_read_retries_, size_t buffer_size_) std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_,
UInt64 max_single_read_retries_, size_t buffer_size_, bool use_external_buffer_)
: SeekableReadBuffer(nullptr, 0) : SeekableReadBuffer(nullptr, 0)
, client_ptr(std::move(client_ptr_)) , client_ptr(std::move(client_ptr_))
, bucket(bucket_) , bucket(bucket_)
, key(key_) , key(key_)
, max_single_read_retries(max_single_read_retries_) , max_single_read_retries(max_single_read_retries_)
, buffer_size(buffer_size_) , buffer_size(buffer_size_)
, use_external_buffer(use_external_buffer_)
{ {
} }
@ -45,11 +47,18 @@ bool ReadBufferFromS3::nextImpl()
{ {
bool next_result = false; bool next_result = false;
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
if (impl) if (impl)
{ {
/// `impl` has been initialized earlier and now we're at the end of the current portion of data. if (use_external_buffer)
// impl->position() = position(); {
impl->set(working_buffer.begin(), working_buffer.size()); impl->set(working_buffer.begin(), working_buffer.size());
}
else
{
impl->position() = position();
}
assert(!impl->hasPendingData()); assert(!impl->hasPendingData());
} }
else else
@ -94,6 +103,7 @@ bool ReadBufferFromS3::nextImpl()
} }
} }
std::cerr << "s3 buffer size: " << impl->buffer().size() << std::endl;
if (!next_result) if (!next_result)
return false; return false;

View File

@ -43,7 +43,8 @@ public:
const String & bucket_, const String & bucket_,
const String & key_, const String & key_,
UInt64 max_single_read_retries_, UInt64 max_single_read_retries_,
size_t buffer_size_); size_t buffer_size_,
bool use_external_buffer = false);
bool nextImpl() override; bool nextImpl() override;
@ -52,6 +53,7 @@ public:
private: private:
std::unique_ptr<ReadBuffer> initialize(); std::unique_ptr<ReadBuffer> initialize();
bool use_external_buffer;
}; };
} }