mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Refactor buffers reading from object storage
This commit is contained in:
parent
07fe5d96d6
commit
9eb1dfcd12
@ -452,6 +452,7 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \
|
||||
\
|
||||
M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \
|
||||
M(SynchronousReadWaitMicroseconds, "Time spent in waiting for synchronous reads.") \
|
||||
M(AsynchronousRemoteReadWaitMicroseconds, "Time spent in waiting for asynchronous remote reads.") \
|
||||
M(SynchronousRemoteReadWaitMicroseconds, "Time spent in waiting for synchronous remote reads.") \
|
||||
\
|
||||
|
@ -81,8 +81,7 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
|
||||
return true;
|
||||
}
|
||||
|
||||
std::future<IAsynchronousReader::Result>
|
||||
AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, Priority priority)
|
||||
std::future<IAsynchronousReader::Result> AsynchronousBoundedReadBuffer::readAsync(char * data, size_t size, Priority priority)
|
||||
{
|
||||
IAsynchronousReader::Request request;
|
||||
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
|
||||
@ -94,6 +93,17 @@ AsynchronousBoundedReadBuffer::asyncReadInto(char * data, size_t size, Priority
|
||||
return reader.submit(request);
|
||||
}
|
||||
|
||||
IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data, size_t size)
|
||||
{
|
||||
IAsynchronousReader::Request request;
|
||||
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
|
||||
request.buf = data;
|
||||
request.size = size;
|
||||
request.offset = file_offset_of_buffer_end;
|
||||
request.ignore = bytes_to_ignore;
|
||||
return reader.execute(request);
|
||||
}
|
||||
|
||||
void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
|
||||
{
|
||||
if (prefetch_future.valid())
|
||||
@ -106,7 +116,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
|
||||
last_prefetch_info.priority = priority;
|
||||
|
||||
chassert(prefetch_buffer.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
|
||||
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
|
||||
prefetch_future = readAsync(prefetch_buffer.data(), prefetch_buffer.size(), priority);
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
|
||||
}
|
||||
|
||||
@ -178,53 +188,50 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
|
||||
|
||||
chassert(file_offset_of_buffer_end <= impl->getFileSize());
|
||||
|
||||
size_t size, offset;
|
||||
IAsynchronousReader::Result result;
|
||||
if (prefetch_future.valid())
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
|
||||
|
||||
auto result = prefetch_future.get();
|
||||
size = result.size;
|
||||
offset = result.offset;
|
||||
result = prefetch_future.get();
|
||||
}
|
||||
|
||||
prefetch_future = {};
|
||||
prefetch_buffer.swap(memory);
|
||||
|
||||
if (read_settings.enable_filesystem_read_prefetches_log)
|
||||
{
|
||||
appendToPrefetchLog(FilesystemPrefetchState::USED, size, result.execution_watch);
|
||||
}
|
||||
last_prefetch_info = {};
|
||||
|
||||
if (read_settings.enable_filesystem_read_prefetches_log)
|
||||
appendToPrefetchLog(FilesystemPrefetchState::USED, result.size, result.execution_watch);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, result.size);
|
||||
}
|
||||
else
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
|
||||
|
||||
chassert(memory.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
|
||||
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
|
||||
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
|
||||
result = readSync(memory.data(), memory.size());
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, size);
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, result.size);
|
||||
}
|
||||
|
||||
bytes_to_ignore = 0;
|
||||
|
||||
chassert(size >= offset);
|
||||
|
||||
size_t bytes_read = size - offset;
|
||||
size_t bytes_read = result.size - result.offset;
|
||||
if (bytes_read)
|
||||
{
|
||||
/// Adjust the working buffer so that it ignores `offset` bytes.
|
||||
internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
|
||||
working_buffer = Buffer(memory.data() + offset, memory.data() + size);
|
||||
working_buffer = Buffer(memory.data() + result.offset, memory.data() + result.size);
|
||||
pos = working_buffer.begin();
|
||||
}
|
||||
|
||||
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
|
||||
bytes_to_ignore = 0;
|
||||
|
||||
/// In case of multiple files for the same file in clickhouse (i.e. log family)
|
||||
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
|
||||
|
@ -90,7 +90,9 @@ private:
|
||||
int64_t size,
|
||||
const std::unique_ptr<Stopwatch> & execution_watch);
|
||||
|
||||
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);
|
||||
std::future<IAsynchronousReader::Result> readAsync(char * data, size_t size, Priority priority);
|
||||
|
||||
IAsynchronousReader::Result readSync(char * data, size_t size);
|
||||
|
||||
void resetPrefetch(FilesystemPrefetchState state);
|
||||
|
||||
|
@ -15,6 +15,10 @@ namespace Poco { class Logger; }
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
class Exception;
|
||||
|
||||
@ -76,6 +80,7 @@ public:
|
||||
|
||||
inline bool isSupported() { return is_supported; }
|
||||
std::future<Result> submit(Request request) override;
|
||||
Result execute(Request /* request */) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `execute` not implemented for IOUringReader"); }
|
||||
|
||||
void wait() override {}
|
||||
|
||||
|
@ -116,25 +116,6 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
|
||||
cache_log->add(std::move(elem));
|
||||
}
|
||||
|
||||
IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
|
||||
{
|
||||
/**
|
||||
* Set `data` to current working and internal buffers.
|
||||
* Internal buffer with size `size`. Working buffer with size 0.
|
||||
*/
|
||||
set(data, size);
|
||||
|
||||
file_offset_of_buffer_end = offset;
|
||||
bytes_to_ignore = ignore;
|
||||
|
||||
const auto result = nextImpl();
|
||||
|
||||
if (result)
|
||||
return { working_buffer.size(), BufferBase::offset(), nullptr };
|
||||
|
||||
return {0, 0, nullptr};
|
||||
}
|
||||
|
||||
void ReadBufferFromRemoteFSGather::initialize()
|
||||
{
|
||||
if (blobs_to_read.empty())
|
||||
@ -204,39 +185,14 @@ bool ReadBufferFromRemoteFSGather::readImpl()
|
||||
{
|
||||
SwapHelper swap(*this, *current_buf);
|
||||
|
||||
bool result = false;
|
||||
|
||||
/**
|
||||
* Lazy seek is performed here.
|
||||
* In asynchronous buffer when seeking to offset in range [pos, pos + min_bytes_for_seek]
|
||||
* we save how many bytes need to be ignored (new_offset - position() bytes).
|
||||
*/
|
||||
if (bytes_to_ignore)
|
||||
{
|
||||
current_buf->ignore(bytes_to_ignore);
|
||||
result = current_buf->hasPendingData();
|
||||
file_offset_of_buffer_end += bytes_to_ignore;
|
||||
bytes_to_ignore = 0;
|
||||
}
|
||||
|
||||
if (!result)
|
||||
result = current_buf->next();
|
||||
|
||||
if (blobs_to_read.size() == 1)
|
||||
{
|
||||
file_offset_of_buffer_end = current_buf->getFileOffsetOfBufferEnd();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// For log family engines there are multiple s3 files for the same clickhouse file
|
||||
file_offset_of_buffer_end += current_buf->available();
|
||||
}
|
||||
|
||||
/// Required for non-async reads.
|
||||
bool result = current_buf->next();
|
||||
if (result)
|
||||
{
|
||||
assert(current_buf->available());
|
||||
file_offset_of_buffer_end += current_buf->available();
|
||||
nextimpl_working_buffer_offset = current_buf->offset();
|
||||
|
||||
chassert(current_buf->available());
|
||||
chassert(blobs_to_read.size() != 1 || file_offset_of_buffer_end == current_buf->getFileOffsetOfBufferEnd());
|
||||
}
|
||||
|
||||
return result;
|
||||
@ -256,7 +212,6 @@ void ReadBufferFromRemoteFSGather::reset()
|
||||
current_object = {};
|
||||
current_buf_idx = {};
|
||||
current_buf.reset();
|
||||
bytes_to_ignore = 0;
|
||||
}
|
||||
|
||||
off_t ReadBufferFromRemoteFSGather::seek(off_t offset, int whence)
|
||||
|
@ -40,15 +40,13 @@ public:
|
||||
|
||||
void setReadUntilEnd() override { return setReadUntilPosition(getFileSize()); }
|
||||
|
||||
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore) override;
|
||||
|
||||
size_t getFileSize() override { return getTotalSize(blobs_to_read); }
|
||||
|
||||
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
|
||||
|
||||
off_t seek(off_t offset, int whence) override;
|
||||
|
||||
off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; }
|
||||
off_t getPosition() override { return file_offset_of_buffer_end - available(); }
|
||||
|
||||
bool seekIsCheap() override { return !current_buf; }
|
||||
|
||||
@ -77,7 +75,6 @@ private:
|
||||
|
||||
size_t read_until_position = 0;
|
||||
size_t file_offset_of_buffer_end = 0;
|
||||
size_t bytes_to_ignore = 0;
|
||||
|
||||
StoredObject current_object;
|
||||
size_t current_buf_idx = 0;
|
||||
|
@ -8,6 +8,10 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
/** Perform reads from separate thread pool of specified size.
|
||||
*
|
||||
@ -36,6 +40,8 @@ public:
|
||||
|
||||
std::future<Result> submit(Request request) override;
|
||||
|
||||
Result execute(Request /* request */) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `execute` not implemented for ThreadpoolReader"); }
|
||||
|
||||
void wait() override;
|
||||
|
||||
/// pool automatically waits for all tasks in destructor.
|
||||
|
@ -56,14 +56,10 @@ namespace
|
||||
};
|
||||
}
|
||||
|
||||
IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore)
|
||||
{
|
||||
return reader.readInto(data, size, offset, ignore);
|
||||
}
|
||||
|
||||
|
||||
ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_)
|
||||
: pool(std::make_unique<ThreadPool>(CurrentMetrics::ThreadPoolRemoteFSReaderThreads, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive, pool_size, pool_size, queue_size_))
|
||||
: pool(std::make_unique<ThreadPool>(CurrentMetrics::ThreadPoolRemoteFSReaderThreads,
|
||||
CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive,
|
||||
pool_size, pool_size, queue_size_))
|
||||
{
|
||||
}
|
||||
|
||||
@ -71,23 +67,47 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu
|
||||
std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Request request)
|
||||
{
|
||||
ProfileEventTimeIncrement<Microseconds> elapsed(ProfileEvents::ThreadpoolReaderSubmit);
|
||||
return scheduleFromThreadPool<Result>([request]() -> Result
|
||||
return scheduleFromThreadPool<Result>([request, this]() -> Result { return execute(request); },
|
||||
*pool,
|
||||
"VFSRead",
|
||||
request.priority);
|
||||
}
|
||||
|
||||
IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request)
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
|
||||
|
||||
auto * fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
|
||||
auto & reader = fd->getReader();
|
||||
|
||||
auto read_counters = fd->getReadCounters();
|
||||
std::optional<AsyncReadIncrement> increment = read_counters ? std::optional<AsyncReadIncrement>(read_counters) : std::nullopt;
|
||||
|
||||
auto watch = std::make_unique<Stopwatch>(CLOCK_REALTIME);
|
||||
|
||||
reader.set(request.buf, request.size);
|
||||
if (request.offset)
|
||||
reader.seek(request.offset, SEEK_SET);
|
||||
if (request.ignore)
|
||||
reader.ignore(request.ignore);
|
||||
|
||||
bool result = reader.available();
|
||||
if (!result)
|
||||
result = reader.next();
|
||||
|
||||
watch->stop();
|
||||
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
|
||||
|
||||
IAsynchronousReader::Result read_result;
|
||||
if (result)
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
|
||||
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
|
||||
read_result.size = reader.buffer().size();
|
||||
read_result.offset = reader.offset();
|
||||
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);
|
||||
}
|
||||
|
||||
auto async_read_counters = remote_fs_fd->getReadCounters();
|
||||
std::optional<AsyncReadIncrement> increment = async_read_counters ? std::optional<AsyncReadIncrement>(async_read_counters) : std::nullopt;
|
||||
|
||||
auto watch = std::make_unique<Stopwatch>(CLOCK_REALTIME);
|
||||
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
|
||||
watch->stop();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.size);
|
||||
|
||||
return Result{ .size = result.size, .offset = result.offset, .execution_watch = std::move(watch) };
|
||||
}, *pool, "VFSRead", request.priority);
|
||||
read_result.execution_watch = std::move(watch);
|
||||
return read_result;
|
||||
}
|
||||
|
||||
void ThreadPoolRemoteFSReader::wait()
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/AsynchronousReader.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <Common/ThreadPool_fwd.h>
|
||||
#include <Interpreters/threadPoolCallbackRunner.h>
|
||||
|
||||
@ -16,6 +16,7 @@ public:
|
||||
ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_);
|
||||
|
||||
std::future<IAsynchronousReader::Result> submit(Request request) override;
|
||||
IAsynchronousReader::Result execute(Request request) override;
|
||||
|
||||
void wait() override;
|
||||
|
||||
@ -27,17 +28,17 @@ class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
|
||||
{
|
||||
public:
|
||||
explicit RemoteFSFileDescriptor(
|
||||
ReadBuffer & reader_,
|
||||
SeekableReadBuffer & reader_,
|
||||
std::shared_ptr<AsyncReadCounters> async_read_counters_)
|
||||
: reader(reader_)
|
||||
, async_read_counters(async_read_counters_) {}
|
||||
|
||||
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
|
||||
SeekableReadBuffer & getReader() { return reader; }
|
||||
|
||||
std::shared_ptr<AsyncReadCounters> getReadCounters() const { return async_read_counters; }
|
||||
|
||||
private:
|
||||
ReadBuffer & reader;
|
||||
SeekableReadBuffer & reader;
|
||||
std::shared_ptr<AsyncReadCounters> async_read_counters;
|
||||
};
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
#include <IO/AsynchronousReadBufferFromFileDescriptor.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
@ -14,6 +15,7 @@
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event AsynchronousReadWaitMicroseconds;
|
||||
extern const Event SynchronousReadWaitMicroseconds;
|
||||
extern const Event LocalReadThrottlerBytes;
|
||||
extern const Event LocalReadThrottlerSleepMicroseconds;
|
||||
}
|
||||
@ -74,68 +76,43 @@ void AsynchronousReadBufferFromFileDescriptor::prefetch(Priority priority)
|
||||
|
||||
bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
|
||||
{
|
||||
IAsynchronousReader::Result result;
|
||||
if (prefetch_future.valid())
|
||||
{
|
||||
/// Read request already in flight. Wait for its completion.
|
||||
|
||||
size_t size = 0;
|
||||
size_t offset = 0;
|
||||
{
|
||||
Stopwatch watch;
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
|
||||
auto result = prefetch_future.get();
|
||||
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
|
||||
size = result.size;
|
||||
offset = result.offset;
|
||||
assert(offset < size || size == 0);
|
||||
}
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousReadWaitMicroseconds);
|
||||
|
||||
result = prefetch_future.get();
|
||||
prefetch_future = {};
|
||||
file_offset_of_buffer_end += size;
|
||||
|
||||
assert(offset <= size);
|
||||
size_t bytes_read = size - offset;
|
||||
if (throttler)
|
||||
throttler->add(bytes_read, ProfileEvents::LocalReadThrottlerBytes, ProfileEvents::LocalReadThrottlerSleepMicroseconds);
|
||||
|
||||
if (bytes_read)
|
||||
{
|
||||
if (result.size - result.offset > 0)
|
||||
prefetch_buffer.swap(memory);
|
||||
/// Adjust the working buffer so that it ignores `offset` bytes.
|
||||
internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
|
||||
working_buffer = Buffer(memory.data() + offset, memory.data() + size);
|
||||
pos = working_buffer.begin();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// No pending request. Do synchronous read.
|
||||
|
||||
Stopwatch watch;
|
||||
auto [size, offset, _] = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
|
||||
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
file_offset_of_buffer_end += size;
|
||||
|
||||
assert(offset <= size);
|
||||
size_t bytes_read = size - offset;
|
||||
if (throttler)
|
||||
throttler->add(bytes_read, ProfileEvents::LocalReadThrottlerBytes, ProfileEvents::LocalReadThrottlerSleepMicroseconds);
|
||||
|
||||
if (bytes_read)
|
||||
{
|
||||
/// Adjust the working buffer so that it ignores `offset` bytes.
|
||||
internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
|
||||
working_buffer = Buffer(memory.data() + offset, memory.data() + size);
|
||||
pos = working_buffer.begin();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousReadWaitMicroseconds);
|
||||
result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
|
||||
}
|
||||
|
||||
chassert(result.size >= result.offset);
|
||||
size_t bytes_read = result.size - result.offset;
|
||||
file_offset_of_buffer_end += result.size;
|
||||
|
||||
if (throttler)
|
||||
throttler->add(result.size, ProfileEvents::LocalReadThrottlerBytes, ProfileEvents::LocalReadThrottlerSleepMicroseconds);
|
||||
|
||||
if (bytes_read)
|
||||
{
|
||||
/// Adjust the working buffer so that it ignores `offset` bytes.
|
||||
internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
|
||||
working_buffer = Buffer(memory.data() + result.offset, memory.data() + result.size);
|
||||
pos = working_buffer.begin();
|
||||
}
|
||||
|
||||
return bytes_read;
|
||||
}
|
||||
|
||||
|
||||
|
@ -74,6 +74,7 @@ public:
|
||||
/// or destroy the whole reader before destroying the buffer for request.
|
||||
/// The method can be called concurrently from multiple threads.
|
||||
virtual std::future<Result> submit(Request request) = 0;
|
||||
virtual Result execute(Request request) = 0;
|
||||
|
||||
virtual void wait() = 0;
|
||||
|
||||
|
@ -18,7 +18,6 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int ATTEMPT_TO_READ_AFTER_EOF;
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
static constexpr auto DEFAULT_PREFETCH_PRIORITY = Priority{0};
|
||||
@ -236,14 +235,6 @@ public:
|
||||
|
||||
virtual void setReadUntilEnd() {}
|
||||
|
||||
/// Read at most `size` bytes into data at specified offset `offset`. First ignore `ignore` bytes if `ignore` > 0.
|
||||
/// Notice: this function only need to be implemented in synchronous read buffers to be wrapped in asynchronous read.
|
||||
/// Such as ReadBufferFromRemoteFSGather and AsynchronousReadIndirectBufferFromRemoteFS.
|
||||
virtual IAsynchronousReader::Result readInto(char * /*data*/, size_t /*size*/, size_t /*offset*/, size_t /*ignore*/)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "readInto not implemented");
|
||||
}
|
||||
|
||||
protected:
|
||||
/// The number of bytes to ignore from the initial position of `working_buffer`
|
||||
/// buffer. Apparently this is an additional out-parameter for nextImpl(),
|
||||
|
@ -39,51 +39,56 @@ std::future<IAsynchronousReader::Result> SynchronousReader::submit(Request reque
|
||||
/// If size is zero, then read() cannot be distinguished from EOF
|
||||
assert(request.size);
|
||||
|
||||
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
|
||||
|
||||
#if defined(POSIX_FADV_WILLNEED)
|
||||
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
|
||||
if (0 != posix_fadvise(fd, request.offset, request.size, POSIX_FADV_WILLNEED))
|
||||
throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
|
||||
#endif
|
||||
|
||||
return std::async(std::launch::deferred, [fd, request]
|
||||
return std::async(std::launch::deferred, [request, this]
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
|
||||
Stopwatch watch(CLOCK_MONOTONIC);
|
||||
|
||||
size_t bytes_read = 0;
|
||||
while (!bytes_read)
|
||||
{
|
||||
ssize_t res = 0;
|
||||
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
|
||||
res = ::pread(fd, request.buf, request.size, request.offset);
|
||||
}
|
||||
if (!res)
|
||||
break;
|
||||
|
||||
if (-1 == res && errno != EINTR)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
|
||||
throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
|
||||
}
|
||||
|
||||
if (res > 0)
|
||||
bytes_read += res;
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
|
||||
|
||||
/// It reports real time spent including the time spent while thread was preempted doing nothing.
|
||||
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
|
||||
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
|
||||
/// (NetlinkMetricsProvider has about 500K RPS).
|
||||
watch.stop();
|
||||
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
return Result{ .size = bytes_read, .offset = request.ignore };
|
||||
return execute(request);
|
||||
});
|
||||
}
|
||||
|
||||
IAsynchronousReader::Result SynchronousReader::execute(Request request)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
|
||||
Stopwatch watch(CLOCK_MONOTONIC);
|
||||
|
||||
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
|
||||
size_t bytes_read = 0;
|
||||
while (!bytes_read)
|
||||
{
|
||||
ssize_t res = 0;
|
||||
|
||||
{
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
|
||||
res = ::pread(fd, request.buf, request.size, request.offset);
|
||||
}
|
||||
if (!res)
|
||||
break;
|
||||
|
||||
if (-1 == res && errno != EINTR)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
|
||||
throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
|
||||
}
|
||||
|
||||
if (res > 0)
|
||||
bytes_read += res;
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
|
||||
|
||||
/// It reports real time spent including the time spent while thread was preempted doing nothing.
|
||||
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
|
||||
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
|
||||
/// (NetlinkMetricsProvider has about 500K RPS).
|
||||
watch.stop();
|
||||
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
return Result{ .size = bytes_read, .offset = request.ignore };
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,6 +14,8 @@ class SynchronousReader final : public IAsynchronousReader
|
||||
public:
|
||||
std::future<Result> submit(Request request) override;
|
||||
|
||||
Result execute(Request request) override;
|
||||
|
||||
void wait() override {}
|
||||
};
|
||||
|
||||
|
@ -30,7 +30,7 @@
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/S3/Client.h>
|
||||
|
||||
#include <Disks/IO/ThreadPoolReader.h>
|
||||
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
|
||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
|
||||
|
||||
@ -1207,6 +1207,7 @@ TEST_F(WBS3Test, ReadBeyondLastOffset) {
|
||||
wb.finalize();
|
||||
}
|
||||
|
||||
auto reader = std::make_unique<ThreadPoolRemoteFSReader>(1, 1);
|
||||
std::unique_ptr<ReadBufferFromEncryptedFile> encrypted_read_buffer;
|
||||
|
||||
{
|
||||
@ -1214,7 +1215,6 @@ TEST_F(WBS3Test, ReadBeyondLastOffset) {
|
||||
|
||||
auto cache_log = std::shared_ptr<FilesystemCacheLog>();
|
||||
const StoredObjects objects = { StoredObject(remote_file, data.size() + FileEncryption::Header::kSize) };
|
||||
auto reader = std::make_unique<ThreadPoolReader>(1, 1);
|
||||
auto async_read_counters = std::make_shared<AsyncReadCounters>();
|
||||
auto prefetch_log = std::shared_ptr<FilesystemReadPrefetchesLog>();
|
||||
|
||||
@ -1253,7 +1253,7 @@ TEST_F(WBS3Test, ReadBeyondLastOffset) {
|
||||
ASSERT_EQ(rb_async->getPosition(), FileEncryption::Header::kSize);
|
||||
ASSERT_EQ(rb_async->getFileOffsetOfBufferEnd(), disk_read_settings.remote_fs_buffer_size);
|
||||
|
||||
/// ReadBufferFromEncryptedFile is constructed over an ReadBuffer which was already in use.
|
||||
/// ReadBufferFromEncryptedFile is constructed over a ReadBuffer which was already in use.
|
||||
/// The 'FileEncryption::Header' has been read from `rb_async`.
|
||||
/// 'rb_async' will read the data from `rb_async` working buffer
|
||||
encrypted_read_buffer = std::make_unique<ReadBufferFromEncryptedFile>(
|
||||
|
@ -250,20 +250,6 @@ size_t ReadBufferFromHDFS::getFileOffsetOfBufferEnd() const
|
||||
return impl->getPosition();
|
||||
}
|
||||
|
||||
IAsynchronousReader::Result ReadBufferFromHDFS::readInto(char * data, size_t size, size_t offset, size_t /*ignore*/)
|
||||
{
|
||||
/// TODO: we don't need to copy if there is no pending data
|
||||
seek(offset, SEEK_SET);
|
||||
if (eof())
|
||||
return {0, 0, nullptr};
|
||||
|
||||
/// Make sure returned size no greater than available bytes in working_buffer
|
||||
size_t count = std::min(size, available());
|
||||
memcpy(data, position(), count);
|
||||
position() += count;
|
||||
return {count, 0, nullptr};
|
||||
}
|
||||
|
||||
String ReadBufferFromHDFS::getFileName() const
|
||||
{
|
||||
return impl->hdfs_file_path;
|
||||
|
@ -44,8 +44,6 @@ public:
|
||||
|
||||
size_t getFileOffsetOfBufferEnd() const override;
|
||||
|
||||
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore) override;
|
||||
|
||||
String getFileName() const override;
|
||||
|
||||
private:
|
||||
|
Loading…
Reference in New Issue
Block a user