diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index f4ceef2896a..8301ea656bf 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -557,6 +557,7 @@ M(587, CONCURRENT_ACCESS_NOT_SUPPORTED) \ M(588, DISTRIBUTED_BROKEN_BATCH_INFO) \ M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \ + M(590, CANNOT_SYSCONF) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 4e46cdc27f2..89196b5e25f 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -36,10 +36,16 @@ namespace CurrentMetrics namespace DB { +namespace ErrorCodes /* Error codes used by AsynchronousMetrics::update() while it handles /proc/stat: CANNOT_SYSCONF when sysconf fails, CORRUPTED_DATA when the CPU number is implausibly large. */ +{ + extern const int CORRUPTED_DATA; + extern const int CANNOT_SYSCONF; +} + +static constexpr size_t small_buffer_size = 4096; /* Moved from function scope to namespace scope; passed as the second argument when the std::string overload of openFileIfExists opens a file (presumably a buffer size in bytes - confirm against the buffer class). */ + static void openFileIfExists(const char * filename, std::optional & out) { - static constexpr size_t small_buffer_size = 4096; - /// Ignoring time of check is not time of use cases, as procfs/sysfs files are fairly persistent. 
std::error_code ec; @@ -47,6 +53,15 @@ static void openFileIfExists(const char * filename, std::optional openFileIfExists(const std::string & filename) +{ + std::error_code ec; /* passed to is_regular_file so that a failed check is reported through ec instead of an exception; ec itself is not inspected */ + if (std::filesystem::is_regular_file(filename, ec)) + return std::make_unique(filename, small_buffer_size); + return {}; /* empty pointer: the path is not a regular file */ +} + + AsynchronousMetrics::AsynchronousMetrics( ContextPtr global_context_, @@ -60,7 +75,6 @@ AsynchronousMetrics::AsynchronousMetrics( { #if defined(OS_LINUX) openFileIfExists("/proc/meminfo", meminfo); - openFileIfExists("/proc/mounts", mounts); openFileIfExists("/proc/loadavg", loadavg); openFileIfExists("/proc/stat", proc_stat); openFileIfExists("/proc/cpuinfo", cpuinfo); @@ -68,6 +82,17 @@ AsynchronousMetrics::AsynchronousMetrics( openFileIfExists("/proc/sockstat", sockstat); openFileIfExists("/proc/netstat", netstat); openFileIfExists("/proc/sys/fs/file-nr", file_nr); + openFileIfExists("/proc/uptime", uptime); + + size_t thermal_device_index = 0; /* Probe thermal_zone0, thermal_zone1, ... in order and stop at the first index whose temp file cannot be opened; every opened file is kept in `thermal`. */ + while (true) + { + std::unique_ptr file = openFileIfExists(fmt::format("/sys/class/thermal/thermal_zone{}/temp", thermal_device_index)); + if (!file) + break; + thermal.emplace_back(std::move(file)); + ++thermal_device_index; + } #endif } @@ -211,6 +236,63 @@ static void saveAllArenasMetric(AsynchronousMetricValues & values, } #endif + +#if defined(OS_LINUX) + +void AsynchronousMetrics::ProcStatValuesCPU::read(ReadBuffer & in) /* Reads ten numbers from `in` in the order user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice, skipping whitespace between them, then skips to the next line or EOF. The caller is expected to have consumed the leading "cpu..." label already. */ +{ + readText(user, in); + skipWhitespaceIfAny(in); + readText(nice, in); + skipWhitespaceIfAny(in); + readText(system, in); + skipWhitespaceIfAny(in); + readText(idle, in); + skipWhitespaceIfAny(in); + readText(iowait, in); + skipWhitespaceIfAny(in); + readText(irq, in); + skipWhitespaceIfAny(in); + readText(softirq, in); + skipWhitespaceIfAny(in); + readText(steal, in); + skipWhitespaceIfAny(in); + readText(guest, in); + skipWhitespaceIfAny(in); + readText(guest_nice, in); + skipToNextLineOrEOF(in); +} + +AsynchronousMetrics::ProcStatValuesCPU 
+AsynchronousMetrics::ProcStatValuesCPU::operator-(const AsynchronousMetrics::ProcStatValuesCPU & other) const /* Field-wise difference between two samples, clamped at zero. The fields are uint64_t and some /proc/stat counters (iowait in particular) may decrease between two reads; a plain subtraction would then wrap around to a huge bogus value. */ +{ + ProcStatValuesCPU res{}; + res.user = user > other.user ? user - other.user : 0; + res.nice = nice > other.nice ? nice - other.nice : 0; + res.system = system > other.system ? system - other.system : 0; + res.idle = idle > other.idle ? idle - other.idle : 0; + res.iowait = iowait > other.iowait ? iowait - other.iowait : 0; + res.irq = irq > other.irq ? irq - other.irq : 0; + res.softirq = softirq > other.softirq ? softirq - other.softirq : 0; + res.steal = steal > other.steal ? steal - other.steal : 0; + res.guest = guest > other.guest ? guest - other.guest : 0; + res.guest_nice = guest_nice > other.guest_nice ? guest_nice - other.guest_nice : 0; + return res; +} + +AsynchronousMetrics::ProcStatValuesOther +AsynchronousMetrics::ProcStatValuesOther::operator-(const AsynchronousMetrics::ProcStatValuesOther & other) const /* Field-wise difference, clamped at zero for the same reason as the per-CPU variant: unsigned fields must not wrap around if a counter ever goes backwards. */ +{ + ProcStatValuesOther res{}; + res.interrupts = interrupts > other.interrupts ? interrupts - other.interrupts : 0; + res.context_switches = context_switches > other.context_switches ? context_switches - other.context_switches : 0; + res.processes_created = processes_created > other.processes_created ? processes_created - other.processes_created : 0; + return res; +} + +#endif + + void AsynchronousMetrics::update() { AsynchronousMetricValues new_values; @@ -311,42 +393,234 @@ void AsynchronousMetrics::update() new_values["OSThreadsTotal"] = threads_total; } + if (uptime) + { + uptime->rewind(); /* the file stays open between updates, so it is re-read from the start each time */ + + Float64 uptime_seconds = 0; + readText(uptime_seconds, *uptime); /* only the first number of /proc/uptime is used */ + + new_values["OSUptime"] = uptime_seconds; + } + + if (proc_stat) + { + proc_stat->rewind(); + + int64_t hz = sysconf(_SC_CLK_TCK); /* clock ticks per second: the unit in which /proc/stat reports CPU times */ + if (-1 == hz) + throwFromErrno("Cannot call 'sysconf' to obtain system HZ", ErrorCodes::CANNOT_SYSCONF); + + double multiplier = 1.0 / hz / update_period.count(); /* turns a delta in clock ticks into seconds of CPU time per unit of update_period */ + + ProcStatValuesOther current_other_values{}; + + while (!proc_stat->eof()) + { + String name; /* first whitespace-delimited token of the line selects how the rest is parsed */ + readStringUntilWhitespace(name, *proc_stat); + skipWhitespaceIfAny(*proc_stat); + + if (name.starts_with("cpu")) + { + String cpu_num_str = name.substr(strlen("cpu")); /* empty for the aggregate "cpu" line, the CPU index for "cpuN" lines */ + UInt64 cpu_num = 0; + if (!cpu_num_str.empty()) + { + cpu_num = parse(cpu_num_str); + + if (cpu_num > 1000000) /// Safety check, arbitrary large number, suitable for supercomputing applications. 
+ throw Exception(ErrorCodes::CORRUPTED_DATA, "Too many CPUs (at least {}) in '/proc/stat' file", cpu_num); + + if (proc_stat_values_per_cpu.size() <= cpu_num) /* grow the per-CPU history so that index cpu_num is valid; new entries are value-initialized */ + proc_stat_values_per_cpu.resize(cpu_num + 1); + } + + ProcStatValuesCPU current_values{}; + current_values.read(*proc_stat); + + ProcStatValuesCPU & prev_values = !cpu_num_str.empty() ? proc_stat_values_per_cpu[cpu_num] : proc_stat_values_all_cpus; /* previous sample: per-CPU slot for "cpuN" lines, the aggregate slot for the plain "cpu" line */ + + if (!first_run) /* on the first run there is no previous sample, so nothing is published and the values are only remembered */ + { + ProcStatValuesCPU delta_values = current_values - prev_values; + + String cpu_suffix; /* empty for the aggregate line, "CPU<N>" for a per-CPU line */ + if (!cpu_num_str.empty()) + cpu_suffix = "CPU" + cpu_num_str; + + new_values["OSUserTime" + cpu_suffix] = delta_values.user * multiplier; + new_values["OSNiceTime" + cpu_suffix] = delta_values.nice * multiplier; + new_values["OSSystemTime" + cpu_suffix] = delta_values.system * multiplier; + new_values["OSIdleTime" + cpu_suffix] = delta_values.idle * multiplier; + new_values["OSIOWaitTime" + cpu_suffix] = delta_values.iowait * multiplier; + new_values["OSIrqTime" + cpu_suffix] = delta_values.irq * multiplier; + new_values["OSSoftIrqTime" + cpu_suffix] = delta_values.softirq * multiplier; + new_values["OSStealTime" + cpu_suffix] = delta_values.steal * multiplier; + new_values["OSGuestTime" + cpu_suffix] = delta_values.guest * multiplier; + new_values["OSGuestNiceTime" + cpu_suffix] = delta_values.guest_nice * multiplier; + } + + prev_values = current_values; /* written through the reference: becomes the previous sample of the next update */ + } + else if (name == "intr") /* only the first number of the line is read; the rest of the line is skipped */ + { + readText(current_other_values.interrupts, *proc_stat); + skipToNextLineOrEOF(*proc_stat); + } + else if (name == "ctxt") + { + readText(current_other_values.context_switches, *proc_stat); + skipToNextLineOrEOF(*proc_stat); + } + else if (name == "processes") + { + readText(current_other_values.processes_created, *proc_stat); + skipToNextLineOrEOF(*proc_stat); + } + else if (name == "procs_running") /* published as read, without computing a difference */ + { + UInt64 processes_running = 0; + readText(processes_running, *proc_stat); + skipToNextLineOrEOF(*proc_stat); + new_values["OSProcessesRunning"] = 
processes_running; + } + else if (name == "procs_blocked") /* published as read, without computing a difference */ + { + UInt64 processes_blocked = 0; + readText(processes_blocked, *proc_stat); + skipToNextLineOrEOF(*proc_stat); + new_values["OSProcessesBlocked"] = processes_blocked; + } + else + skipToNextLineOrEOF(*proc_stat); /* unknown line: ignore it */ + } + + if (!first_run) + { + ProcStatValuesOther delta_values = current_other_values - proc_stat_values_other; + /// These are event counts, not clock ticks: scale them by the update period only, without dividing by HZ. + new_values["OSInterrupts"] = static_cast<double>(delta_values.interrupts) / update_period.count(); + new_values["OSContextSwitches"] = static_cast<double>(delta_values.context_switches) / update_period.count(); + new_values["OSProcessesCreated"] = static_cast<double>(delta_values.processes_created) / update_period.count(); + } + + proc_stat_values_other = current_other_values; /* becomes the previous sample of the next update */ + } + if (meminfo) { meminfo->rewind(); + uint64_t free_plus_cached_bytes = 0; /* sum of the MemFree and Cached entries, published as OSMemoryFreePlusCached */ + while (!meminfo->eof()) + { + String name; /* entry label including the trailing colon, e.g. "MemTotal:" */ + readStringUntilWhitespace(name, *meminfo); + skipWhitespaceIfAny(*meminfo); + + uint64_t kb = 0; + readText(kb, *meminfo); + if (kb) + { + /// The "kB" unit is deliberately not asserted: entries such as "HugePages_Total: 512" have no unit, and a failed + /// assertion would presumably abort the whole update. Only the kB-denominated entries matched by name below are used. + + uint64_t bytes = kb * 1024; + + if (name == "MemTotal:") + { + new_values["OSMemoryTotal"] = bytes; + } + else if (name == "MemFree:") + { + free_plus_cached_bytes += bytes; + new_values["OSMemoryFreeWithoutCached"] = bytes; + } + else if (name == "MemAvailable:") + { + new_values["OSMemoryAvailable"] = bytes; + } + else if (name == "Buffers:") + { + new_values["OSMemoryBuffers"] = bytes; + } + else if (name == "Cached:") + { + free_plus_cached_bytes += bytes; + new_values["OSMemoryCached"] = bytes; + } + else if (name == "SwapCached:") + { + new_values["OSMemorySwapCached"] = bytes; + } + } + + skipToNextLineOrEOF(*meminfo); /* consumes the unit, if any, and the end of line */ + } + + new_values["OSMemoryFreePlusCached"] = free_plus_cached_bytes; } -#endif - /// Process CPU usage according to OS -#if defined(OS_LINUX) + // Try to add processor frequencies, ignoring errors. 
+ if (cpuinfo) { - ProcessorStatisticsOS::Data data = proc_stat.get(); + try + { + cpuinfo->rewind(); - new_values["LoadAvg1"] = data.loadavg.avg1; - new_values["LoadAvg5"] = data.loadavg.avg5; - new_values["LoadAvg15"] = data.loadavg.avg15; + // We need the following lines: + // processor : 4 + // cpu MHz : 4052.941 + // They contain tabs and are interspersed with other info. - new_values["FreqMin"] = data.freq.min; - new_values["FreqMax"] = data.freq.max; - new_values["FreqAvg"] = data.freq.avg; + int core_id = 0; + while (!cpuinfo->eof()) + { + std::string s; + // We don't have any backslash escape sequences in /proc/cpuinfo, so + // this function will read the line until EOL, which is exactly what + // we need. + readEscapedStringUntilEOL(s, *cpuinfo); + // It doesn't read the EOL itself. + if (!cpuinfo->eof()) ++cpuinfo->position(); /* step over the EOL only if there is one: the last line may end at EOF and the position must not move past the end of the data */ - new_values["TimeLoadUser"] = data.stload.user_time; - new_values["TimeLoadNice"] = data.stload.nice_time; - new_values["TimeLoadSystem"] = data.stload.system_time; - new_values["TimeLoadIDLE"] = data.stload.idle_time; - new_values["TimeLoadIowait"] = data.stload.iowait_time; - new_values["TimeLoadSteal"] = data.stload.steal_time; - new_values["TimeLoadGuest"] = data.stload.guest_time; - new_values["TimeLoadGuestNice"] = data.stload.guest_nice_time; + if (s.rfind("processor", 0) == 0) + { + if (auto colon = s.find_first_of(':'); colon != std::string::npos) /* find_first_of returns npos (a non-zero value) when there is no colon, so the result must be compared with npos rather than tested for truth */ + { + core_id = std::stoi(s.substr(colon + 2)); + } + } + else if (s.rfind("cpu MHz", 0) == 0) + { + if (auto colon = s.find_first_of(':'); colon != std::string::npos) /* same check as above; the frequency is attributed to the most recently seen "processor" index */ + { + auto mhz = std::stod(s.substr(colon + 2)); + new_values[fmt::format("CPUFrequencyMHz_{}", core_id)] = mhz; + } + } + } + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); /* best effort: a failure while parsing /proc/cpuinfo is logged and the remaining metrics are still collected */ + } + } - new_values["Processes"] = data.stload.processes; - new_values["ProcessesRunning"] = data.stload.procs_running; - new_values["ProcessesBlocked"] = data.stload.procs_blocked; + if (file_nr) + { + file_nr->rewind(); + + uint64_t open_files = 0; + readText(open_files, *file_nr); /* only the first number in /proc/sys/fs/file-nr is read */ + new_values["OSOpenFiles"] = open_files; } #endif + + /// Process disk usage according to OS #if defined(OS_LINUX) { @@ -530,50 +804,6 @@ void AsynchronousMetrics::update() saveAllArenasMetric(new_values, "muzzy_purged"); #endif -#if defined(OS_LINUX) - // Try to add processor frequencies, ignoring errors. - try - { - ReadBufferFromFile buf("/proc/cpuinfo", 32768 /* buf_size */); - - // We need the following lines: - // processor : 4 - // cpu MHz : 4052.941 - // They contain tabs and are interspersed with other info. - int core_id = 0; - while (!buf.eof()) - { - std::string s; - // We don't have any backslash escape sequences in /proc/cpuinfo, so - // this function will read the line until EOL, which is exactly what - // we need. - readEscapedStringUntilEOL(s, buf); - // It doesn't read the EOL itself. - ++buf.position(); - - if (s.rfind("processor", 0) == 0) - { - if (auto colon = s.find_first_of(':')) - { - core_id = std::stoi(s.substr(colon + 2)); - } - } - else if (s.rfind("cpu MHz", 0) == 0) - { - if (auto colon = s.find_first_of(':')) - { - auto mhz = std::stod(s.substr(colon + 2)); - new_values[fmt::format("CPUFrequencyMHz_{}", core_id)] = mhz; - } - } - } - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } -#endif - /// Add more metrics as you wish. // Log the new metrics. @@ -582,6 +812,8 @@ void AsynchronousMetrics::update() log->addValues(new_values); } + first_run = false; /* from the next update on, the incremental /proc/stat counters have a previous sample to be compared with */ + // Finally, update the current metrics. 
std::lock_guard lock(mutex); values = new_values; diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h index 7bb281842dd..9f6e63f6ce6 100644 --- a/src/Interpreters/AsynchronousMetrics.h +++ b/src/Interpreters/AsynchronousMetrics.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -20,6 +21,7 @@ namespace DB { class ProtocolServerAdapter; +class ReadBuffer; /* forward declaration: only needed for the signature of ProcStatValuesCPU::read */ using AsynchronousMetricValue = double; using AsynchronousMetricValues = std::unordered_map; @@ -71,11 +73,14 @@ private: bool quit {false}; AsynchronousMetricValues values; + /// Some values are incremental and we have to calculate the difference. + /// On first run we will only collect the values to subtract later. + bool first_run = true; + #if defined(OS_LINUX) MemoryStatisticsOS memory_stat; std::optional meminfo; - std::optional mounts; std::optional loadavg; std::optional proc_stat; std::optional cpuinfo; @@ -83,6 +88,39 @@ private: std::optional sockstat; std::optional netstat; std::optional file_nr; + std::optional uptime; /* /proc/uptime, opened in the constructor if it exists */ + std::vector> thermal; /* one entry per /sys/class/thermal/thermal_zoneN/temp file found by the constructor */ + + struct ProcStatValuesCPU /* The ten counters of one "cpu" or "cpuN" line of /proc/stat, declared in the order in which read() parses them. */ + { + uint64_t user; + uint64_t nice; + uint64_t system; + uint64_t idle; + uint64_t iowait; + uint64_t irq; + uint64_t softirq; + uint64_t steal; + uint64_t guest; + uint64_t guest_nice; + + void read(ReadBuffer & in); /* fills all fields from the current position of `in` and moves to the next line */ + ProcStatValuesCPU operator-(const ProcStatValuesCPU & other) const; /* field-wise difference between two samples */ + }; + + struct ProcStatValuesOther /* First value of the "intr", "ctxt" and "processes" lines of /proc/stat. */ + { + uint64_t interrupts; + uint64_t context_switches; + uint64_t processes_created; + + ProcStatValuesOther operator-(const ProcStatValuesOther & other) const; /* field-wise difference between two samples */ + }; + + ProcStatValuesCPU proc_stat_values_all_cpus{}; /* previous sample of the aggregate "cpu" line */ + ProcStatValuesOther proc_stat_values_other{}; /* previous sample of the non-CPU counters */ + std::vector proc_stat_values_per_cpu; /* previous samples of the "cpuN" lines, indexed by N */ + #endif std::unique_ptr thread;