fix cpu usage metric in client

Sergei Trifonov 2022-07-16 00:27:26 +02:00
parent 43ed3df474
commit 8a5286ea4c
4 changed files with 87 additions and 19 deletions

src/Client/ClientBase.cpp

@@ -983,8 +983,7 @@ void ClientBase::onProfileEvents(Block & block)
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
thread_times[host_name][thread_id].memory_usage = value;
}
auto elapsed_time = profile_events.watch.elapsedMicroseconds();
progress_indication.updateThreadEventData(thread_times, elapsed_time);
progress_indication.updateThreadEventData(thread_times);
if (need_render_progress)
progress_indication.writeProgress();

src/Common/EventRateMeter.h (new file)

@@ -0,0 +1,70 @@
#pragma once

#include <base/types.h>

#include <algorithm>
#include <cmath>

namespace DB
{

/// Event count measurement with exponential smoothing intended for computing time derivatives
class EventRateMeter
{
public:
    explicit EventRateMeter(UInt64 period_, UInt64 resolution = 1000)
        : period(std::max(period_, 1ul))
        , step(std::max(period / resolution, 1ul))
        , decay(1.0 - 1.0 / resolution)
    {}

    /// Add `count` events that happened at the `now` instant.
    /// Previous events that are older than `period` relative to `now` will be forgotten
    /// in a way that keeps the average event rate the same, using exponential smoothing.
    /// NOTE: Adding events into the distant past (further back than `period`) must be avoided.
    void add(UInt64 now, UInt64 count)
    {
        if (unlikely(end == 0))
        {
            // Initialization during the first call
            end = now + period;
        }
        else if (now > end)
        {
            // Compute the number of steps we have to move for `now <= end` to become true
            UInt64 steps = (now - end + step - 1) / step;
            end += steps * step;
            assert(now <= end);

            // Forget old events, assuming all events are distributed evenly throughout the whole `period`.
            // This assumption leads to exponential decay if no new events arrive.
            if (steps == 1)
                events *= decay;
            else
                events *= std::pow(decay, steps);
        }

        // Add new events
        events += count;
    }

    /// Compute the average event rate throughout the `[now - period, now]` period.
    double rate(UInt64 now)
    {
        add(now, 0);
        return double(events) / period;
    }

    void reset()
    {
        events = 0;
        end = 0;
    }

private:
    const UInt64 period;
    const UInt64 step;
    const double decay;

    double events = 0; // Estimated number of events in the [now - period, now] range
    UInt64 end = 0;
};
}
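A minimal usage sketch (not part of the commit) showing how the meter is meant to be driven; the window size, the event being counted and the helper names below are illustrative assumptions, and clock_gettime_ns() comes from Common/Stopwatch.h, which the next file adds as an include. With the default resolution of 1000, decay is 0.999, so a full idle period scales the estimate by 0.999^1000 ≈ 1/e ≈ 0.37.

#include <Common/EventRateMeter.h>
#include <Common/Stopwatch.h> // clock_gettime_ns()

// Hypothetical example: smoothed event rate over a sliding 1-second window,
// with timestamps and the window both expressed in nanoseconds.
DB::EventRateMeter rows_meter{1'000'000'000 /*ns*/};

void onRowsRead(UInt64 rows)
{
    // Record `rows` events at the current instant; events older than the
    // window decay away exponentially as time advances.
    rows_meter.add(clock_gettime_ns(), rows);
}

double rowsPerSecond()
{
    // rate() returns events per time unit (here per nanosecond), averaged
    // over the trailing window, so scale by 1e9 to get rows per second.
    return rows_meter.rate(clock_gettime_ns()) * 1e9;
}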

src/Common/ProgressIndication.cpp

@@ -8,6 +8,7 @@
#include "Common/formatReadable.h"
#include <Common/TerminalSize.h>
#include <Common/UnicodeBar.h>
#include <Common/Stopwatch.h>
#include "IO/WriteBufferFromString.h"
#include <Databases/DatabaseMemory.h>
@@ -16,16 +17,16 @@ namespace
{
constexpr UInt64 ALL_THREADS = 0;
double calculateCPUUsage(DB::ThreadIdToTimeMap times, UInt64 elapsed)
UInt64 aggregateCPUUsageNs(DB::ThreadIdToTimeMap times)
{
auto accumulated = std::accumulate(times.begin(), times.end(), 0,
constexpr UInt64 us_to_ns = 1000;
return us_to_ns * std::accumulate(times.begin(), times.end(), 0ull,
[](UInt64 acc, const auto & elem)
{
if (elem.first == ALL_THREADS)
return acc;
return acc + elem.second.time();
});
return static_cast<double>(accumulated) / elapsed;
}
}
@@ -55,7 +56,7 @@ void ProgressIndication::resetProgress()
write_progress_on_update = false;
{
std::lock_guard lock(profile_events_mutex);
host_cpu_usage.clear();
cpu_usage_meter.reset();
thread_data.clear();
}
}
@@ -82,15 +83,17 @@ void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id
thread_to_times[thread_id] = {};
}
void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time)
void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data)
{
std::lock_guard lock(profile_events_mutex);
UInt64 total_cpu_ns = 0;
for (auto & new_host_map : new_thread_data)
{
host_cpu_usage[new_host_map.first] = calculateCPUUsage(new_host_map.second, elapsed_time);
total_cpu_ns += aggregateCPUUsageNs(new_host_map.second);
thread_data[new_host_map.first] = std::move(new_host_map.second);
}
cpu_usage_meter.add(clock_gettime_ns(), total_cpu_ns);
}
size_t ProgressIndication::getUsedThreadsCount() const
@@ -104,14 +107,10 @@ size_t ProgressIndication::getUsedThreadsCount() const
});
}
double ProgressIndication::getCPUUsage() const
double ProgressIndication::getCPUUsage()
{
std::lock_guard lock(profile_events_mutex);
double res = 0;
for (const auto & elem : host_cpu_usage)
res += elem.second;
return res;
return cpu_usage_meter.rate(clock_gettime_ns());
}
ProgressIndication::MemoryUsage ProgressIndication::getMemoryUsage() const
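Taken together, each profile-events update now feeds the total CPU nanoseconds spent by all remote threads into a meter whose window is one second of wall-clock time, so getCPUUsage() reports CPU time per unit of wall time. A worked example under that reading: if the threads together burned 3.5e9 ns of CPU during the last 1e9 ns of wall clock, rate() returns 3.5e9 / 1e9 = 3.5, i.e. about three and a half cores busy on average, regardless of how often profile-event packets arrive.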

src/Common/ProgressIndication.h

@@ -7,7 +7,7 @@
#include <Interpreters/Context.h>
#include <base/types.h>
#include <Common/Stopwatch.h>
#include <Common/EventRateMeter.h>
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define CLEAR_TO_END_OF_LINE "\033[K"
@@ -59,12 +59,12 @@ public:
void addThreadIdToList(String const & host, UInt64 thread_id);
void updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time);
void updateThreadEventData(HostToThreadTimesMap & new_thread_data);
private:
size_t getUsedThreadsCount() const;
double getCPUUsage() const;
double getCPUUsage();
struct MemoryUsage
{
@@ -91,7 +91,7 @@ private:
bool write_progress_on_update = false;
std::unordered_map<String, double> host_cpu_usage;
EventRateMeter cpu_usage_meter{1'000'000'000 /*ns*/}; // average cpu utilization per 1 second
HostToThreadTimesMap thread_data;
/// In case of all of the above:
/// - clickhouse-local
@@ -100,7 +100,7 @@ private:
///
/// It is possible concurrent access to the following:
/// - writeProgress() (class properties) (guarded with progress_mutex)
/// - thread_data/host_cpu_usage (guarded with profile_events_mutex)
/// - thread_data/cpu_usage_meter (guarded with profile_events_mutex)
mutable std::mutex profile_events_mutex;
mutable std::mutex progress_mutex;
};