diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index f4f47148d56..fc67a12dd50 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -251,6 +251,13 @@ \ M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \ M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \ + \ + M(ThreadPoolReaderPageCacheHit, "Number of times the read inside ThreadPoolReader was done from page cache.") \ + M(ThreadPoolReaderPageCacheHitBytes, "Number of bytes read inside ThreadPoolReader when it was done from page cache.") \ + M(ThreadPoolReaderPageCacheHitElapsedMicroseconds, "Time spent reading data from page cache in ThreadPoolReader.") \ + M(ThreadPoolReaderPageCacheMiss, "Number of times the read inside ThreadPoolReader was not done from page cache and was hand off to thread pool.") \ + M(ThreadPoolReaderPageCacheMissBytes, "Number of bytes read inside ThreadPoolReader when read was not done from page cache and was hand off to thread pool.") \ + M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \ namespace ProfileEvents diff --git a/src/IO/SynchronousReader.cpp b/src/IO/SynchronousReader.cpp index 12c81995f99..a9a9c5e0fe6 100644 --- a/src/IO/SynchronousReader.cpp +++ b/src/IO/SynchronousReader.cpp @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -8,6 +11,20 @@ #include +namespace ProfileEvents +{ + extern const Event ReadBufferFromFileDescriptorRead; + extern const Event ReadBufferFromFileDescriptorReadFailed; + extern const Event ReadBufferFromFileDescriptorReadBytes; + extern const Event DiskReadElapsedMicroseconds; + extern const Event Seek; +} + +namespace CurrentMetrics +{ + extern const Metric Read; +} + namespace DB { @@ -17,6 +34,7 @@ namespace ErrorCodes extern const int CANNOT_ADVISE; 
} + std::future SynchronousReader::submit(Request request) { int fd = assert_cast(*request.descriptor).fd; @@ -28,22 +46,40 @@ std::future SynchronousReader::submit(Request reque return std::async(std::launch::deferred, [fd, request] { - /// TODO Instrumentation. + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead); + Stopwatch watch(CLOCK_MONOTONIC); size_t bytes_read = 0; while (!bytes_read) { - ssize_t res = ::pread(fd, request.buf, request.size, request.offset); + ssize_t res = 0; + + { + CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + res = ::pread(fd, request.buf, request.size, request.offset); + } if (!res) break; if (-1 == res && errno != EINTR) + { + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } if (res > 0) bytes_read += res; } + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + + /// It reports real time spent including the time spent while thread was preempted doing nothing. + /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). + /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it + /// (TaskStatsInfoGetter has about 500K RPS). 
+ watch.stop(); + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + return bytes_read; }); } diff --git a/src/IO/ThreadPoolReader.cpp b/src/IO/ThreadPoolReader.cpp index 3fed63dc3ec..3857dd993c9 100644 --- a/src/IO/ThreadPoolReader.cpp +++ b/src/IO/ThreadPoolReader.cpp @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -34,12 +37,34 @@ #endif +namespace ProfileEvents +{ + extern const Event ThreadPoolReaderPageCacheHit; + extern const Event ThreadPoolReaderPageCacheHitBytes; + extern const Event ThreadPoolReaderPageCacheHitElapsedMicroseconds; + extern const Event ThreadPoolReaderPageCacheMiss; + extern const Event ThreadPoolReaderPageCacheMissBytes; + extern const Event ThreadPoolReaderPageCacheMissElapsedMicroseconds; + + extern const Event ReadBufferFromFileDescriptorRead; + extern const Event ReadBufferFromFileDescriptorReadFailed; + extern const Event ReadBufferFromFileDescriptorReadBytes; + extern const Event DiskReadElapsedMicroseconds; +} + +namespace CurrentMetrics +{ + extern const Metric Read; +} + + namespace DB { namespace ErrorCodes { extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; + } @@ -62,24 +87,38 @@ std::future ThreadPoolReader::submit(Request reques if (has_pread_nowait_support.load(std::memory_order_relaxed)) { + Stopwatch watch(CLOCK_MONOTONIC); + std::promise promise; std::future future = promise.get_future(); size_t bytes_read = 0; while (!bytes_read) { - struct iovec io_vec{ .iov_base = request.buf, .iov_len = request.size }; - ssize_t res = syscall( - SYS_preadv2, fd, - &io_vec, 1, - static_cast(request.offset), static_cast(request.offset >> 32), - RWF_NOWAIT); + ssize_t res = 0; - //ssize_t res = ::pread(fd, request.buf, request.size, request.offset); + { + CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + + struct iovec io_vec{ .iov_base = request.buf, .iov_len = request.size }; + res = syscall( + SYS_preadv2, fd, 
+ &io_vec, 1, + /// This is kind of weird calling convention for syscall. + static_cast(request.offset), static_cast(request.offset >> 32), + /// This flag forces read from page cache or returning EAGAIN. + RWF_NOWAIT); + } if (!res) { + /// The file has ended. promise.set_value(0); + + watch.stop(); + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + return future; } @@ -87,21 +126,23 @@ std::future ThreadPoolReader::submit(Request reques { if (errno == ENOSYS || errno == EOPNOTSUPP) { + /// No support for the syscall or the flag in the Linux kernel. has_pread_nowait_support.store(false, std::memory_order_relaxed); break; } else if (errno == EAGAIN) { - /// Data is not available. - //std::cerr << "miss\n"; + /// Data is not available in page cache. Will hand off to thread pool. break; } else if (errno == EINTR) { + /// Interrupted by a signal. continue; } else { + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); promise.set_exception(std::make_exception_ptr(ErrnoException( fmt::format("Cannot read from file {}, {}", fd, errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)), @@ -117,32 +158,62 @@ std::future ThreadPoolReader::submit(Request reques if (bytes_read) { - //std::cerr << "hit\n"; + /// It reports real time spent including the time spent while thread was preempted doing nothing. + /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables). + /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it + /// (TaskStatsInfoGetter has about 500K RPS). + watch.stop(); + + /// Read successfully from page cache. 
+ ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHit); + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitBytes, bytes_read); + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + promise.set_value(bytes_read); return future; } } #endif + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss); + auto task = std::make_shared>([request, fd] { setThreadName("ThreadPoolRead"); - - /// TODO Instrumentation. + Stopwatch watch(CLOCK_MONOTONIC); size_t bytes_read = 0; while (!bytes_read) { - ssize_t res = ::pread(fd, request.buf, request.size, request.offset); + ssize_t res = 0; + + { + CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + res = ::pread(fd, request.buf, request.size, request.offset); + } + + /// File has ended. if (!res) break; if (-1 == res && errno != EINTR) + { + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed); throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } - bytes_read += res; + if (res > 0) bytes_read += res; /* On EINTR res is -1: adding it to the unsigned counter would wrap to SIZE_MAX and end the loop with a bogus byte count, so retry the read instead. */ } + watch.stop(); + + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissBytes, bytes_read); + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); + ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); + return bytes_read; });