This commit is contained in:
kssenii 2021-10-20 09:37:18 +03:00
parent 143e912e4e
commit 31cd71c849
9 changed files with 29 additions and 69 deletions

View File

@ -253,16 +253,12 @@
M(RemoteFSReadBytes, "Read bytes from remote filesystem.") \
\
M(RemoteFSSeeks, "Total number of seeks for async buffer") \
M(RemoteFSPrefetches, "Total number of prefetches") \
M(RemoteFSSeekCancelledPrefetches, "Number of cancelled prefecthes because of seek") \
M(RemoteFSUnusedCancelledPrefetches, "Number of prefetches pending in buffer destructor") \
M(RemoteFSPrefetchReads, "Total number of reads from prefecthed buffer") \
M(RemoteFSAsyncBufferReads, "Number of nextImpl() calls for async buffer") \
M(RemoteFSSimpleBufferReads, "Number of nextImpl() calls for non-async buffer") \
M(RemoteFSNewReaders, "Number of created impl objects") \
M(RemoteFSAsyncBuffers, "Total number of AsycnhronousReadIndirectBufferFromREmoteFS buffers") \
M(RemoteFSSimpleBuffers, "Total number of ReadIndirectBufferFromREmoteFS buffers") \
M(RemoteFSRedundantlyReadBytes, "") \
M(RemoteFSPrefetches, "Number of prefetches made with asynchronous reading from remote filesystem") \
M(RemoteFSCancelledPrefetches, "Number of cancelled prefetches (because of seek)") \
M(RemoteFSUnusedPrefetches, "Number of prefetches pending at buffer destruction") \
M(RemoteFSPrefetchedReads, "Number of reads from prefetched buffer") \
M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
\
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \
M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \

View File

@ -13,7 +13,6 @@
#include <Compression/CompressionInfo.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Disks/DiskRestartProxy.h>
namespace ProfileEvents

View File

@ -16,11 +16,11 @@ namespace ProfileEvents
extern const Event AsynchronousReadWaitMicroseconds;
extern const Event RemoteFSSeeks;
extern const Event RemoteFSPrefetches;
extern const Event RemoteFSSeekCancelledPrefetches;
extern const Event RemoteFSUnusedCancelledPrefetches;
extern const Event RemoteFSPrefetchReads;
extern const Event RemoteFSAsyncBufferReads;
extern const Event RemoteFSAsyncBuffers;
extern const Event RemoteFSCancelledPrefetches;
extern const Event RemoteFSUnusedPrefetches;
extern const Event RemoteFSPrefetchedReads;
extern const Event RemoteFSUnprefetchedReads;
extern const Event RemoteFSBuffers;
}
namespace DB
@ -46,8 +46,7 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
, prefetch_buffer(buf_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBuffers);
buffer_events += "Events for buffer: " + impl->getFileName() + " : ";
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
@ -86,33 +85,23 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
if (absolute_position > last_offset)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {}) {}",
absolute_position, last_offset, buffer_events);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
absolute_position, last_offset);
}
/// Prefetch even in case hasPendingData() == true.
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
buffer_events += fmt::format("-- PREFETCH from offset: {}, upper bound: {} --",
toString(absolute_position), toString(last_offset));
}
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
{
buffer_events += "-- Set last offset " + toString(position) + "--";
if (prefetch_future.valid())
{
LOG_DEBUG(&Poco::Logger::get("kssenii"), buffer_events);
/// TODO: Planning to put logical error here after more testing,
// because seems like future is never supposed to be valid at this point.
std::terminate();
// buffer_events += "-- Cancelling because of offset update --";
// ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches);
// prefetch_future.wait();
// prefetch_future = {};
}
last_offset = position;
@ -122,7 +111,6 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{
LOG_DEBUG(&Poco::Logger::get("kssenii"), buffer_events);
/// Everything is already read.
if (absolute_position == last_offset)
return false;
@ -131,12 +119,11 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
absolute_position, last_offset);
ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBufferReads);
size_t size = 0;
if (prefetch_future.valid())
{
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchReads);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
Stopwatch watch;
@ -151,16 +138,14 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
}
buffer_events += fmt::format("-- Read from prefetch from offset: {}, upper bound: {}, actually read: {} --",
toString(absolute_position), toString(last_offset), toString(size));
watch.stop();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
}
else
{
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
size = readInto(memory.data(), memory.size()).get();
buffer_events += fmt::format("-- Read without prefetch from offset: {}, upper bound: {}, actually read: {} --",
toString(absolute_position), toString(last_offset), toString(size));
if (size)
{
set(memory.data(), memory.size());
@ -170,8 +155,6 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
prefetch_future = {};
LOG_DEBUG(&Poco::Logger::get("kssenii"), buffer_events);
return size;
}
@ -179,7 +162,6 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeks);
buffer_events += "-- Seek to " + toString(offset_) + " --";
if (whence == SEEK_CUR)
{
@ -218,8 +200,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
if (prefetch_future.valid())
{
buffer_events += "-- cancelling prefetch because of seek --";
ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches);
ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
prefetch_future.wait();
prefetch_future = {};
}
@ -238,7 +219,6 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
}
else
{
buffer_events += "-- Impl seek --";
impl->seek(absolute_position); /// SEEK_SET.
}
@ -250,12 +230,10 @@ void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
if (prefetch_future.valid())
{
buffer_events += "-- cancelling prefetch in finalize --";
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedCancelledPrefetches);
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedPrefetches);
prefetch_future.wait();
prefetch_future = {};
}
LOG_DEBUG(&Poco::Logger::get("kssenii"), buffer_events);
}

View File

@ -68,8 +68,6 @@ private:
Memory<> prefetch_buffer;
String buffer_events;
size_t min_bytes_for_seek;
size_t bytes_to_ignore = 0;

View File

@ -79,16 +79,15 @@ void ReadBufferFromRemoteFSGather::initialize()
auto current_buf_offset = absolute_position;
for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i)
{
current_buf_idx = i;
const auto & [file_path, size] = metadata.remote_fs_objects[i];
if (size > current_buf_offset)
{
/// Do not create a new buffer if we already have what we need.
if (!current_buf || buf_idx != i)
if (!current_buf || current_buf_idx != i)
{
current_buf = createImplementationBuffer(file_path, last_offset);
buf_idx = i;
current_buf_idx = i;
}
current_buf->seek(current_buf_offset, SEEK_SET);
@ -97,6 +96,7 @@ void ReadBufferFromRemoteFSGather::initialize()
current_buf_offset -= size;
}
current_buf_idx = metadata.remote_fs_objects.size();
current_buf = nullptr;
}
@ -141,7 +141,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
if (bytes_to_ignore)
current_buf->ignore(bytes_to_ignore);
LOG_DEBUG(&Poco::Logger::get("Gather"), "Reading from path: {}", canonical_path);
LOG_DEBUG(&Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Reading from path: {}", canonical_path);
auto result = current_buf->next();
swap(*current_buf);
@ -160,11 +160,11 @@ void ReadBufferFromRemoteFSGather::seek(off_t offset)
}
void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t offset)
void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
{
assert(last_offset < offset);
assert(last_offset < position);
current_buf.reset();
last_offset = offset;
last_offset = position;
}
@ -177,9 +177,6 @@ void ReadBufferFromRemoteFSGather::reset()
String ReadBufferFromRemoteFSGather::getFileName() const
{
return canonical_path;
// if (current_buf)
// return fs::path(metadata.metadata_file_path) / metadata.remote_fs_objects[buf_idx].first;
// return metadata.metadata_file_path;
}
}

View File

@ -54,8 +54,6 @@ private:
size_t absolute_position = 0;
size_t buf_idx = 0;
size_t bytes_to_ignore = 0;
size_t last_offset = 0;

View File

@ -3,12 +3,6 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
namespace ProfileEvents
{
extern const Event RemoteFSSimpleBufferReads;
extern const Event RemoteFSSimpleBuffers;
}
namespace DB
{
@ -21,7 +15,6 @@ namespace ErrorCodes
ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_) : impl(std::move(impl_))
{
ProfileEvents::increment(ProfileEvents::RemoteFSSimpleBuffers);
}
@ -79,7 +72,6 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
bool ReadIndirectBufferFromRemoteFS::nextImpl()
{
ProfileEvents::increment(ProfileEvents::RemoteFSSimpleBufferReads);
/// Transfer current position and working_buffer to actual ReadBuffer
swap(*impl);
/// Position and working_buffer will be updated in next() call

View File

@ -44,7 +44,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
{
auto task = std::make_shared<std::packaged_task<Result()>>([request]
{
setThreadName("ThreadPoolRead");
setThreadName("ThreadPoolRemoteFSRead");
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());

View File

@ -1,3 +1,5 @@
#pragma once
#include <IO/ReadBufferFromFileDecorator.h>
#include <shared_mutex>