diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index cba194fe8d6..41ae41b38d4 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -252,6 +252,17 @@ M(RemoteFSReadMicroseconds, "Time of reading from remote filesystem.") \ M(RemoteFSReadBytes, "Read bytes from remote filesystem.") \ \ + M(RemoteFSSeeks, "Total number of seeks for async buffer") \ + M(RemoteFSPrefetches, "Total number of prefetches") \ + M(RemoteFSSeekCancelledPrefetches, "Number of cancelled prefecthes because of seek") \ + M(RemoteFSUnusedCancelledPrefetches, "Number of prefetches prending in buffer desctructor") \ + M(RemoteFSPrefetchReads, "Total number of reads from prefecthed buffer") \ + M(RemoteFSAsyncBufferReads, "Number of nextImpl() calls for async buffer") \ + M(RemoteFSSimpleBufferReads, "Number of nextImpl() calls for non-async buffer") \ + M(RemoteFSNewReaders, "Number of created impl objects") \ + M(RemoteFSAsyncBuffers, "Total number of AsycnhronousReadIndirectBufferFromREmoteFS buffers") \ + M(RemoteFSSimpleBuffers, "Total number of ReadIndirectBufferFromREmoteFS buffers") \ + \ M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \ M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \ \ diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index f25707b70a0..0327223dbb9 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -8,9 +8,17 @@ namespace CurrentMetrics { extern const Metric AsynchronousReadWait; } + namespace ProfileEvents { extern const Event AsynchronousReadWaitMicroseconds; + extern const Event RemoteFSSeeks; + extern const Event RemoteFSPrefetches; + extern const Event RemoteFSSeekCancelledPrefetches; + extern const Event RemoteFSUnusedCancelledPrefetches; + extern const Event RemoteFSPrefetchReads; + extern const Event RemoteFSAsyncBufferReads; + extern const Event RemoteFSAsyncBuffers; } namespace DB @@ -52,15 +60,21 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() return; prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); + ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); + buffer_events += "-- Prefetch --"; } bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() { + ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBufferReads); size_t size = 0; if (prefetch_future.valid()) { + ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchReads); + buffer_events += "-- Read from prefetch --"; + CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; Stopwatch watch; { @@ -78,6 +92,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() } else { + buffer_events += "-- Read without prefetch --"; size = readInto(memory.data(), memory.size()).get(); if (size) { @@ -87,6 +102,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() } } + buffer_events += " + " + toString(size) + " + "; prefetch_future = {}; return size; } @@ -94,6 +110,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) { + ProfileEvents::increment(ProfileEvents::RemoteFSSeeks); + buffer_events += "-- Seek to " + toString(offset_) + " --"; + if (whence == SEEK_CUR) { /// If position within current working buffer - shift pos. @@ -131,6 +150,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence if (prefetch_future.valid()) { + ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches); prefetch_future.wait(); prefetch_future = {}; } @@ -144,8 +164,11 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence void AsynchronousReadIndirectBufferFromRemoteFS::finalize() { + std::cerr << "\n\n\nBuffer events: " << buffer_events << std::endl; + if (prefetch_future.valid()) { + ProfileEvents::increment(ProfileEvents::RemoteFSUnusedCancelledPrefetches); prefetch_future.wait(); prefetch_future = {}; } diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h index 3b5bcc55a66..ed94f72fd69 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -64,6 +64,8 @@ private: size_t absolute_position = 0; Memory<> prefetch_buffer; + + String buffer_events; }; } diff --git a/src/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/IO/ReadIndirectBufferFromRemoteFS.cpp index 25c4795b8a5..d70d280871a 100644 --- a/src/IO/ReadIndirectBufferFromRemoteFS.cpp +++ b/src/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -3,6 +3,12 @@ #include +namespace ProfileEvents +{ + extern const Event RemoteFSSimpleBufferReads; + extern const Event RemoteFSSimpleBuffers; +} + namespace DB { @@ -15,6 +21,7 @@ namespace ErrorCodes ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( std::shared_ptr impl_) : impl(std::move(impl_)) { + ProfileEvents::increment(ProfileEvents::RemoteFSSimpleBuffers); } @@ -72,6 +79,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) bool ReadIndirectBufferFromRemoteFS::nextImpl() { + ProfileEvents::increment(ProfileEvents::RemoteFSSimpleBufferReads); /// Transfer current position and working_buffer to actual ReadBuffer swap(*impl); /// Position and working_buffer will be updated in next() call