From d81744fd0631471f0f6f876e913f886345deac82 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Mon, 28 May 2018 22:53:03 +0300 Subject: [PATCH] Add more perfomance counters. [#CLICKHOUSE-2910] --- dbms/src/Common/BackgroundSchedulePool.cpp | 12 ++-- dbms/src/Common/MemoryTracker.cpp | 4 +- dbms/src/Common/MemoryTracker.h | 5 +- dbms/src/Common/ProfileEvents.cpp | 5 ++ dbms/src/Common/Stopwatch.cpp | 14 +++++ dbms/src/Common/Stopwatch.h | 56 +++++++++++++++++++ dbms/src/Common/ThreadStatus.cpp | 11 ++-- dbms/src/Common/ThreadStatus.h | 1 + dbms/src/Common/Throttler.h | 15 ++++- .../IProfilingBlockInputStream.cpp | 14 +++++ dbms/src/IO/ReadBufferFromFileDescriptor.cpp | 11 +++- dbms/src/IO/ReadBufferFromPocoSocket.cpp | 10 ++++ dbms/src/IO/WriteBufferFromFileDescriptor.cpp | 5 ++ dbms/src/IO/WriteBufferFromPocoSocket.cpp | 12 ++++ .../MergeTree/BackgroundProcessingPool.cpp | 3 + 15 files changed, 161 insertions(+), 17 deletions(-) create mode 100644 dbms/src/Common/Stopwatch.cpp diff --git a/dbms/src/Common/BackgroundSchedulePool.cpp b/dbms/src/Common/BackgroundSchedulePool.cpp index 70a2ef66572..c27d357b91e 100644 --- a/dbms/src/Common/BackgroundSchedulePool.cpp +++ b/dbms/src/Common/BackgroundSchedulePool.cpp @@ -6,6 +6,8 @@ #include #include #include +#include "ThreadStatus.h" + namespace CurrentMetrics { @@ -230,9 +232,11 @@ void BackgroundSchedulePool::threadFunction() { setThreadName("BackgrSchedPool"); - MemoryTracker memory_tracker; - memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool); - current_memory_tracker = &memory_tracker; + if (current_thread) + { + ThreadStatus::setCurrentThreadParentQuery(nullptr); + current_thread->memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool); + } while (!shutdown) { @@ -242,8 +246,6 @@ void BackgroundSchedulePool::threadFunction() task_notification.execute(); } } - - current_memory_tracker = nullptr; } diff --git a/dbms/src/Common/MemoryTracker.cpp 
b/dbms/src/Common/MemoryTracker.cpp index 5731663b5c7..f8646702908 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -175,7 +175,7 @@ namespace CurrentMemoryTracker } } -DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker() +DB::ActionLock getCurrentMemoryTrackerBlocker() { - return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionBlockerSingleThread::LockHolder(nullptr); + return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionLock(); } diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 3e32ce5c9e5..5628640f9a9 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -110,7 +110,8 @@ public: void logPeakMemoryUsage() const; /// To be able to temporarily stop memory tracker - DB::ActionBlockerSingleThread blocker; + /// TODO: Use more lightweight implementation + DB::ActionBlocker blocker; }; @@ -123,4 +124,4 @@ namespace CurrentMemoryTracker } -DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker(); +DB::ActionLock getCurrentMemoryTrackerBlocker(); diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index cdc09ea76b5..e72e81c9ac9 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -39,6 +39,11 @@ M(CreatedReadBufferAIO) \ M(CreatedWriteBufferOrdinary) \ M(CreatedWriteBufferAIO) \ + M(DiskReadElapsedMicroseconds) \ + M(DiskWriteElapsedMicroseconds) \ + M(NetworkReceiveElapsedMicroseconds) \ + M(NetworkSendElapsedMicroseconds) \ + M(ThrottlerSleepMicroseconds) \ \ M(ReplicatedPartFetches) \ M(ReplicatedPartFailedFetches) \ diff --git a/dbms/src/Common/Stopwatch.cpp b/dbms/src/Common/Stopwatch.cpp new file mode 100644 index 00000000000..0fb3d57ba6d --- /dev/null +++ b/dbms/src/Common/Stopwatch.cpp @@ -0,0 +1,14 @@ +#include +#include "Stopwatch.h" + +StopWatchRusage::Timestamp 
StopWatchRusage::Timestamp::current() +{ + StopWatchRusage::Timestamp res; + + ::rusage rusage; + ::getrusage(RUSAGE_THREAD, &rusage); + + res.user_ns = rusage.ru_utime.tv_sec * 1000000000UL + rusage.ru_utime.tv_usec * 1000UL; /* tv_usec is in microseconds */ + res.sys_ns = rusage.ru_stime.tv_sec * 1000000000UL + rusage.ru_stime.tv_usec * 1000UL; /* tv_usec is in microseconds */ + return res; +} diff --git a/dbms/src/Common/Stopwatch.h b/dbms/src/Common/Stopwatch.h index 414998d0d68..41d9209c11d 100644 --- a/dbms/src/Common/Stopwatch.h +++ b/dbms/src/Common/Stopwatch.h @@ -134,3 +134,59 @@ private: /// Most significant bit is a lock. When it is set, compareAndRestartDeferred method will return false. UInt64 nanoseconds() const { return StopWatchDetail::nanoseconds(clock_type) & 0x7FFFFFFFFFFFFFFFULL; } }; + + +/// Like ordinary StopWatch, but uses getrusage() system call to measure user/system CPU time of the current thread. Starts measuring on construction. +struct StopWatchRusage +{ + StopWatchRusage() { start(); } + + void start() { start_ts = Timestamp::current(); is_running = true; } + void stop() { stop_ts = Timestamp::current(); is_running = false; } + void reset() { start_ts = Timestamp(); stop_ts = Timestamp(); is_running = false; } + void restart() { start(); } + + UInt64 elapsed(bool count_user = true, bool count_sys = true) const + { + return elapsedNanoseconds(count_user, count_sys); + } + + UInt64 elapsedNanoseconds(bool count_user = true, bool count_sys = true) const + { + return (is_running ? 
Timestamp::current() : stop_ts).nanoseconds(count_user, count_sys) - start_ts.nanoseconds(count_user, count_sys); + } + + UInt64 elapsedMicroseconds(bool count_user = true, bool count_sys = true) const + { + return elapsedNanoseconds(count_user, count_sys) / 1000UL; + } + + UInt64 elapsedMilliseconds(bool count_user = true, bool count_sys = true) const + { + return elapsedNanoseconds(count_user, count_sys) / 1000000UL; + } + + double elapsedSeconds(bool count_user = true, bool count_sys = true) const + { + return static_cast<double>(elapsedNanoseconds(count_user, count_sys)) / 1000000000.0; + } + +private: + + struct Timestamp + { + UInt64 user_ns = 0; + UInt64 sys_ns = 0; + + static Timestamp current(); + + UInt64 nanoseconds(bool count_user = true, bool count_sys = true) const + { + return (count_user ? user_ns : 0) + (count_sys ? sys_ns : 0); + } + }; + + Timestamp start_ts; + Timestamp stop_ts; + bool is_running = false; +}; diff --git a/dbms/src/Common/ThreadStatus.cpp b/dbms/src/Common/ThreadStatus.cpp index a1159897483..d2f85b1de08 100644 --- a/dbms/src/Common/ThreadStatus.cpp +++ b/dbms/src/Common/ThreadStatus.cpp @@ -45,7 +45,6 @@ static UInt64 getCurrentTimeMicroseconds(clockid_t clock_type = CLOCK_MONOTONIC) return ts.tv_sec * 1000000ULL + ts.tv_nsec / 1000UL; } - struct RusageCounters { /// In microseconds @@ -79,16 +78,16 @@ struct RusageCounters void setCurrent() { - rusage rusage; - getrusage(RUSAGE_THREAD, &rusage); + ::rusage rusage; + ::getrusage(RUSAGE_THREAD, &rusage); set(rusage, getCurrentTimeMicroseconds()); } void set(const ::rusage & rusage, UInt64 real_time_) { real_time = real_time_; - user_time = rusage.ru_utime.tv_sec * 1000000UL + rusage.ru_utime.tv_usec; - sys_time = rusage.ru_utime.tv_sec * 1000000UL + rusage.ru_utime.tv_usec; + user_time = rusage.ru_utime.tv_sec * 1000000UL + rusage.ru_utime.tv_usec; /* tv_usec is already in microseconds */ + sys_time = rusage.ru_stime.tv_sec * 1000000UL + rusage.ru_stime.tv_usec; /* tv_usec is already in microseconds */ page_reclaims = static_cast(rusage.ru_minflt); 
voluntary_context_switches = static_cast(rusage.ru_nvcsw); @@ -293,6 +292,8 @@ struct ScopeCurrentThread } }; +ThreadStatusPtr ThreadStatus::getCurrent() const { return current_thread; } + thread_local ThreadStatusPtr current_thread = ThreadStatus::create(); /// Order of current_thread and current_thread_scope matters diff --git a/dbms/src/Common/ThreadStatus.h b/dbms/src/Common/ThreadStatus.h index cb45c11f280..044865ddf6b 100644 --- a/dbms/src/Common/ThreadStatus.h +++ b/dbms/src/Common/ThreadStatus.h @@ -41,6 +41,7 @@ public: static void setCurrentThreadParentQuery(QueryStatus * parent_process); static void setCurrentThreadFromSibling(const ThreadStatusPtr & sibling_thread); + ThreadStatusPtr getCurrent() const; ~ThreadStatus(); diff --git a/dbms/src/Common/Throttler.h b/dbms/src/Common/Throttler.h index 22aa102cd1b..ba76242fbab 100644 --- a/dbms/src/Common/Throttler.h +++ b/dbms/src/Common/Throttler.h @@ -5,9 +5,16 @@ #include #include #include +#include #include +namespace ProfileEvents +{ + extern const Event ThrottlerSleepMicroseconds; +} + + namespace DB { @@ -68,10 +75,14 @@ public: if (desired_ns > elapsed_ns) { UInt64 sleep_ns = desired_ns - elapsed_ns; - timespec sleep_ts; + ::timespec sleep_ts; sleep_ts.tv_sec = sleep_ns / 1000000000; sleep_ts.tv_nsec = sleep_ns % 1000000000; - nanosleep(&sleep_ts, nullptr); /// NOTE Returns early in case of a signal. This is considered normal. + + /// NOTE: Returns early in case of a signal. This is considered normal. 
+ ::nanosleep(&sleep_ts, nullptr); + + ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_ns / 1000UL); } } diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index 6cd57d08f3e..25ca02a0398 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -1,6 +1,13 @@ #include #include #include +#include + + +namespace ProfileEvents +{ + extern const Event ThrottlerSleepMicroseconds; +} namespace DB @@ -293,6 +300,13 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value) if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0) { + /// Do not count sleeps in throttlers + if (current_thread) + { + UInt64 throttler_sleeps_ms = current_thread->performance_counters[ProfileEvents::ThrottlerSleepMicroseconds]; + total_elapsed -= static_cast(throttler_sleeps_ms) / 1000000.0; + } + if (limits.min_execution_speed && progress.rows / total_elapsed < limits.min_execution_speed) throw Exception("Query is executing too slow: " + toString(progress.rows / total_elapsed) + " rows/sec., minimum: " + toString(limits.min_execution_speed), diff --git a/dbms/src/IO/ReadBufferFromFileDescriptor.cpp b/dbms/src/IO/ReadBufferFromFileDescriptor.cpp index d2acffaf2e7..6218697795f 100644 --- a/dbms/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/dbms/src/IO/ReadBufferFromFileDescriptor.cpp @@ -14,6 +14,7 @@ namespace ProfileEvents extern const Event ReadBufferFromFileDescriptorRead; extern const Event ReadBufferFromFileDescriptorReadFailed; extern const Event ReadBufferFromFileDescriptorReadBytes; + extern const Event DiskReadElapsedMicroseconds; extern const Event Seek; } @@ -47,6 +48,7 @@ bool ReadBufferFromFileDescriptor::nextImpl() { ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead); + StopWatchRusage watch_ru; std::optional watch; if (profile_callback) 
watch.emplace(clock_type); @@ -68,6 +70,9 @@ bool ReadBufferFromFileDescriptor::nextImpl() if (res > 0) bytes_read += res; + watch_ru.stop(); + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch_ru.elapsedMicroseconds()); + if (profile_callback) { ProfileInfo info; @@ -114,12 +119,16 @@ off_t ReadBufferFromFileDescriptor::doSeek(off_t offset, int whence) else { ProfileEvents::increment(ProfileEvents::Seek); + StopWatchRusage watch_ru; pos = working_buffer.end(); - off_t res = lseek(fd, new_pos, SEEK_SET); + off_t res = ::lseek(fd, new_pos, SEEK_SET); if (-1 == res) throwFromErrno("Cannot seek through file " + getFileName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE); pos_in_file = new_pos; + + ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch_ru.elapsedMicroseconds()); + return res; } } diff --git a/dbms/src/IO/ReadBufferFromPocoSocket.cpp b/dbms/src/IO/ReadBufferFromPocoSocket.cpp index fa20a41b004..88686b8b33c 100644 --- a/dbms/src/IO/ReadBufferFromPocoSocket.cpp +++ b/dbms/src/IO/ReadBufferFromPocoSocket.cpp @@ -4,6 +4,13 @@ #include #include +#include + + +namespace ProfileEvents +{ + extern const Event NetworkReceiveElapsedMicroseconds; +} namespace DB @@ -20,6 +27,7 @@ namespace ErrorCodes bool ReadBufferFromPocoSocket::nextImpl() { ssize_t bytes_read = 0; + StopWatchRusage watch_ru; /// Add more details to exceptions. 
try @@ -42,6 +50,8 @@ bool ReadBufferFromPocoSocket::nextImpl() if (bytes_read < 0) throw NetException("Cannot read from socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_READ_FROM_SOCKET); + ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch_ru.elapsedMicroseconds()); + if (bytes_read) working_buffer.resize(bytes_read); else diff --git a/dbms/src/IO/WriteBufferFromFileDescriptor.cpp b/dbms/src/IO/WriteBufferFromFileDescriptor.cpp index d22fe7229b8..75f5aefef71 100644 --- a/dbms/src/IO/WriteBufferFromFileDescriptor.cpp +++ b/dbms/src/IO/WriteBufferFromFileDescriptor.cpp @@ -7,6 +7,7 @@ #include #include +#include namespace ProfileEvents @@ -14,6 +15,7 @@ namespace ProfileEvents extern const Event WriteBufferFromFileDescriptorWrite; extern const Event WriteBufferFromFileDescriptorWriteFailed; extern const Event WriteBufferFromFileDescriptorWriteBytes; + extern const Event DiskWriteElapsedMicroseconds; } namespace CurrentMetrics @@ -38,6 +40,8 @@ void WriteBufferFromFileDescriptor::nextImpl() if (!offset()) return; + StopWatchRusage watch_ru; + size_t bytes_written = 0; while (bytes_written != offset()) { @@ -59,6 +63,7 @@ void WriteBufferFromFileDescriptor::nextImpl() bytes_written += res; } + ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch_ru.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); } diff --git a/dbms/src/IO/WriteBufferFromPocoSocket.cpp b/dbms/src/IO/WriteBufferFromPocoSocket.cpp index 0255f7bf5a0..cefee566055 100644 --- a/dbms/src/IO/WriteBufferFromPocoSocket.cpp +++ b/dbms/src/IO/WriteBufferFromPocoSocket.cpp @@ -4,6 +4,13 @@ #include #include +#include + + +namespace ProfileEvents +{ + extern const Event NetworkSendElapsedMicroseconds; +} namespace DB @@ -22,6 +29,8 @@ void WriteBufferFromPocoSocket::nextImpl() if (!offset()) return; + StopWatchRusage watch_ru; + size_t bytes_written = 0; while (bytes_written < 
offset()) { @@ -47,7 +56,10 @@ void WriteBufferFromPocoSocket::nextImpl() if (res < 0) throw NetException("Cannot write to socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_WRITE_TO_SOCKET); + bytes_written += res; } + + ProfileEvents::increment(ProfileEvents::NetworkSendElapsedMicroseconds, watch_ru.elapsedMicroseconds()); /* once per call, after the send loop: the watch reading is cumulative */ } diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index 4f41b892dd9..c4401b38547 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -115,7 +115,10 @@ void BackgroundProcessingPool::threadFunction() { setThreadName("BackgrProcPool"); if (current_thread) + { ThreadStatus::setCurrentThreadParentQuery(nullptr); + current_thread->memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool); + } pcg64 rng(randomSeed()); std::this_thread::sleep_for(std::chrono::duration(std::uniform_real_distribution(0, sleep_seconds_random_part)(rng)));