Compare commits

...

3 Commits

Author SHA1 Message Date
Julia Kartseva
cea6c807ea
Merge 79aa9ef665 into 40c7d5fd1a 2024-11-20 13:22:04 -08:00
Julia Kartseva
79aa9ef665 sort interactive metrics by recent update and name 2024-11-08 05:06:52 +00:00
Julia Kartseva
0cf896f5a0 fix parallel replicas display in interactive metrics 2024-11-08 04:40:13 +00:00
2 changed files with 61 additions and 36 deletions

View File

@ -159,11 +159,6 @@ std::string_view setColorForTimeBasedMetricsProgress(ProfileEvents::ValueType va
return colors[dist]; return colors[dist];
} }
std::string_view setColorForStaleMetrics()
{
return "\033[38;5;236m"; /// Dark Grey
}
std::string_view setColorForDocumentation() std::string_view setColorForDocumentation()
{ {
return "\033[38;5;236m"; /// Dark Grey return "\033[38;5;236m"; /// Dark Grey
@ -199,7 +194,6 @@ void ProgressTable::writeTable(
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
if (!show_table && toggle_enabled) if (!show_table && toggle_enabled)
{ {
if (written_first_block)
message << CLEAR_TO_END_OF_SCREEN; message << CLEAR_TO_END_OF_SCREEN;
message << HIDE_CURSOR; message << HIDE_CURSOR;
@ -233,9 +227,10 @@ void ProgressTable::writeTable(
for (auto & [name, per_host_info] : metrics) for (auto & [name, per_host_info] : metrics)
{ {
if (!per_host_info.isFresh(elapsed_sec))
continue;
message << "\n"; message << "\n";
if (per_host_info.isStale(elapsed_sec))
message << setColorForStaleMetrics();
writeWithWidth(message, name, column_event_name_width); writeWithWidth(message, name, column_event_name_width);
auto value = per_host_info.getSummaryValue(); auto value = per_host_info.getSummaryValue();
@ -275,7 +270,7 @@ void ProgressTable::writeTable(
message << CLEAR_TO_END_OF_LINE; message << CLEAR_TO_END_OF_LINE;
} }
message << moveUpNLines(tableSize()); message << moveUpNLines(getFreshMetricsCount(elapsed_sec));
message.next(); message.next();
} }
@ -319,7 +314,25 @@ void ProgressTable::updateTable(const Block & block)
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
const auto & event_name_to_event = getEventNameToEvent(); const auto & event_name_to_event = getEventNameToEvent();
std::vector<std::pair<std::string, size_t>> name_and_row_num_list;
for (size_t row_num = 0, rows = block.rows(); row_num < rows; ++row_num) for (size_t row_num = 0, rows = block.rows(); row_num < rows; ++row_num)
{
auto thread_id = array_thread_id[row_num];
if (thread_id != THREAD_GROUP_ID)
continue;
auto name = names.getDataAt(row_num).toString();
name_and_row_num_list.emplace_back(name, row_num);
}
/// Sort by metric name in reverse order, as the most recently updated entries are promoted to the front
/// of the metric's list.
std::sort(
name_and_row_num_list.begin(),
name_and_row_num_list.end(),
[](const auto & a, const auto & b) { return a.first != b.first ? a.first > b.first : a.second < b.second; });
for (const auto & [name, row_num] : name_and_row_num_list)
{ {
auto thread_id = array_thread_id[row_num]; auto thread_id = array_thread_id[row_num];
@ -329,8 +342,9 @@ void ProgressTable::updateTable(const Block & block)
if (thread_id != THREAD_GROUP_ID) if (thread_id != THREAD_GROUP_ID)
continue; continue;
chassert(name == names.getDataAt(row_num).toString());
auto value = array_values[row_num]; auto value = array_values[row_num];
auto name = names.getDataAt(row_num).toString();
auto host_name = host_names.getDataAt(row_num).toString(); auto host_name = host_names.getDataAt(row_num).toString();
auto type = static_cast<ProfileEvents::Type>(array_type[row_num]); auto type = static_cast<ProfileEvents::Type>(array_type[row_num]);
@ -342,24 +356,21 @@ void ProgressTable::updateTable(const Block & block)
if (value == 0) if (value == 0)
continue; continue;
auto it = metrics.find(name); auto it = metrics_iterators.find(name);
if (it == metrics_iterators.end())
{
metrics.emplace_front(name, MetricInfoPerHost{});
metrics_iterators.emplace(name, metrics.begin());
}
else
metrics.splice(metrics.begin(), metrics, it->second);
/// If the table has already been written, then do not add new metrics to avoid jitter. metrics.front().second.updateHostValue(host_name, type, value, time_now);
if (it == metrics.end() && written_first_block)
continue;
if (!written_first_block)
it = metrics.try_emplace(name).first;
it->second.updateHostValue(host_name, type, value, time_now);
max_event_name_width = std::max(max_event_name_width, name.size()); max_event_name_width = std::max(max_event_name_width, name.size());
} }
if (!written_first_block)
column_event_name_width = max_event_name_width + 1; column_event_name_width = max_event_name_width + 1;
written_first_block = true;
} }
void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &) void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &)
@ -373,13 +384,22 @@ void ProgressTable::resetTable()
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
watch.restart(); watch.restart();
metrics.clear(); metrics.clear();
written_first_block = false; metrics_iterators.clear();
} }
size_t ProgressTable::tableSize() const size_t ProgressTable::getFreshMetricsCount(double time_now) const
{ {
auto count = std::count_if(
metrics.cbegin(),
metrics.cend(),
[&time_now](const auto & elem)
{
const auto & per_host_info = elem.second;
return per_host_info.isFresh(time_now);
});
/// Number of lines + header. /// Number of lines + header.
return metrics.empty() ? 0 : metrics.size() + 1; return count == 0 ? 0 : count + 1;
} }
size_t ProgressTable::getColumnDocumentationWidth(size_t terminal_width) const size_t ProgressTable::getColumnDocumentationWidth(size_t terminal_width) const
@ -421,9 +441,11 @@ void ProgressTable::MetricInfo::updateValue(Int64 new_value, double new_time)
update_time = new_time; update_time = new_time;
} }
bool ProgressTable::MetricInfo::isStale(double now) const bool ProgressTable::MetricInfo::isFresh(double now) const
{ {
return update_time != 0 && now - update_time >= 5.0; constexpr double freshness_threshold = 3.0;
chassert(now >= update_time);
return update_time != 0 && now - update_time <= freshness_threshold;
} }
double ProgressTable::MetricInfo::calculateProgress(double time_now) const double ProgressTable::MetricInfo::calculateProgress(double time_now) const
@ -481,8 +503,8 @@ double ProgressTable::MetricInfoPerHost::getMaxProgress() const
return max_progress; return max_progress;
} }
bool ProgressTable::MetricInfoPerHost::isStale(double now) const bool ProgressTable::MetricInfoPerHost::isFresh(double now) const
{ {
return std::all_of(host_to_metric.cbegin(), host_to_metric.cend(), [&now](const auto & p) { return p.second.isStale(now); }); return std::any_of(host_to_metric.cbegin(), host_to_metric.cend(), [&now](const auto & p) { return p.second.isFresh(now); });
} }
} }

View File

@ -5,6 +5,7 @@
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <list>
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <ostream> #include <ostream>
@ -48,7 +49,7 @@ private:
void updateValue(Int64 new_value, double new_time); void updateValue(Int64 new_value, double new_time);
double calculateProgress(double time_now) const; double calculateProgress(double time_now) const;
double getValue() const; double getValue() const;
bool isStale(double now) const; bool isFresh(double now) const;
private: private:
const ProfileEvents::Type type; const ProfileEvents::Type type;
@ -80,7 +81,7 @@ private:
double getSummaryValue(); double getSummaryValue();
double getSummaryProgress(double time_now); double getSummaryProgress(double time_now);
double getMaxProgress() const; double getMaxProgress() const;
bool isStale(double now) const; bool isFresh(double now) const;
private: private:
std::unordered_map<HostName, MetricInfo> host_to_metric; std::unordered_map<HostName, MetricInfo> host_to_metric;
@ -88,13 +89,17 @@ private:
}; };
size_t tableSize() const; size_t tableSize() const;
size_t getFreshMetricsCount(double time_now) const;
size_t getColumnDocumentationWidth(size_t terminal_width) const; size_t getColumnDocumentationWidth(size_t terminal_width) const;
using MetricName = String; using MetricName = String;
using Metric = std::pair<MetricName, MetricInfoPerHost>;
/// The server periodically sends Block with profile events. /// The server periodically sends Block with profile events.
/// This information is stored here. /// This information is stored here.
std::map<MetricName, MetricInfoPerHost> metrics; std::list<Metric> metrics;
std::map<MetricName, std::list<Metric>::iterator> metrics_iterators;
/// It is possible concurrent access to the metrics. /// It is possible concurrent access to the metrics.
std::mutex mutex; std::mutex mutex;
@ -102,8 +107,6 @@ private:
/// Track query execution time on client. /// Track query execution time on client.
Stopwatch watch; Stopwatch watch;
bool written_first_block = false;
size_t column_event_name_width = 20; size_t column_event_name_width = 20;
static constexpr std::string_view COLUMN_EVENT_NAME = "Event name"; static constexpr std::string_view COLUMN_EVENT_NAME = "Event name";