Rectification of MetricLog

This commit is contained in:
Alexey Milovidov 2019-08-18 03:04:58 +03:00
parent e2687fc2d5
commit 16495bab5b
2 changed files with 50 additions and 23 deletions

View File

@ -1,9 +1,9 @@
#include <Interpreters/MetricLog.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
namespace DB
{
@ -11,11 +11,10 @@ Block MetricLogElement::createBlock()
{
ColumnsWithTypeAndName columns_with_type_and_name;
columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDate>(), "event_date");
columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDateTime>(), "event_time");
columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), "milliseconds");
columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDate>(), "event_date");
columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDateTime>(), "event_time");
columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), "milliseconds");
//ProfileEvents
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
std::string name;
@ -24,7 +23,6 @@ Block MetricLogElement::createBlock()
columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), std::move(name));
}
//CurrentMetrics
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
std::string name;
@ -36,31 +34,25 @@ Block MetricLogElement::createBlock()
return Block(columns_with_type_and_name);
}
void MetricLogElement::appendToBlock(Block & block) const
{
MutableColumns columns = block.mutateColumns();
size_t iter = 0;
size_t column_idx = 0;
columns[iter++]->insert(DateLUT::instance().toDayNum(event_time));
columns[iter++]->insert(event_time);
columns[iter++]->insert(milliseconds);
columns[column_idx++]->insert(DateLUT::instance().toDayNum(event_time));
columns[column_idx++]->insert(event_time);
columns[column_idx++]->insert(milliseconds);
//ProfileEvents
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const UInt64 value = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
columns[iter++]->insert(value);
}
columns[column_idx++]->insert(profile_events[i]);
//CurrentMetrics
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
const UInt64 value = CurrentMetrics::values[i];
columns[iter++]->insert(value);
}
columns[column_idx++]->insert(current_metrics[i]);
}
void MetricLog::startCollectMetric(size_t collect_interval_milliseconds_)
{
collect_interval_milliseconds = collect_interval_milliseconds_;
@ -68,6 +60,7 @@ void MetricLog::startCollectMetric(size_t collect_interval_milliseconds_)
metric_flush_thread = ThreadFromGlobalPool([this] { metricThreadFunction(); });
}
void MetricLog::stopCollectMetric()
{
bool old_val = false;
@ -76,28 +69,47 @@ void MetricLog::stopCollectMetric()
metric_flush_thread.join();
}
inline UInt64 time_in_milliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
void MetricLog::metricThreadFunction()
{
auto desired_timepoint = std::chrono::system_clock::now();
prev_profile_events.resize(ProfileEvents::end());
while (!is_shutdown_metric_thread)
{
try
{
MetricLogElement elem;
const auto current_time = std::chrono::system_clock::now();
MetricLogElement elem;
elem.event_time = std::chrono::system_clock::to_time_t(current_time);
elem.milliseconds = time_in_milliseconds(current_time) - time_in_seconds(current_time) * 1000;
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const ProfileEvents::Count new_value = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
UInt64 & old_value = prev_profile_events[i];
elem.profile_events[i] = new_value - old_value;
old_value = new_value;
}
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
{
elem.current_metrics[i] = CurrentMetrics::values[i];
}
this->add(elem);
/// We will record current time into table but align it to regular time intervals to avoid time drift.

View File

@ -1,22 +1,34 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <vector>
#include <atomic>
#include <ctime>
namespace DB
{
using Poco::Message;
/** MetricLog is a log of metric values measured at regular time interval.
*/
struct MetricLogElement
{
time_t event_time{};
UInt64 milliseconds{};
std::vector<ProfileEvents::Count> profile_events;
std::vector<CurrentMetrics::Metric> current_metrics;
static std::string name() { return "MetricLog"; }
static Block createBlock();
void appendToBlock(Block & block) const;
};
class MetricLog : public SystemLog<MetricLogElement>
{
using SystemLog<MetricLogElement>::SystemLog;
@ -34,6 +46,9 @@ private:
ThreadFromGlobalPool metric_flush_thread;
size_t collect_interval_milliseconds;
std::atomic<bool> is_shutdown_metric_thread{false};
/// For differentiation of ProfileEvents counters.
std::vector<ProfileEvents::Count> prev_profile_events;
};
}