mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #39280 from ClickHouse/avg-cpu-progress
fix cpu usage metric in client
This commit is contained in:
commit
499818751e
@ -983,8 +983,7 @@ void ClientBase::onProfileEvents(Block & block)
|
|||||||
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
|
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
|
||||||
thread_times[host_name][thread_id].memory_usage = value;
|
thread_times[host_name][thread_id].memory_usage = value;
|
||||||
}
|
}
|
||||||
auto elapsed_time = profile_events.watch.elapsedMicroseconds();
|
progress_indication.updateThreadEventData(thread_times);
|
||||||
progress_indication.updateThreadEventData(thread_times, elapsed_time);
|
|
||||||
|
|
||||||
if (need_render_progress)
|
if (need_render_progress)
|
||||||
progress_indication.writeProgress();
|
progress_indication.writeProgress();
|
||||||
|
63
src/Common/EventRateMeter.h
Normal file
63
src/Common/EventRateMeter.h
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <base/defines.h>
|
||||||
|
|
||||||
|
#include <Common/ExponentiallySmoothedCounter.h>
|
||||||
|
|
||||||
|
#include <numbers>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/// Event count measurement with exponential smoothing intended for computing time derivatives
|
||||||
|
class EventRateMeter
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit EventRateMeter(double now, double period_)
|
||||||
|
: period(period_)
|
||||||
|
, half_decay_time(period * std::numbers::ln2) // for `ExponentiallySmoothedAverage::sumWeights()` to be equal to `1/period`
|
||||||
|
{
|
||||||
|
reset(now);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add `count` events happened at `now` instant.
|
||||||
|
/// Previous events that are older than `period` from `now` will be forgotten
|
||||||
|
/// in a way to keep average event rate the same, using exponential smoothing.
|
||||||
|
/// NOTE: Adding events into distant past (further than `period`) must be avoided.
|
||||||
|
void add(double now, double count)
|
||||||
|
{
|
||||||
|
if (now - period <= start) // precise counting mode
|
||||||
|
events = ExponentiallySmoothedAverage(events.value + count, now);
|
||||||
|
else // exponential smoothing mode
|
||||||
|
events.add(count, now, half_decay_time);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compute average event rate throughout `[now - period, now]` period.
|
||||||
|
/// If measurements are just started (`now - period < start`), then average
|
||||||
|
/// is computed based on shorter `[start; now]` period to avoid initial linear growth.
|
||||||
|
double rate(double now)
|
||||||
|
{
|
||||||
|
add(now, 0);
|
||||||
|
if (unlikely(now <= start))
|
||||||
|
return 0;
|
||||||
|
if (now - period <= start) // precise counting mode
|
||||||
|
return events.value / (now - start);
|
||||||
|
else // exponential smoothing mode
|
||||||
|
return events.get(half_decay_time); // equals to `events.value / period`
|
||||||
|
}
|
||||||
|
|
||||||
|
void reset(double now)
|
||||||
|
{
|
||||||
|
start = now;
|
||||||
|
events = ExponentiallySmoothedAverage();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const double period;
|
||||||
|
const double half_decay_time;
|
||||||
|
double start; // Instant in past without events before it; when measurement started or reset
|
||||||
|
ExponentiallySmoothedAverage events; // Estimated number of events in the last `period`
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -8,6 +8,7 @@
|
|||||||
#include "Common/formatReadable.h"
|
#include "Common/formatReadable.h"
|
||||||
#include <Common/TerminalSize.h>
|
#include <Common/TerminalSize.h>
|
||||||
#include <Common/UnicodeBar.h>
|
#include <Common/UnicodeBar.h>
|
||||||
|
#include <Common/Stopwatch.h>
|
||||||
#include "IO/WriteBufferFromString.h"
|
#include "IO/WriteBufferFromString.h"
|
||||||
#include <Databases/DatabaseMemory.h>
|
#include <Databases/DatabaseMemory.h>
|
||||||
|
|
||||||
@ -16,16 +17,16 @@ namespace
|
|||||||
{
|
{
|
||||||
constexpr UInt64 ALL_THREADS = 0;
|
constexpr UInt64 ALL_THREADS = 0;
|
||||||
|
|
||||||
double calculateCPUUsage(DB::ThreadIdToTimeMap times, UInt64 elapsed)
|
UInt64 aggregateCPUUsageNs(DB::ThreadIdToTimeMap times)
|
||||||
{
|
{
|
||||||
auto accumulated = std::accumulate(times.begin(), times.end(), 0,
|
constexpr UInt64 us_to_ns = 1000;
|
||||||
|
return us_to_ns * std::accumulate(times.begin(), times.end(), 0ull,
|
||||||
[](UInt64 acc, const auto & elem)
|
[](UInt64 acc, const auto & elem)
|
||||||
{
|
{
|
||||||
if (elem.first == ALL_THREADS)
|
if (elem.first == ALL_THREADS)
|
||||||
return acc;
|
return acc;
|
||||||
return acc + elem.second.time();
|
return acc + elem.second.time();
|
||||||
});
|
});
|
||||||
return static_cast<double>(accumulated) / elapsed;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,7 +56,7 @@ void ProgressIndication::resetProgress()
|
|||||||
write_progress_on_update = false;
|
write_progress_on_update = false;
|
||||||
{
|
{
|
||||||
std::lock_guard lock(profile_events_mutex);
|
std::lock_guard lock(profile_events_mutex);
|
||||||
host_cpu_usage.clear();
|
cpu_usage_meter.reset(static_cast<double>(clock_gettime_ns()));
|
||||||
thread_data.clear();
|
thread_data.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -82,15 +83,17 @@ void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id
|
|||||||
thread_to_times[thread_id] = {};
|
thread_to_times[thread_id] = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time)
|
void ProgressIndication::updateThreadEventData(HostToThreadTimesMap & new_thread_data)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(profile_events_mutex);
|
std::lock_guard lock(profile_events_mutex);
|
||||||
|
|
||||||
|
UInt64 total_cpu_ns = 0;
|
||||||
for (auto & new_host_map : new_thread_data)
|
for (auto & new_host_map : new_thread_data)
|
||||||
{
|
{
|
||||||
host_cpu_usage[new_host_map.first] = calculateCPUUsage(new_host_map.second, elapsed_time);
|
total_cpu_ns += aggregateCPUUsageNs(new_host_map.second);
|
||||||
thread_data[new_host_map.first] = std::move(new_host_map.second);
|
thread_data[new_host_map.first] = std::move(new_host_map.second);
|
||||||
}
|
}
|
||||||
|
cpu_usage_meter.add(static_cast<double>(clock_gettime_ns()), total_cpu_ns);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ProgressIndication::getUsedThreadsCount() const
|
size_t ProgressIndication::getUsedThreadsCount() const
|
||||||
@ -104,14 +107,10 @@ size_t ProgressIndication::getUsedThreadsCount() const
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
double ProgressIndication::getCPUUsage() const
|
double ProgressIndication::getCPUUsage()
|
||||||
{
|
{
|
||||||
std::lock_guard lock(profile_events_mutex);
|
std::lock_guard lock(profile_events_mutex);
|
||||||
|
return cpu_usage_meter.rate(clock_gettime_ns());
|
||||||
double res = 0;
|
|
||||||
for (const auto & elem : host_cpu_usage)
|
|
||||||
res += elem.second;
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ProgressIndication::MemoryUsage ProgressIndication::getMemoryUsage() const
|
ProgressIndication::MemoryUsage ProgressIndication::getMemoryUsage() const
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <base/types.h>
|
#include <base/types.h>
|
||||||
#include <Common/Stopwatch.h>
|
#include <Common/Stopwatch.h>
|
||||||
|
#include <Common/EventRateMeter.h>
|
||||||
|
|
||||||
/// http://en.wikipedia.org/wiki/ANSI_escape_code
|
/// http://en.wikipedia.org/wiki/ANSI_escape_code
|
||||||
#define CLEAR_TO_END_OF_LINE "\033[K"
|
#define CLEAR_TO_END_OF_LINE "\033[K"
|
||||||
@ -59,12 +59,12 @@ public:
|
|||||||
|
|
||||||
void addThreadIdToList(String const & host, UInt64 thread_id);
|
void addThreadIdToList(String const & host, UInt64 thread_id);
|
||||||
|
|
||||||
void updateThreadEventData(HostToThreadTimesMap & new_thread_data, UInt64 elapsed_time);
|
void updateThreadEventData(HostToThreadTimesMap & new_thread_data);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
size_t getUsedThreadsCount() const;
|
size_t getUsedThreadsCount() const;
|
||||||
|
|
||||||
double getCPUUsage() const;
|
double getCPUUsage();
|
||||||
|
|
||||||
struct MemoryUsage
|
struct MemoryUsage
|
||||||
{
|
{
|
||||||
@ -91,7 +91,7 @@ private:
|
|||||||
|
|
||||||
bool write_progress_on_update = false;
|
bool write_progress_on_update = false;
|
||||||
|
|
||||||
std::unordered_map<String, double> host_cpu_usage;
|
EventRateMeter cpu_usage_meter{static_cast<double>(clock_gettime_ns()), 3'000'000'000 /*ns*/}; // average cpu utilization last 3 second
|
||||||
HostToThreadTimesMap thread_data;
|
HostToThreadTimesMap thread_data;
|
||||||
/// In case of all of the above:
|
/// In case of all of the above:
|
||||||
/// - clickhouse-local
|
/// - clickhouse-local
|
||||||
@ -100,7 +100,7 @@ private:
|
|||||||
///
|
///
|
||||||
/// It is possible concurrent access to the following:
|
/// It is possible concurrent access to the following:
|
||||||
/// - writeProgress() (class properties) (guarded with progress_mutex)
|
/// - writeProgress() (class properties) (guarded with progress_mutex)
|
||||||
/// - thread_data/host_cpu_usage (guarded with profile_events_mutex)
|
/// - thread_data/cpu_usage_meter (guarded with profile_events_mutex)
|
||||||
mutable std::mutex profile_events_mutex;
|
mutable std::mutex profile_events_mutex;
|
||||||
mutable std::mutex progress_mutex;
|
mutable std::mutex progress_mutex;
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user