From 8a5286ea4c466f2ceeaae2cf7435ba074820f1c5 Mon Sep 17 00:00:00 2001 From: Sergei Trifonov Date: Sat, 16 Jul 2022 00:27:26 +0200 Subject: [PATCH] fix cpu usage metric in client --- src/Client/ClientBase.cpp | 3 +- src/Common/EventRateMeter.h | 70 +++++++++++++++++++++++++++++++ src/Common/ProgressIndication.cpp | 23 +++++----- src/Common/ProgressIndication.h | 10 ++--- 4 files changed, 87 insertions(+), 19 deletions(-) create mode 100644 src/Common/EventRateMeter.h diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index b435b483b71..f0a8794d096 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -983,8 +983,7 @@ void ClientBase::onProfileEvents(Block & block) else if (event_name == MemoryTracker::USAGE_EVENT_NAME) thread_times[host_name][thread_id].memory_usage = value; } - auto elapsed_time = profile_events.watch.elapsedMicroseconds(); - progress_indication.updateThreadEventData(thread_times, elapsed_time); + progress_indication.updateThreadEventData(thread_times); if (need_render_progress) progress_indication.writeProgress(); diff --git a/src/Common/EventRateMeter.h b/src/Common/EventRateMeter.h new file mode 100644 index 00000000000..27cb642a55f --- /dev/null +++ b/src/Common/EventRateMeter.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +/// Event count measurement with exponential smoothing intended for computing time derivatives +class EventRateMeter { +public: + explicit EventRateMeter(UInt64 period_, UInt64 resolution = 1000) + : period(std::max(period_, 1ul)) + , step(std::max(period / resolution, 1ul)) + , decay(1.0 - 1.0 / resolution) + {} + + /// Add `count` events that happened at the `now` instant. + /// Previous events that are older than `period` from `now` will be forgotten + /// in a way that keeps the average event rate the same, using exponential smoothing. + /// NOTE: Adding events into the distant past (further than `period`) must be avoided. 
+ void add(UInt64 now, UInt64 count) + { + if (unlikely(end == 0)) + { + // Initialization during the first call + end = now + period; + } + else if (now > end) + { + // Compute number of steps we have to move for `now <= end` to become true + UInt64 steps = (now - end + step - 1) / step; + end += steps * step; + assert(now <= end); + + // Forget old events, assuming all events are distributed evenly throughout the whole `period`. + // This assumption leads to exponential decay in case no new events come. + if (steps == 1) + events *= decay; + else + events *= std::pow(decay, steps); + } + + // Add new events + events += count; + } + + /// Compute average event rate throughout the `[now - period, now]` period. + double rate(UInt64 now) + { + add(now, 0); + return double(events) / period; + } + + void reset() + { + events = 0; + end = 0; + } + +private: + const UInt64 period; + const UInt64 step; + const double decay; + double events = 0; // Estimated number of events in [now - period, now] range + UInt64 end = 0; // Time up to which decay has been applied; 0 means not initialized yet +}; + +} diff --git a/src/Common/ProgressIndication.cpp b/src/Common/ProgressIndication.cpp index 7bea00f5b1e..e0e63ae864d 100644 --- a/src/Common/ProgressIndication.cpp +++ b/src/Common/ProgressIndication.cpp @@ -8,6 +8,7 @@ #include "Common/formatReadable.h" #include #include +#include #include "IO/WriteBufferFromString.h" #include @@ -16,16 +17,16 @@ namespace { constexpr UInt64 ALL_THREADS = 0; - double calculateCPUUsage(DB::ThreadIdToTimeMap times, UInt64 elapsed) + UInt64 aggregateCPUUsageNs(DB::ThreadIdToTimeMap times) { - auto accumulated = std::accumulate(times.begin(), times.end(), 0, + constexpr UInt64 us_to_ns = 1000; + return us_to_ns * std::accumulate(times.begin(), times.end(), 0ull, [](UInt64 acc, const auto & elem) { if (elem.first == ALL_THREADS) return acc; return acc + elem.second.time(); }); - return static_cast(accumulated) / elapsed; } } @@ -55,7 +56,7 @@ void ProgressIndication::resetProgress() write_progress_on_update = false; { 
std::lock_guard lock(profile_events_mutex); - host_cpu_usage.clear(); + cpu_usage_meter.reset(); thread_data.clear(); } } @@ -82,15 +83,17 @@ void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id thread_to_times[thread_id] = {}; } -void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time) +void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data) { std::lock_guard lock(profile_events_mutex); + UInt64 total_cpu_ns = 0; for (auto & new_host_map : new_thread_data) { - host_cpu_usage[new_host_map.first] = calculateCPUUsage(new_host_map.second, elapsed_time); + total_cpu_ns += aggregateCPUUsageNs(new_host_map.second); thread_data[new_host_map.first] = std::move(new_host_map.second); } + cpu_usage_meter.add(clock_gettime_ns(), total_cpu_ns); } size_t ProgressIndication::getUsedThreadsCount() const @@ -104,14 +107,10 @@ size_t ProgressIndication::getUsedThreadsCount() const }); } -double ProgressIndication::getCPUUsage() const +double ProgressIndication::getCPUUsage() { std::lock_guard lock(profile_events_mutex); - - double res = 0; - for (const auto & elem : host_cpu_usage) - res += elem.second; - return res; + return cpu_usage_meter.rate(clock_gettime_ns()); } ProgressIndication::MemoryUsage ProgressIndication::getMemoryUsage() const diff --git a/src/Common/ProgressIndication.h b/src/Common/ProgressIndication.h index 9ce29ef0d3c..47f67ffb53c 100644 --- a/src/Common/ProgressIndication.h +++ b/src/Common/ProgressIndication.h @@ -7,7 +7,7 @@ #include #include #include - +#include /// http://en.wikipedia.org/wiki/ANSI_escape_code #define CLEAR_TO_END_OF_LINE "\033[K" @@ -59,12 +59,12 @@ public: void addThreadIdToList(String const & host, UInt64 thread_id); - void updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time); + void updateThreadEventData(HostToThreadTimesMap & new_thread_data); private: size_t getUsedThreadsCount() const; - double 
getCPUUsage() const; + double getCPUUsage(); struct MemoryUsage { @@ -91,7 +91,7 @@ private: bool write_progress_on_update = false; - std::unordered_map host_cpu_usage; + EventRateMeter cpu_usage_meter{1'000'000'000 /*ns*/}; // average CPU utilization over a 1 second period HostToThreadTimesMap thread_data; /// In case of all of the above: /// - clickhouse-local @@ -100,7 +100,7 @@ private: /// /// It is possible concurrent access to the following: /// - writeProgress() (class properties) (guarded with progress_mutex) - /// - thread_data/host_cpu_usage (guarded with profile_events_mutex) + /// - thread_data/cpu_usage_meter (guarded with profile_events_mutex) mutable std::mutex profile_events_mutex; mutable std::mutex progress_mutex; };