diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp
index b435b483b71..f0a8794d096 100644
--- a/src/Client/ClientBase.cpp
+++ b/src/Client/ClientBase.cpp
@@ -983,8 +983,7 @@ void ClientBase::onProfileEvents(Block & block)
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
thread_times[host_name][thread_id].memory_usage = value;
}
- auto elapsed_time = profile_events.watch.elapsedMicroseconds();
- progress_indication.updateThreadEventData(thread_times, elapsed_time);
+ progress_indication.updateThreadEventData(thread_times);
if (need_render_progress)
progress_indication.writeProgress();
diff --git a/src/Common/EventRateMeter.h b/src/Common/EventRateMeter.h
new file mode 100644
index 00000000000..f70258faa9e
--- /dev/null
+++ b/src/Common/EventRateMeter.h
@@ -0,0 +1,63 @@
+#pragma once
+
+#include
+
+#include
+
+#include
+
+
+namespace DB
+{
+
+/// Event count measurement with exponential smoothing intended for computing time derivatives
+class EventRateMeter
+{
+public:
+ explicit EventRateMeter(double now, double period_)
+ : period(period_)
+ , half_decay_time(period * std::numbers::ln2) // for `ExponentiallySmoothedAverage::sumWeights()` to be equal to `1/period`
+ {
+ reset(now);
+ }
+
+ /// Add `count` events happened at `now` instant.
+ /// Previous events that are older than `period` from `now` will be forgotten
+ /// in a way to keep average event rate the same, using exponential smoothing.
+ /// NOTE: Adding events into distant past (further than `period`) must be avoided.
+ void add(double now, double count)
+ {
+ if (now - period <= start) // precise counting mode
+ events = ExponentiallySmoothedAverage(events.value + count, now);
+ else // exponential smoothing mode
+ events.add(count, now, half_decay_time);
+ }
+
+ /// Compute average event rate throughout `[now - period, now]` period.
+ /// If measurements are just started (`now - period < start`), then average
+ /// is computed based on shorter `[start; now]` period to avoid initial linear growth.
+ double rate(double now)
+ {
+ add(now, 0);
+ if (unlikely(now <= start))
+ return 0;
+ if (now - period <= start) // precise counting mode
+ return events.value / (now - start);
+ else // exponential smoothing mode
+ return events.get(half_decay_time); // equals to `events.value / period`
+ }
+
+ void reset(double now)
+ {
+ start = now;
+ events = ExponentiallySmoothedAverage();
+ }
+
+private:
+ const double period;
+ const double half_decay_time;
+ double start; // Instant in past without events before it; when measurement started or reset
+ ExponentiallySmoothedAverage events; // Estimated number of events in the last `period`
+};
+
+}
diff --git a/src/Common/ProgressIndication.cpp b/src/Common/ProgressIndication.cpp
index 7bea00f5b1e..8ca1612e916 100644
--- a/src/Common/ProgressIndication.cpp
+++ b/src/Common/ProgressIndication.cpp
@@ -8,6 +8,7 @@
#include "Common/formatReadable.h"
#include
#include
+#include
#include "IO/WriteBufferFromString.h"
#include
@@ -16,16 +17,16 @@ namespace
{
constexpr UInt64 ALL_THREADS = 0;
- double calculateCPUUsage(DB::ThreadIdToTimeMap times, UInt64 elapsed)
+ UInt64 aggregateCPUUsageNs(DB::ThreadIdToTimeMap times)
{
- auto accumulated = std::accumulate(times.begin(), times.end(), 0,
+ constexpr UInt64 us_to_ns = 1000;
+ return us_to_ns * std::accumulate(times.begin(), times.end(), 0ull,
[](UInt64 acc, const auto & elem)
{
if (elem.first == ALL_THREADS)
return acc;
return acc + elem.second.time();
});
- return static_cast(accumulated) / elapsed;
}
}
@@ -55,7 +56,7 @@ void ProgressIndication::resetProgress()
write_progress_on_update = false;
{
std::lock_guard lock(profile_events_mutex);
- host_cpu_usage.clear();
+ cpu_usage_meter.reset(static_cast(clock_gettime_ns()));
thread_data.clear();
}
}
@@ -82,15 +83,17 @@ void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id
thread_to_times[thread_id] = {};
}
-void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time)
+void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data)
{
std::lock_guard lock(profile_events_mutex);
+ UInt64 total_cpu_ns = 0;
for (auto & new_host_map : new_thread_data)
{
- host_cpu_usage[new_host_map.first] = calculateCPUUsage(new_host_map.second, elapsed_time);
+ total_cpu_ns += aggregateCPUUsageNs(new_host_map.second);
thread_data[new_host_map.first] = std::move(new_host_map.second);
}
+ cpu_usage_meter.add(static_cast(clock_gettime_ns()), total_cpu_ns);
}
size_t ProgressIndication::getUsedThreadsCount() const
@@ -104,14 +107,10 @@ size_t ProgressIndication::getUsedThreadsCount() const
});
}
-double ProgressIndication::getCPUUsage() const
+double ProgressIndication::getCPUUsage()
{
std::lock_guard lock(profile_events_mutex);
-
- double res = 0;
- for (const auto & elem : host_cpu_usage)
- res += elem.second;
- return res;
+ return cpu_usage_meter.rate(clock_gettime_ns());
}
ProgressIndication::MemoryUsage ProgressIndication::getMemoryUsage() const
diff --git a/src/Common/ProgressIndication.h b/src/Common/ProgressIndication.h
index 9ce29ef0d3c..588a31beca7 100644
--- a/src/Common/ProgressIndication.h
+++ b/src/Common/ProgressIndication.h
@@ -7,7 +7,7 @@
#include
#include
#include
-
+#include
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define CLEAR_TO_END_OF_LINE "\033[K"
@@ -59,12 +59,12 @@ public:
void addThreadIdToList(String const & host, UInt64 thread_id);
- void updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time);
+ void updateThreadEventData(HostToThreadTimesMap & new_thread_data);
private:
size_t getUsedThreadsCount() const;
- double getCPUUsage() const;
+ double getCPUUsage();
struct MemoryUsage
{
@@ -91,7 +91,7 @@ private:
bool write_progress_on_update = false;
- std::unordered_map host_cpu_usage;
+ EventRateMeter cpu_usage_meter{static_cast(clock_gettime_ns()), 3'000'000'000 /*ns*/}; // average cpu utilization last 3 second
HostToThreadTimesMap thread_data;
/// In case of all of the above:
/// - clickhouse-local
@@ -100,7 +100,7 @@ private:
///
/// It is possible concurrent access to the following:
/// - writeProgress() (class properties) (guarded with progress_mutex)
- /// - thread_data/host_cpu_usage (guarded with profile_events_mutex)
+ /// - thread_data/cpu_usage_meter (guarded with profile_events_mutex)
mutable std::mutex profile_events_mutex;
mutable std::mutex progress_mutex;
};