sort interactive metrics by recent update and name

This commit is contained in:
Julia Kartseva 2024-11-08 03:59:42 +00:00
parent 0cf896f5a0
commit 79aa9ef665
2 changed files with 59 additions and 22 deletions

View File

@ -158,11 +158,6 @@ std::string_view setColorForTimeBasedMetricsProgress(ProfileEvents::ValueType va
return colors[dist]; return colors[dist];
} }
std::string_view setColorForStaleMetrics()
{
return "\033[38;5;236m"; /// Dark Grey
}
std::string_view setColorForDocumentation() std::string_view setColorForDocumentation()
{ {
return "\033[38;5;236m"; /// Dark Grey return "\033[38;5;236m"; /// Dark Grey
@ -230,9 +225,10 @@ void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool sho
for (auto & [name, per_host_info] : metrics) for (auto & [name, per_host_info] : metrics)
{ {
if (!per_host_info.isFresh(elapsed_sec))
continue;
message << "\n"; message << "\n";
if (per_host_info.isStale(elapsed_sec))
message << setColorForStaleMetrics();
writeWithWidth(message, name, column_event_name_width); writeWithWidth(message, name, column_event_name_width);
auto value = per_host_info.getSummaryValue(); auto value = per_host_info.getSummaryValue();
@ -272,7 +268,7 @@ void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool sho
message << CLEAR_TO_END_OF_LINE; message << CLEAR_TO_END_OF_LINE;
} }
message << moveUpNLines(tableSize()); message << moveUpNLines(getFreshMetricsCount(elapsed_sec));
message.next(); message.next();
} }
@ -316,7 +312,25 @@ void ProgressTable::updateTable(const Block & block)
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
const auto & event_name_to_event = getEventNameToEvent(); const auto & event_name_to_event = getEventNameToEvent();
std::vector<std::pair<std::string, size_t>> name_and_row_num_list;
for (size_t row_num = 0, rows = block.rows(); row_num < rows; ++row_num) for (size_t row_num = 0, rows = block.rows(); row_num < rows; ++row_num)
{
auto thread_id = array_thread_id[row_num];
if (thread_id != THREAD_GROUP_ID)
continue;
auto name = names.getDataAt(row_num).toString();
name_and_row_num_list.emplace_back(name, row_num);
}
/// Sort by metric name in reverse order, as the most recently updated entries are promoted to the front
/// of the metric's list.
std::sort(
name_and_row_num_list.begin(),
name_and_row_num_list.end(),
[](const auto & a, const auto & b) { return a.first != b.first ? a.first > b.first : a.second < b.second; });
for (const auto & [name, row_num] : name_and_row_num_list)
{ {
auto thread_id = array_thread_id[row_num]; auto thread_id = array_thread_id[row_num];
@ -326,8 +340,9 @@ void ProgressTable::updateTable(const Block & block)
if (thread_id != THREAD_GROUP_ID) if (thread_id != THREAD_GROUP_ID)
continue; continue;
chassert(name == names.getDataAt(row_num).toString());
auto value = array_values[row_num]; auto value = array_values[row_num];
auto name = names.getDataAt(row_num).toString();
auto host_name = host_names.getDataAt(row_num).toString(); auto host_name = host_names.getDataAt(row_num).toString();
auto type = static_cast<ProfileEvents::Type>(array_type[row_num]); auto type = static_cast<ProfileEvents::Type>(array_type[row_num]);
@ -339,11 +354,16 @@ void ProgressTable::updateTable(const Block & block)
if (value == 0) if (value == 0)
continue; continue;
auto it = metrics.find(name); auto it = metrics_iterators.find(name);
if (it == metrics.end()) if (it == metrics_iterators.end())
it = metrics.try_emplace(name).first; {
metrics.emplace_front(name, MetricInfoPerHost{});
metrics_iterators.emplace(name, metrics.begin());
}
else
metrics.splice(metrics.begin(), metrics, it->second);
it->second.updateHostValue(host_name, type, value, time_now); metrics.front().second.updateHostValue(host_name, type, value, time_now);
max_event_name_width = std::max(max_event_name_width, name.size()); max_event_name_width = std::max(max_event_name_width, name.size());
} }
@ -362,12 +382,22 @@ void ProgressTable::resetTable()
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
watch.restart(); watch.restart();
metrics.clear(); metrics.clear();
metrics_iterators.clear();
} }
size_t ProgressTable::tableSize() const size_t ProgressTable::getFreshMetricsCount(double time_now) const
{ {
auto count = std::count_if(
metrics.cbegin(),
metrics.cend(),
[&time_now](const auto & elem)
{
const auto & per_host_info = elem.second;
return per_host_info.isFresh(time_now);
});
/// Number of lines + header. /// Number of lines + header.
return metrics.empty() ? 0 : metrics.size() + 1; return count == 0 ? 0 : count + 1;
} }
size_t ProgressTable::getColumnDocumentationWidth(size_t terminal_width) const size_t ProgressTable::getColumnDocumentationWidth(size_t terminal_width) const
@ -409,9 +439,11 @@ void ProgressTable::MetricInfo::updateValue(Int64 new_value, double new_time)
update_time = new_time; update_time = new_time;
} }
bool ProgressTable::MetricInfo::isStale(double now) const bool ProgressTable::MetricInfo::isFresh(double now) const
{ {
return update_time != 0 && now - update_time >= 5.0; constexpr double freshness_threshold = 3.0;
chassert(now >= update_time);
return update_time != 0 && now - update_time <= freshness_threshold;
} }
double ProgressTable::MetricInfo::calculateProgress(double time_now) const double ProgressTable::MetricInfo::calculateProgress(double time_now) const
@ -469,8 +501,8 @@ double ProgressTable::MetricInfoPerHost::getMaxProgress() const
return max_progress; return max_progress;
} }
bool ProgressTable::MetricInfoPerHost::isStale(double now) const bool ProgressTable::MetricInfoPerHost::isFresh(double now) const
{ {
return std::all_of(host_to_metric.cbegin(), host_to_metric.cend(), [&now](const auto & p) { return p.second.isStale(now); }); return std::any_of(host_to_metric.cbegin(), host_to_metric.cend(), [&now](const auto & p) { return p.second.isFresh(now); });
} }
} }

View File

@ -5,6 +5,7 @@
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <list>
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <ostream> #include <ostream>
@ -47,7 +48,7 @@ private:
void updateValue(Int64 new_value, double new_time); void updateValue(Int64 new_value, double new_time);
double calculateProgress(double time_now) const; double calculateProgress(double time_now) const;
double getValue() const; double getValue() const;
bool isStale(double now) const; bool isFresh(double now) const;
private: private:
const ProfileEvents::Type type; const ProfileEvents::Type type;
@ -79,7 +80,7 @@ private:
double getSummaryValue(); double getSummaryValue();
double getSummaryProgress(double time_now); double getSummaryProgress(double time_now);
double getMaxProgress() const; double getMaxProgress() const;
bool isStale(double now) const; bool isFresh(double now) const;
private: private:
std::unordered_map<HostName, MetricInfo> host_to_metric; std::unordered_map<HostName, MetricInfo> host_to_metric;
@ -87,13 +88,17 @@ private:
}; };
size_t tableSize() const; size_t tableSize() const;
size_t getFreshMetricsCount(double time_now) const;
size_t getColumnDocumentationWidth(size_t terminal_width) const; size_t getColumnDocumentationWidth(size_t terminal_width) const;
using MetricName = String; using MetricName = String;
using Metric = std::pair<MetricName, MetricInfoPerHost>;
/// The server periodically sends Block with profile events. /// The server periodically sends Block with profile events.
/// This information is stored here. /// This information is stored here.
std::map<MetricName, MetricInfoPerHost> metrics; std::list<Metric> metrics;
std::map<MetricName, std::list<Metric>::iterator> metrics_iterators;
/// It is possible concurrent access to the metrics. /// It is possible concurrent access to the metrics.
std::mutex mutex; std::mutex mutex;