mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-30 05:30:51 +00:00
More profile events
This commit is contained in:
parent
4c99f6da51
commit
eb5cb86271
@ -252,6 +252,17 @@
|
||||
M(RemoteFSReadMicroseconds, "Time of reading from remote filesystem.") \
|
||||
M(RemoteFSReadBytes, "Read bytes from remote filesystem.") \
|
||||
\
|
||||
M(RemoteFSSeeks, "Total number of seeks for async buffer") \
|
||||
M(RemoteFSPrefetches, "Total number of prefetches") \
|
||||
M(RemoteFSSeekCancelledPrefetches, "Number of cancelled prefecthes because of seek") \
|
||||
M(RemoteFSUnusedCancelledPrefetches, "Number of prefetches prending in buffer desctructor") \
|
||||
M(RemoteFSPrefetchReads, "Total number of reads from prefecthed buffer") \
|
||||
M(RemoteFSAsyncBufferReads, "Number of nextImpl() calls for async buffer") \
|
||||
M(RemoteFSSimpleBufferReads, "Number of nextImpl() calls for non-async buffer") \
|
||||
M(RemoteFSNewReaders, "Number of created impl objects") \
|
||||
M(RemoteFSAsyncBuffers, "Total number of AsycnhronousReadIndirectBufferFromREmoteFS buffers") \
|
||||
M(RemoteFSSimpleBuffers, "Total number of ReadIndirectBufferFromREmoteFS buffers") \
|
||||
\
|
||||
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \
|
||||
M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \
|
||||
\
|
||||
|
@ -8,9 +8,17 @@ namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric AsynchronousReadWait;
|
||||
}
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event AsynchronousReadWaitMicroseconds;
|
||||
extern const Event RemoteFSSeeks;
|
||||
extern const Event RemoteFSPrefetches;
|
||||
extern const Event RemoteFSSeekCancelledPrefetches;
|
||||
extern const Event RemoteFSUnusedCancelledPrefetches;
|
||||
extern const Event RemoteFSPrefetchReads;
|
||||
extern const Event RemoteFSAsyncBufferReads;
|
||||
extern const Event RemoteFSAsyncBuffers;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -52,15 +60,21 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
|
||||
return;
|
||||
|
||||
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
|
||||
buffer_events += "-- Prefetch --";
|
||||
}
|
||||
|
||||
|
||||
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBufferReads);
|
||||
size_t size = 0;
|
||||
|
||||
if (prefetch_future.valid())
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchReads);
|
||||
buffer_events += "-- Read from prefetch --";
|
||||
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
|
||||
Stopwatch watch;
|
||||
{
|
||||
@ -78,6 +92,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer_events += "-- Read without prefetch --";
|
||||
size = readInto(memory.data(), memory.size()).get();
|
||||
if (size)
|
||||
{
|
||||
@ -87,6 +102,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
|
||||
}
|
||||
}
|
||||
|
||||
buffer_events += " + " + toString(size) + " + ";
|
||||
prefetch_future = {};
|
||||
return size;
|
||||
}
|
||||
@ -94,6 +110,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
|
||||
|
||||
off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSeeks);
|
||||
buffer_events += "-- Seek to " + toString(offset_) + " --";
|
||||
|
||||
if (whence == SEEK_CUR)
|
||||
{
|
||||
/// If position within current working buffer - shift pos.
|
||||
@ -131,6 +150,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
|
||||
|
||||
if (prefetch_future.valid())
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches);
|
||||
prefetch_future.wait();
|
||||
prefetch_future = {};
|
||||
}
|
||||
@ -144,8 +164,11 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
|
||||
|
||||
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
|
||||
{
|
||||
std::cerr << "\n\n\nBuffer events: " << buffer_events << std::endl;
|
||||
|
||||
if (prefetch_future.valid())
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSUnusedCancelledPrefetches);
|
||||
prefetch_future.wait();
|
||||
prefetch_future = {};
|
||||
}
|
||||
|
@ -64,6 +64,8 @@ private:
|
||||
size_t absolute_position = 0;
|
||||
|
||||
Memory<> prefetch_buffer;
|
||||
|
||||
String buffer_events;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -3,6 +3,12 @@
|
||||
#include <Disks/ReadBufferFromRemoteFSGather.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event RemoteFSSimpleBufferReads;
|
||||
extern const Event RemoteFSSimpleBuffers;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -15,6 +21,7 @@ namespace ErrorCodes
|
||||
ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
|
||||
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_) : impl(std::move(impl_))
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSimpleBuffers);
|
||||
}
|
||||
|
||||
|
||||
@ -72,6 +79,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
|
||||
|
||||
bool ReadIndirectBufferFromRemoteFS::nextImpl()
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::RemoteFSSimpleBufferReads);
|
||||
/// Transfer current position and working_buffer to actual ReadBuffer
|
||||
swap(*impl);
|
||||
/// Position and working_buffer will be updated in next() call
|
||||
|
Loading…
Reference in New Issue
Block a user