Make async reader work with any impl

This commit is contained in:
kssenii 2023-05-22 19:51:58 +02:00
parent d705e5102b
commit c4d862a16f
31 changed files with 230 additions and 297 deletions

View File

@ -1,6 +1,7 @@
#include <Backups/BackupEntryFromAppendOnlyFile.h>
#include <Disks/IDisk.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
namespace DB

View File

@ -1,5 +1,7 @@
#include <Backups/BackupEntryFromImmutableFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Disks/IDisk.h>
#include <city.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Disks/IDisk.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>

View File

@ -15,7 +15,7 @@
#include <IO/Archives/createArchiveWriter.h>
#include <IO/ConcatSeekableReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>

View File

@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <Common/isLocalAddress.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/logger_useful.h>

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context_fwd.h>
#include <Core/Defines.h>
#include <Core/Names.h>
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
@ -20,6 +21,7 @@
#include <boost/noncopyable.hpp>
#include <Poco/Timestamp.h>
#include <filesystem>
#include <sys/stat.h>
namespace fs = std::filesystem;

View File

@ -1,4 +1,4 @@
#include "AsynchronousReadIndirectBufferFromRemoteFS.h"
#include "AsynchronousBoundedReadBuffer.h"
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
@ -43,105 +43,77 @@ namespace ErrorCodes
}
AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS(
AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, impl(std::move(impl_))
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
#ifndef NDEBUG
, log(&Poco::Logger::get("AsynchronousBufferFromRemoteFS"))
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, log(&Poco::Logger::get("AsynchronousBoundedReadBuffer"))
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
String AsynchronousReadIndirectBufferFromRemoteFS::getFileName() const
bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
{
return impl->getFileName();
}
String AsynchronousReadIndirectBufferFromRemoteFS::getInfoForLog()
{
return impl->getInfoForLog();
}
size_t AsynchronousReadIndirectBufferFromRemoteFS::getFileSize()
{
return impl->getFileSize();
}
bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
{
/**
* Note: read_until_position here can be std::nullopt only for non-MergeTree tables.
* For mergeTree tables it must be guaranteed that setReadUntilPosition() or
* setReadUntilEnd() is called before any read or prefetch.
* setReadUntilEnd() always sets read_until_position to file size.
* setReadUntilPosition(pos) always has pos > 0, because if
* right_offset_in_compressed_file is 0, then setReadUntilEnd() is used.
*/
if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == *read_until_position)
if (file_offset_of_buffer_end == *read_until_position) /// Everything is already read.
return false;
if (file_offset_of_buffer_end > *read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}
}
return true;
}
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
std::future<IAsynchronousReader::Result>
AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = base_priority + priority;
request.priority = read_settings.priority + priority;
request.ignore = bytes_to_ignore;
return reader.submit(request);
}
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
void AsynchronousBoundedReadBuffer::prefetch(int64_t priority)
{
if (prefetch_future.valid())
return;
/// Check boundary, which was set in readUntilPosition().
if (!hasPendingDataToRead())
return;
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.priority = priority;
/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size || prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size
|| prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
{
if (!read_until_position || position != *read_until_position)
{
@ -157,21 +129,16 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
}
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
void AsynchronousBoundedReadBuffer::appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch)
{
setReadUntilPosition(impl->getFileSize());
}
void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch)
{
const auto & object = impl->getCurrentObject();
FilesystemReadPrefetchesLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.path = object.local_path,
.path = impl->getFileName(),
.offset = file_offset_of_buffer_end,
.size = size,
.prefetch_submit_time = last_prefetch_info.submit_time,
@ -187,7 +154,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
}
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
bool AsynchronousBoundedReadBuffer::nextImpl()
{
if (!hasPendingDataToRead())
return false;
@ -245,14 +212,14 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
chassert(file_offset_of_buffer_end >= impl->getFileOffsetOfBufferEnd());
chassert(file_offset_of_buffer_end <= impl->getFileSize());
return bytes_read;
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
off_t AsynchronousBoundedReadBuffer::seek(off_t offset, int whence)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeks);
@ -268,7 +235,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence");
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Expected SEEK_SET or SEEK_CUR as whence");
}
/// Position is unchanged.
@ -322,9 +289,8 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (read_until_position && new_pos > *read_until_position)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
impl->seek(file_offset_of_buffer_end, SEEK_SET);
return new_pos;
}
@ -332,8 +298,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
* Note: we read in range [file_offset_of_buffer_end, read_until_position).
*/
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
if (read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
@ -342,31 +307,21 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else
{
if (impl->initialized())
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
}
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
file_offset_of_buffer_end = new_pos;
impl->seek(file_offset_of_buffer_end, SEEK_SET);
}
return new_pos;
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::getPosition()
{
return file_offset_of_buffer_end - available() + bytes_to_ignore;
}
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
void AsynchronousBoundedReadBuffer::finalize()
{
resetPrefetch(FilesystemPrefetchState::UNNEEDED);
}
AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromRemoteFS()
AsynchronousBoundedReadBuffer::~AsynchronousBoundedReadBuffer()
{
try
{
@ -378,7 +333,7 @@ AsynchronousReadIndirectBufferFromRemoteFS::~AsynchronousReadIndirectBufferFromR
}
}
void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetchState state)
void AsynchronousBoundedReadBuffer::resetPrefetch(FilesystemPrefetchState state)
{
if (!prefetch_future.valid())
return;

View File

@ -0,0 +1,96 @@
#pragma once
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility>
namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
class ReadBufferFromRemoteFSGather;
class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
{
public:
using Impl = ReadBufferFromFileBase;
using ImplPtr = std::unique_ptr<Impl>;
explicit AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
AsyncReadCountersPtr async_read_counters_ = nullptr,
FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr);
~AsynchronousBoundedReadBuffer() override;
String getFileName() const override { return impl->getFileName(); }
size_t getFileSize() override { return impl->getFileSize(); }
String getInfoForLog() override { return impl->getInfoForLog(); }
off_t seek(off_t offset_, int whence) override;
void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private:
const ImplPtr impl;
const ReadSettings read_settings;
IAsynchronousReader & reader;
size_t file_offset_of_buffer_end = 0;
std::optional<size_t> read_until_position;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
const std::string query_id;
const std::string current_reader_id;
Poco::Logger * log;
AsyncReadCountersPtr async_read_counters;
FilesystemReadPrefetchesLogPtr prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
bool nextImpl() override;
void finalize();
bool hasPendingDataToRead();
void appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
void resetPrefetch(FilesystemPrefetchState state);
};
}

View File

@ -1,111 +0,0 @@
#pragma once
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility>
namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
* Reads data from S3/HDFS/Web using stored paths in metadata.
* This class is an asynchronous version of ReadIndirectBufferFromRemoteFS.
*
* Buffers chain for diskS3:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadBufferFromS3 -> ReadBufferFromIStream.
*
* Buffers chain for diskWeb:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadIndirectBufferFromWebServer -> ReadBufferFromHTTP -> ReadBufferFromIStream.
*
* We pass either `memory` or `prefetch_buffer` through all this chain and return it back.
*/
class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
off_t seek(off_t offset_, int whence) override;
off_t getPosition() override;
String getFileName() const override;
void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
void setReadUntilEnd() override;
String getInfoForLog() override;
size_t getFileSize() override;
bool isIntegratedWithFilesystemCache() const override { return true; }
private:
bool nextImpl() override;
void finalize();
bool hasPendingDataToRead();
void appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
void resetPrefetch(FilesystemPrefetchState state);
ReadSettings read_settings;
IAsynchronousReader & reader;
int64_t base_priority;
std::shared_ptr<ReadBufferFromRemoteFSGather> impl;
std::future<IAsynchronousReader::Result> prefetch_future;
size_t file_offset_of_buffer_end = 0;
Memory<> prefetch_buffer;
std::string query_id;
std::string current_reader_id;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
std::optional<size_t> read_until_position;
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
};
}

View File

@ -12,22 +12,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
}
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
: ReadBufferFromFileBase(0, nullptr, 0)
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
@ -38,9 +40,9 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache)
if (current_buf && !with_cache)
{
appendFilesystemCacheLog();
appendUncachedReadInfo();
}
current_object = object;
@ -70,7 +72,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
return current_read_buffer_creator();
}
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
{
if (!cache_log || current_object.remote_path.empty())
return;
@ -218,44 +220,23 @@ bool ReadBufferFromRemoteFSGather::readImpl()
return result;
}
size_t ReadBufferFromRemoteFSGather::getFileOffsetOfBufferEnd() const
{
return file_offset_of_buffer_end;
}
void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
{
if (position != read_until_position)
{
read_until_position = position;
reset();
}
}
if (position == read_until_position)
return;
void ReadBufferFromRemoteFSGather::reset()
{
read_until_position = position;
current_buf.reset();
}
String ReadBufferFromRemoteFSGather::getFileName() const
off_t ReadBufferFromRemoteFSGather::seek(off_t offset, int whence)
{
return current_object.remote_path;
}
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only seeking with SEEK_SET is allowed");
size_t ReadBufferFromRemoteFSGather::getFileSize() const
{
size_t size = 0;
for (const auto & object : blobs_to_read)
size += object.bytes_size;
return size;
}
String ReadBufferFromRemoteFSGather::getInfoForLog()
{
if (!current_buf)
return "";
return current_buf->getInfoForLog();
file_offset_of_buffer_end = offset;
current_buf.reset();
return file_offset_of_buffer_end;
}
size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
@ -269,7 +250,7 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache)
appendFilesystemCacheLog();
appendUncachedReadInfo();
}
}

View File

@ -10,12 +10,13 @@ namespace Poco { class Logger; }
namespace DB
{
class FilesystemCacheLog;
/**
* Remote disk might need to split one clickhouse file into multiple files in remote fs.
* This class works like a proxy to allow transition from one file into multiple.
*/
class ReadBufferFromRemoteFSGather final : public ReadBuffer
class ReadBufferFromRemoteFSGather final : public ReadBufferFromFileBase
{
friend class ReadIndirectBufferFromRemoteFS;
@ -30,25 +31,25 @@ public:
~ReadBufferFromRemoteFSGather() override;
String getFileName() const;
String getFileName() const override { return current_object.remote_path; }
void reset();
String getInfoForLog() override { return current_buf ? current_buf->getInfoForLog() : ""; }
void setReadUntilPosition(size_t position) override;
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore) override;
size_t getFileSize() const;
size_t getFileSize() override { return getTotalSize(blobs_to_read); }
size_t getFileOffsetOfBufferEnd() const;
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
bool initialized() const { return current_buf != nullptr; }
String getInfoForLog();
size_t getImplementationBufferOffset() const;
const StoredObject & getCurrentObject() const { return current_object; }
off_t seek(off_t offset, int whence) override;
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
private:
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);
@ -61,40 +62,25 @@ private:
bool moveToNextBuffer();
void appendFilesystemCacheLog();
ReadBufferCreator read_buffer_creator;
StoredObjects blobs_to_read;
ReadSettings settings;
size_t read_until_position = 0;
StoredObject current_object;
void appendUncachedReadInfo();
const ReadSettings settings;
const StoredObjects blobs_to_read;
const ReadBufferCreator read_buffer_creator;
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
bool with_cache;
String query_id;
Poco::Logger * log;
SeekableReadBufferPtr current_buf;
size_t current_buf_idx = 0;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;
/**
* File: |___________________|
* Buffer: |~~~~~~~|
* file_offset_of_buffer_end: ^
*/
size_t bytes_to_ignore = 0;
size_t total_bytes_read_from_current_file = 0;
std::shared_ptr<FilesystemCacheLog> cache_log;
StoredObject current_object;
size_t current_buf_idx = 0;
SeekableReadBufferPtr current_buf;
Poco::Logger * log;
};
}

View File

@ -82,8 +82,8 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
else
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET or SEEK_CUR modes are allowed.");
impl->reset();
resetWorkingBuffer();
impl->seek(file_offset_of_buffer_end, SEEK_SET);
file_offset_of_buffer_end = impl->file_offset_of_buffer_end;
return impl->file_offset_of_buffer_end;

View File

@ -31,8 +31,6 @@ public:
void setReadUntilEnd() override;
bool isIntegratedWithFilesystemCache() const override { return true; }
size_t getFileSize() override;
private:

View File

@ -8,6 +8,7 @@
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Interpreters/Context.h>
@ -112,8 +113,8 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(reader_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(reader_impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -5,7 +5,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/IObjectStorage.h>

View File

@ -2,6 +2,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <base/getFQDNOrHostName.h>
#include <future>
namespace DB
{

View File

@ -7,7 +7,6 @@
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>

View File

@ -3,6 +3,7 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>

View File

@ -12,12 +12,14 @@
#include <Common/Exception.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <IO/copyData.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool_fwd.h>
#include <Disks/WriteMode.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
namespace DB

View File

@ -8,6 +8,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
@ -64,12 +65,12 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
global_context->getFilesystemCacheLog());
/// We use `remove_fs_method` (not `local_fs_method`) because we are about to use
/// AsynchronousReadIndirectBufferFromRemoteFS which works by the remote_fs_* settings.
/// AsynchronousBoundedReadBuffer which works by the remote_fs_* settings.
if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, modified_settings, std::move(impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, modified_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -6,7 +6,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
@ -128,8 +128,8 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(s3_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(s3_impl), reader, disk_read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -0,0 +1,14 @@
#include <Disks/ObjectStorages/StoredObject.h>
namespace DB
{
size_t getTotalSize(const StoredObjects & objects)
{
size_t size = 0;
for (const auto & object : objects)
size += object.bytes_size;
return size;
}
}

View File

@ -29,4 +29,6 @@ struct StoredObject
using StoredObjects = std::vector<StoredObject>;
size_t getTotalSize(const StoredObjects & objects);
}

View File

@ -9,6 +9,7 @@
#include <IO/WriteHelpers.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
@ -190,8 +191,8 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, read_settings, std::move(web_impl),
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(web_impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}

View File

@ -27,5 +27,6 @@ struct AsyncReadCounters
void dumpToMapColumn(IColumn * column) const;
};
using AsyncReadCountersPtr = std::shared_ptr<AsyncReadCounters>;
}

View File

@ -27,8 +27,6 @@ public:
ReadBuffer & getWrappedReadBuffer() { return *impl; }
bool isIntegratedWithFilesystemCache() const override { return impl->isIntegratedWithFilesystemCache(); }
size_t getFileSize() override;
protected:

View File

@ -49,8 +49,6 @@ public:
/// If true, setReadUntilPosition() guarantees that eof will be reported at the given position.
virtual bool supportsRightBoundedReads() const { return false; }
virtual bool isIntegratedWithFilesystemCache() const { return false; }
/// Returns true if seek() actually works, false if seek() will always throw (or make subsequent
/// nextImpl() calls throw).
///

View File

@ -45,4 +45,6 @@ public:
using SystemLog<FilesystemReadPrefetchesLogElement>::SystemLog;
};
using FilesystemReadPrefetchesLogPtr = std::shared_ptr<FilesystemReadPrefetchesLog>;
}

View File

@ -5,6 +5,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Disks/WriteMode.h>
#include <Disks/IDisk.h>

View File

@ -31,7 +31,7 @@
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/ReadFromStorageProgress.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/StoredObject.h>
@ -676,8 +676,8 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
pool_reader, modified_settings, std::move(s3_impl),
auto async_reader = std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(s3_impl), pool_reader, modified_settings,
context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog());
async_reader->setReadUntilEnd();

View File

@ -10,6 +10,7 @@
#include <Common/formatReadable.h>
#include <Common/StringUtils/StringUtils.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Common/logger_useful.h>
#include <Interpreters/Set.h>
#include <Processors/Sinks/SinkToStorage.h>