Development

This commit is contained in:
Alexey Milovidov 2021-07-04 23:49:36 +03:00
parent 935e0327a5
commit c4675285bf
3 changed files with 340 additions and 69 deletions

View File

@ -557,6 +557,7 @@
M(587, CONCURRENT_ACCESS_NOT_SUPPORTED) \
M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \
M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \
M(590, CANNOT_SYSCONF) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -36,10 +36,16 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
extern const int CANNOT_SYSCONF;
}
static constexpr size_t small_buffer_size = 4096;
static void openFileIfExists(const char * filename, std::optional<ReadBufferFromFile> & out)
{
static constexpr size_t small_buffer_size = 4096;
/// Ignoring time of check is not time of use cases, as procfs/sysfs files are fairly persistent.
std::error_code ec;
@ -47,6 +53,15 @@ static void openFileIfExists(const char * filename, std::optional<ReadBufferFrom
out.emplace(filename, small_buffer_size);
}
static std::unique_ptr<ReadBufferFromFile> openFileIfExists(const std::string & filename)
{
std::error_code ec;
if (std::filesystem::is_regular_file(filename, ec))
return std::make_unique<ReadBufferFromFile>(filename, small_buffer_size);
return {};
}
AsynchronousMetrics::AsynchronousMetrics(
ContextPtr global_context_,
@ -60,7 +75,6 @@ AsynchronousMetrics::AsynchronousMetrics(
{
#if defined(OS_LINUX)
openFileIfExists("/proc/meminfo", meminfo);
openFileIfExists("/proc/mounts", mounts);
openFileIfExists("/proc/loadavg", loadavg);
openFileIfExists("/proc/stat", proc_stat);
openFileIfExists("/proc/cpuinfo", cpuinfo);
@ -68,6 +82,17 @@ AsynchronousMetrics::AsynchronousMetrics(
openFileIfExists("/proc/sockstat", sockstat);
openFileIfExists("/proc/netstat", netstat);
openFileIfExists("/proc/sys/fs/file-nr", file_nr);
openFileIfExists("/proc/uptime", uptime);
size_t thermal_device_index = 0;
while (true)
{
std::unique_ptr<ReadBufferFromFile> file = openFileIfExists(fmt::format("/sys/class/thermal/thermal_zone{}/temp", thermal_device_index));
if (!file)
break;
thermal.emplace_back(std::move(file));
++thermal_device_index;
}
#endif
}
@ -211,6 +236,63 @@ static void saveAllArenasMetric(AsynchronousMetricValues & values,
}
#endif
#if defined(OS_LINUX)
void AsynchronousMetrics::ProcStatValuesCPU::read(ReadBuffer & in)
{
readText(user, in);
skipWhitespaceIfAny(in);
readText(nice, in);
skipWhitespaceIfAny(in);
readText(system, in);
skipWhitespaceIfAny(in);
readText(idle, in);
skipWhitespaceIfAny(in);
readText(iowait, in);
skipWhitespaceIfAny(in);
readText(irq, in);
skipWhitespaceIfAny(in);
readText(softirq, in);
skipWhitespaceIfAny(in);
readText(steal, in);
skipWhitespaceIfAny(in);
readText(guest, in);
skipWhitespaceIfAny(in);
readText(guest_nice, in);
skipToNextLineOrEOF(in);
}
AsynchronousMetrics::ProcStatValuesCPU
AsynchronousMetrics::ProcStatValuesCPU::operator-(const AsynchronousMetrics::ProcStatValuesCPU & other) const
{
ProcStatValuesCPU res{};
res.user = user - other.user;
res.nice = nice - other.nice;
res.system = system - other.system;
res.idle = idle - other.idle;
res.iowait = iowait - other.iowait;
res.irq = irq - other.irq;
res.softirq = softirq - other.softirq;
res.steal = steal - other.steal;
res.guest = guest - other.guest;
res.guest_nice = guest_nice - other.guest_nice;
return res;
}
AsynchronousMetrics::ProcStatValuesOther
AsynchronousMetrics::ProcStatValuesOther::operator-(const AsynchronousMetrics::ProcStatValuesOther & other) const
{
ProcStatValuesOther res{};
res.interrupts = interrupts - other.interrupts;
res.context_switches = context_switches - other.context_switches;
res.processes_created = processes_created - other.processes_created;
return res;
}
#endif
void AsynchronousMetrics::update()
{
AsynchronousMetricValues new_values;
@ -311,42 +393,234 @@ void AsynchronousMetrics::update()
new_values["OSThreadsTotal"] = threads_total;
}
if (uptime)
{
uptime->rewind();
Float64 uptime_seconds = 0;
readText(uptime_seconds, *uptime);
new_values["OSUptime"] = uptime_seconds;
}
if (proc_stat)
{
proc_stat->rewind();
int64_t hz = sysconf(_SC_CLK_TCK);
if (-1 == hz)
throwFromErrno("Cannot call 'sysconf' to obtain system HZ", ErrorCodes::CANNOT_SYSCONF);
double multiplier = 1.0 / hz / update_period.count();
ProcStatValuesOther current_other_values{};
while (!proc_stat->eof())
{
String name;
readStringUntilWhitespace(name, *proc_stat);
skipWhitespaceIfAny(*proc_stat);
if (name.starts_with("cpu"))
{
String cpu_num_str = name.substr(strlen("cpu"));
UInt64 cpu_num = 0;
if (!cpu_num_str.empty())
{
cpu_num = parse<UInt64>(cpu_num_str);
if (cpu_num > 1000000) /// Safety check, arbitrary large number, suitable for supercomputing applications.
throw Exception(ErrorCodes::CORRUPTED_DATA, "Too many CPUs (at least {}) in '/proc/stat' file", cpu_num);
if (proc_stat_values_per_cpu.size() <= cpu_num)
proc_stat_values_per_cpu.resize(cpu_num + 1);
}
ProcStatValuesCPU current_values{};
current_values.read(*proc_stat);
ProcStatValuesCPU & prev_values = !cpu_num_str.empty() ? proc_stat_values_per_cpu[cpu_num] : proc_stat_values_all_cpus;
if (!first_run)
{
ProcStatValuesCPU delta_values = current_values - prev_values;
String cpu_suffix;
if (!cpu_num_str.empty())
cpu_suffix = "CPU" + cpu_num_str;
new_values["OSUserTime" + cpu_suffix] = delta_values.user * multiplier;
new_values["OSNiceTime" + cpu_suffix] = delta_values.nice * multiplier;
new_values["OSSystemTime" + cpu_suffix] = delta_values.system * multiplier;
new_values["OSIdleTime" + cpu_suffix] = delta_values.idle * multiplier;
new_values["OSIOWaitTime" + cpu_suffix] = delta_values.iowait * multiplier;
new_values["OSIrqTime" + cpu_suffix] = delta_values.irq * multiplier;
new_values["OSSoftIrqTime" + cpu_suffix] = delta_values.softirq * multiplier;
new_values["OSStealTime" + cpu_suffix] = delta_values.steal * multiplier;
new_values["OSGuestTime" + cpu_suffix] = delta_values.guest * multiplier;
new_values["OSGuestNiceTime" + cpu_suffix] = delta_values.guest_nice * multiplier;
}
prev_values = current_values;
}
else if (name == "intr")
{
readText(current_other_values.interrupts, *proc_stat);
skipToNextLineOrEOF(*proc_stat);
}
else if (name == "ctxt")
{
readText(current_other_values.context_switches, *proc_stat);
skipToNextLineOrEOF(*proc_stat);
}
else if (name == "processes")
{
readText(current_other_values.processes_created, *proc_stat);
skipToNextLineOrEOF(*proc_stat);
}
else if (name == "procs_running")
{
UInt64 processes_running = 0;
readText(processes_running, *proc_stat);
skipToNextLineOrEOF(*proc_stat);
new_values["OSProcessesRunning"] = processes_running;
}
else if (name == "procs_blocked")
{
UInt64 processes_blocked = 0;
readText(processes_blocked, *proc_stat);
skipToNextLineOrEOF(*proc_stat);
new_values["OSProcessesBlocked"] = processes_blocked;
}
else
skipToNextLineOrEOF(*proc_stat);
}
if (!first_run)
{
ProcStatValuesOther delta_values = current_other_values - proc_stat_values_other;
new_values["OSInterrupts"] = delta_values.interrupts * multiplier;
new_values["OSContextSwitches"] = delta_values.context_switches * multiplier;
new_values["OSProcessesCreated"] = delta_values.processes_created * multiplier;
}
proc_stat_values_other = current_other_values;
}
if (meminfo)
{
meminfo->rewind();
uint64_t free_plus_cached_bytes = 0;
while (!meminfo->eof())
{
String name;
readStringUntilWhitespace(name, *meminfo);
skipWhitespaceIfAny(*meminfo);
uint64_t kb = 0;
readText(kb, *meminfo);
if (kb)
{
skipWhitespaceIfAny(*meminfo);
assertString("kB", *meminfo);
uint64_t bytes = kb * 1024;
if (name == "MemTotal:")
{
new_values["OSMemoryTotal"] = bytes;
}
else if (name == "MemFree:")
{
free_plus_cached_bytes += bytes;
new_values["OSMemoryFreeWithoutCached"] = bytes;
}
else if (name == "MemAvailable:")
{
new_values["OSMemoryAvailable"] = bytes;
}
else if (name == "Buffers:")
{
new_values["OSMemoryBuffers"] = bytes;
}
else if (name == "Cached:")
{
free_plus_cached_bytes += bytes;
new_values["OSMemoryCached"] = bytes;
}
else if (name == "SwapCached:")
{
new_values["OSMemorySwapCached"] = bytes;
}
}
skipToNextLineOrEOF(*meminfo);
}
new_values["OSMemoryFreePlusCached"] = free_plus_cached_bytes;
}
#endif
/// Process CPU usage according to OS
#if defined(OS_LINUX)
// Try to add processor frequencies, ignoring errors.
if (cpuinfo)
{
ProcessorStatisticsOS::Data data = proc_stat.get();
try
{
cpuinfo->rewind();
new_values["LoadAvg1"] = data.loadavg.avg1;
new_values["LoadAvg5"] = data.loadavg.avg5;
new_values["LoadAvg15"] = data.loadavg.avg15;
// We need the following lines:
// processor : 4
// cpu MHz : 4052.941
// They contain tabs and are interspersed with other info.
new_values["FreqMin"] = data.freq.min;
new_values["FreqMax"] = data.freq.max;
new_values["FreqAvg"] = data.freq.avg;
int core_id = 0;
while (!cpuinfo->eof())
{
std::string s;
// We don't have any backslash escape sequences in /proc/cpuinfo, so
// this function will read the line until EOL, which is exactly what
// we need.
readEscapedStringUntilEOL(s, *cpuinfo);
// It doesn't read the EOL itself.
++cpuinfo->position();
new_values["TimeLoadUser"] = data.stload.user_time;
new_values["TimeLoadNice"] = data.stload.nice_time;
new_values["TimeLoadSystem"] = data.stload.system_time;
new_values["TimeLoadIDLE"] = data.stload.idle_time;
new_values["TimeLoadIowait"] = data.stload.iowait_time;
new_values["TimeLoadSteal"] = data.stload.steal_time;
new_values["TimeLoadGuest"] = data.stload.guest_time;
new_values["TimeLoadGuestNice"] = data.stload.guest_nice_time;
if (s.rfind("processor", 0) == 0)
{
if (auto colon = s.find_first_of(':'))
{
core_id = std::stoi(s.substr(colon + 2));
}
}
else if (s.rfind("cpu MHz", 0) == 0)
{
if (auto colon = s.find_first_of(':'))
{
auto mhz = std::stod(s.substr(colon + 2));
new_values[fmt::format("CPUFrequencyMHz_{}", core_id)] = mhz;
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
new_values["Processes"] = data.stload.processes;
new_values["ProcessesRunning"] = data.stload.procs_running;
new_values["ProcessesBlocked"] = data.stload.procs_blocked;
if (file_nr)
{
file_nr->rewind();
uint64_t open_files = 0;
readText(open_files, *file_nr);
new_values["OSOpenFiles"] = open_files;
}
#endif
/// Process disk usage according to OS
#if defined(OS_LINUX)
{
@ -530,50 +804,6 @@ void AsynchronousMetrics::update()
saveAllArenasMetric<size_t>(new_values, "muzzy_purged");
#endif
#if defined(OS_LINUX)
// Try to add processor frequencies, ignoring errors.
try
{
ReadBufferFromFile buf("/proc/cpuinfo", 32768 /* buf_size */);
// We need the following lines:
// processor : 4
// cpu MHz : 4052.941
// They contain tabs and are interspersed with other info.
int core_id = 0;
while (!buf.eof())
{
std::string s;
// We don't have any backslash escape sequences in /proc/cpuinfo, so
// this function will read the line until EOL, which is exactly what
// we need.
readEscapedStringUntilEOL(s, buf);
// It doesn't read the EOL itself.
++buf.position();
if (s.rfind("processor", 0) == 0)
{
if (auto colon = s.find_first_of(':'))
{
core_id = std::stoi(s.substr(colon + 2));
}
}
else if (s.rfind("cpu MHz", 0) == 0)
{
if (auto colon = s.find_first_of(':'))
{
auto mhz = std::stod(s.substr(colon + 2));
new_values[fmt::format("CPUFrequencyMHz_{}", core_id)] = mhz;
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
#endif
/// Add more metrics as you wish.
// Log the new metrics.
@ -582,6 +812,8 @@ void AsynchronousMetrics::update()
log->addValues(new_values);
}
first_run = false;
// Finally, update the current metrics.
std::lock_guard lock(mutex);
values = new_values;

View File

@ -12,6 +12,7 @@
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <optional>
#include <unordered_map>
@ -20,6 +21,7 @@ namespace DB
{
class ProtocolServerAdapter;
class ReadBuffer;
using AsynchronousMetricValue = double;
using AsynchronousMetricValues = std::unordered_map<std::string, AsynchronousMetricValue>;
@ -71,11 +73,14 @@ private:
bool quit {false};
AsynchronousMetricValues values;
/// Some values are incremental and we have to calculate the difference.
/// On first run we will only collect the values to subtract later.
bool first_run = true;
#if defined(OS_LINUX)
MemoryStatisticsOS memory_stat;
std::optional<ReadBufferFromFile> meminfo;
std::optional<ReadBufferFromFile> mounts;
std::optional<ReadBufferFromFile> loadavg;
std::optional<ReadBufferFromFile> proc_stat;
std::optional<ReadBufferFromFile> cpuinfo;
@ -83,6 +88,39 @@ private:
std::optional<ReadBufferFromFile> sockstat;
std::optional<ReadBufferFromFile> netstat;
std::optional<ReadBufferFromFile> file_nr;
std::optional<ReadBufferFromFile> uptime;
std::vector<std::unique_ptr<ReadBufferFromFile>> thermal;
struct ProcStatValuesCPU
{
uint64_t user;
uint64_t nice;
uint64_t system;
uint64_t idle;
uint64_t iowait;
uint64_t irq;
uint64_t softirq;
uint64_t steal;
uint64_t guest;
uint64_t guest_nice;
void read(ReadBuffer & in);
ProcStatValuesCPU operator-(const ProcStatValuesCPU & other) const;
};
struct ProcStatValuesOther
{
uint64_t interrupts;
uint64_t context_switches;
uint64_t processes_created;
ProcStatValuesOther operator-(const ProcStatValuesOther & other) const;
};
ProcStatValuesCPU proc_stat_values_all_cpus{};
ProcStatValuesOther proc_stat_values_other{};
std::vector<ProcStatValuesCPU> proc_stat_values_per_cpu;
#endif
std::unique_ptr<ThreadFromGlobalPool> thread;