Merge pull request #11588 from ClickHouse/aku/async-log

Add system.asynchronous_metric_log
This commit is contained in:
Alexander Kuzmenkov 2020-06-11 16:10:35 +03:00 committed by GitHub
commit e354bbc78c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 200 additions and 47 deletions

View File

@ -198,12 +198,14 @@ function get_profiles
clickhouse-client --port 9001 --query "select * from system.trace_log format TSVWithNamesAndTypes" > left-trace-log.tsv ||: &
clickhouse-client --port 9001 --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > left-addresses.tsv ||: &
clickhouse-client --port 9001 --query "select * from system.metric_log format TSVWithNamesAndTypes" > left-metric-log.tsv ||: &
clickhouse-client --port 9001 --query "select * from system.asynchronous_metric_log format TSVWithNamesAndTypes" > left-async-metric-log.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.query_log where type = 2 format TSVWithNamesAndTypes" > right-query-log.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > right-query-thread-log.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.trace_log format TSVWithNamesAndTypes" > right-trace-log.tsv ||: &
clickhouse-client --port 9002 --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > right-addresses.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.metric_log format TSVWithNamesAndTypes" > right-metric-log.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.asynchronous_metric_log format TSVWithNamesAndTypes" > right-async-metric-log.tsv ||: &
wait

View File

@ -83,6 +83,10 @@ SELECT * FROM system.asynchronous_metrics LIMIT 10
- [system.events](#system_tables-events) — Contains a number of events that have occurred.
- [system.metric\_log](#system_tables-metric_log) — Contains a history of metrics values from tables `system.metrics` and `system.events`.
## system.asynchronous_metric_log {#system-tables-async-log}
Contains the historical values of the metrics from [system.asynchronous_metrics](#system_tables-asynchronous_metrics).
## system.clusters {#system-clusters}
Contains information about clusters available in the config file and the servers in them.

View File

@ -481,6 +481,20 @@
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
<!--
Asynchronous metric log contains values of metrics from
system.asynchronous_metrics.
-->
<asynchronous_metric_log>
<database>system</database>
<table>asynchronous_metric_log</table>
<!--
Asynchronous metrics are updated once a minute, so there is
no need to flush more often.
-->
<flush_interval_milliseconds>60000</flush_interval_milliseconds>
</asynchronous_metric_log>
<!-- Parameters for embedded dictionaries, used in Yandex.Metrica.
See https://clickhouse.yandex/docs/en/dicts/internal_dicts/
-->

View File

@ -0,0 +1,64 @@
#include <Interpreters/AsynchronousMetricLog.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/AsynchronousMetrics.h>
namespace DB
{
/// Builds the header block describing the columns of one log row:
/// event_date (Date), event_time (DateTime), name (String), value (Float64).
Block AsynchronousMetricLogElement::createBlock()
{
    ColumnsWithTypeAndName header_columns;

    /// The order of the columns here is the order in which appendToBlock() fills them.
    header_columns.emplace_back(std::make_shared<DataTypeDate>(), "event_date");
    header_columns.emplace_back(std::make_shared<DataTypeDateTime>(), "event_time");
    header_columns.emplace_back(std::make_shared<DataTypeString>(), "name");
    header_columns.emplace_back(std::make_shared<DataTypeFloat64>(), "value");

    return Block(header_columns);
}
/// Appends this element as one row to `columns`.
/// Positions follow the column order declared in createBlock():
/// 0 - event_date, 1 - event_time, 2 - name, 3 - value.
void AsynchronousMetricLogElement::appendToBlock(MutableColumns & columns) const
{
    columns[0]->insert(event_date);
    columns[1]->insert(event_time);
    columns[2]->insert(metric_name);
    columns[3]->insert(value);
}
/// Converts a system_clock time point to whole milliseconds since the clock's epoch.
/// NOTE(review): no caller is visible in this file - confirm whether this helper is still needed.
inline UInt64 time_in_milliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    const auto since_epoch = timepoint.time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
}
/// Converts a system_clock time point to whole seconds since the clock's epoch.
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    const auto since_epoch = timepoint.time_since_epoch();
    return std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();
}
/// Adds one log element per metric in `values`.
/// The timestamp is taken once, so every element of the batch carries
/// the same event_time / event_date.
void AsynchronousMetricLog::addValues(const AsynchronousMetricValues & values)
{
    AsynchronousMetricLogElement element;

    element.event_time = time_in_seconds(std::chrono::system_clock::now());
    element.event_date = DateLUT::instance().toDayNum(element.event_time);

    /// Only the name and the value differ between rows, so the same element is reused.
    for (const auto & metric : values)
    {
        element.metric_name = metric.first;
        element.value = metric.second;
        add(element);
    }
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <vector>
#include <atomic>
#include <ctime>
namespace DB
{
/// Value of a single asynchronous metric.
using AsynchronousMetricValue = double;
/// Asynchronous metrics keyed by metric name.
using AsynchronousMetricValues = std::unordered_map<std::string, AsynchronousMetricValue>;
/** AsynchronousMetricLog is a log of metric values measured at a regular time interval.
  *
  * AsynchronousMetricLogElement is one row of that log: the value of a single
  * metric together with the moment it was recorded.
  */
struct AsynchronousMetricLogElement
{
    /// Scalar members are value-initialized so that a default-constructed
    /// element never carries indeterminate data into the log.

    /// Day number written to the "event_date" column.
    UInt16 event_date = 0;
    /// Timestamp in seconds written to the "event_time" column.
    time_t event_time = 0;
    /// Metric name written to the "name" column.
    std::string metric_name;
    /// Metric value written to the "value" column.
    double value = 0;

    static std::string name() { return "AsynchronousMetricLog"; }

    /// Returns the block describing the columns of one log row.
    static Block createBlock();

    /// Appends this element as one row; `columns` must follow the layout of createBlock().
    void appendToBlock(MutableColumns & columns) const;
};
/// System log whose elements are AsynchronousMetricLogElement rows.
class AsynchronousMetricLog : public SystemLog<AsynchronousMetricLogElement>
{
public:
    /// Constructors are inherited from SystemLog unchanged.
    using SystemLog<AsynchronousMetricLogElement>::SystemLog;

    /// Adds one log element for every metric in `values`.
    void addValues(const AsynchronousMetricValues & values);
};
}

View File

@ -1,4 +1,5 @@
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
@ -37,7 +38,7 @@ AsynchronousMetrics::~AsynchronousMetrics()
try
{
{
std::lock_guard lock{wait_mutex};
std::lock_guard lock{mutex};
quit = true;
}
@ -51,17 +52,10 @@ AsynchronousMetrics::~AsynchronousMetrics()
}
AsynchronousMetrics::Container AsynchronousMetrics::getValues() const
AsynchronousMetricValues AsynchronousMetrics::getValues() const
{
std::lock_guard lock{container_mutex};
return container;
}
void AsynchronousMetrics::set(const std::string & name, Value value)
{
std::lock_guard lock{container_mutex};
container[name] = value;
std::lock_guard lock{mutex};
return values;
}
@ -69,8 +63,6 @@ void AsynchronousMetrics::run()
{
setThreadName("AsyncMetrics");
std::unique_lock lock{wait_mutex};
/// Next minute + 30 seconds, to keep a distance from the moment when metrics are transmitted, see MetricsTransmitter.
const auto get_next_minute = []
{
@ -89,6 +81,7 @@ void AsynchronousMetrics::run()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
std::unique_lock lock{mutex};
if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
}
@ -113,41 +106,43 @@ static void calculateMaxAndSum(Max & max, Sum & sum, T x)
void AsynchronousMetrics::update()
{
AsynchronousMetricValues new_values;
{
if (auto mark_cache = context.getMarkCache())
{
set("MarkCacheBytes", mark_cache->weight());
set("MarkCacheFiles", mark_cache->count());
new_values["MarkCacheBytes"] = mark_cache->weight();
new_values["MarkCacheFiles"] = mark_cache->count();
}
}
{
if (auto uncompressed_cache = context.getUncompressedCache())
{
set("UncompressedCacheBytes", uncompressed_cache->weight());
set("UncompressedCacheCells", uncompressed_cache->count());
new_values["UncompressedCacheBytes"] = uncompressed_cache->weight();
new_values["UncompressedCacheCells"] = uncompressed_cache->count();
}
}
#if USE_EMBEDDED_COMPILER
{
if (auto compiled_expression_cache = context.getCompiledExpressionCache())
set("CompiledExpressionCacheCount", compiled_expression_cache->count());
new_values["CompiledExpressionCacheCount"] = compiled_expression_cache->count();
}
#endif
set("Uptime", context.getUptimeSeconds());
new_values["Uptime"] = context.getUptimeSeconds();
/// Process memory usage according to OS
#if defined(OS_LINUX)
{
MemoryStatisticsOS::Data data = memory_stat.get();
set("MemoryVirtual", data.virt);
set("MemoryResident", data.resident);
set("MemoryShared", data.shared);
set("MemoryCode", data.code);
set("MemoryDataAndStack", data.data_and_stack);
new_values["MemoryVirtual"] = data.virt;
new_values["MemoryResident"] = data.resident;
new_values["MemoryShared"] = data.shared;
new_values["MemoryCode"] = data.code;
new_values["MemoryDataAndStack"] = data.data_and_stack;
/// We must update the value of total_memory_tracker periodically.
/// Otherwise it might be calculated incorrectly - it can include a "drift" of memory amount.
@ -228,21 +223,21 @@ void AsynchronousMetrics::update()
}
}
set("ReplicasMaxQueueSize", max_queue_size);
set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
set("ReplicasMaxMergesInQueue", max_merges_in_queue);
new_values["ReplicasMaxQueueSize"] = max_queue_size;
new_values["ReplicasMaxInsertsInQueue"] = max_inserts_in_queue;
new_values["ReplicasMaxMergesInQueue"] = max_merges_in_queue;
set("ReplicasSumQueueSize", sum_queue_size);
set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
set("ReplicasSumMergesInQueue", sum_merges_in_queue);
new_values["ReplicasSumQueueSize"] = sum_queue_size;
new_values["ReplicasSumInsertsInQueue"] = sum_inserts_in_queue;
new_values["ReplicasSumMergesInQueue"] = sum_merges_in_queue;
set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
set("ReplicasMaxRelativeDelay", max_relative_delay);
new_values["ReplicasMaxAbsoluteDelay"] = max_absolute_delay;
new_values["ReplicasMaxRelativeDelay"] = max_relative_delay;
set("MaxPartCountForPartition", max_part_count_for_partition);
new_values["MaxPartCountForPartition"] = max_part_count_for_partition;
set("NumberOfDatabases", number_of_databases);
set("NumberOfTables", total_number_of_tables);
new_values["NumberOfDatabases"] = number_of_databases;
new_values["NumberOfTables"] = total_number_of_tables;
}
#if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 4
@ -265,7 +260,7 @@ void AsynchronousMetrics::update()
TYPE value{}; \
size_t size = sizeof(value); \
mallctl("stats." NAME, &value, &size, nullptr, 0); \
set("jemalloc." NAME, value); \
new_values["jemalloc." NAME] = value; \
} while (false);
FOR_EACH_METRIC(GET_METRIC)
@ -276,6 +271,16 @@ void AsynchronousMetrics::update()
#endif
/// Add more metrics as you wish.
// Log the new metrics.
if (auto log = context.getAsynchronousMetricLog())
{
log->addValues(new_values);
}
// Finally, update the current metrics.
std::lock_guard lock(mutex);
values = new_values;
}
}

View File

@ -14,6 +14,9 @@ namespace DB
class Context;
/// Value of a single asynchronous metric.
using AsynchronousMetricValue = double;
/// Asynchronous metrics keyed by metric name.
using AsynchronousMetricValues = std::unordered_map<std::string, AsynchronousMetricValue>;
/** Periodically (each minute, starting at 30 seconds offset)
* calculates and updates some metrics,
@ -29,21 +32,17 @@ public:
~AsynchronousMetrics();
using Value = double;
using Container = std::unordered_map<std::string, Value>;
/// Returns copy of all values.
Container getValues() const;
AsynchronousMetricValues getValues() const;
private:
Context & context;
bool quit {false};
std::mutex wait_mutex;
mutable std::mutex mutex;
std::condition_variable wait_cond;
Container container;
mutable std::mutex container_mutex;
bool quit {false};
AsynchronousMetricValues values;
#if defined(OS_LINUX)
MemoryStatisticsOS memory_stat;
@ -53,8 +52,6 @@ private:
void run();
void update();
void set(const std::string & name, Value value);
};
}

View File

@ -1678,6 +1678,17 @@ std::shared_ptr<MetricLog> Context::getMetricLog()
}
/// Returns the asynchronous metric log, or an empty pointer when
/// `shared->system_logs` is not set. The context lock is held for the whole call.
std::shared_ptr<AsynchronousMetricLog> Context::getAsynchronousMetricLog()
{
    auto lock = getLock();

    if (shared->system_logs)
        return shared->system_logs->asynchronous_metric_log;

    return {};
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();

View File

@ -80,6 +80,7 @@ class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
class AsynchronousMetricLog;
struct MergeTreeSettings;
class StorageS3Settings;
class IDatabase;
@ -526,6 +527,7 @@ public:
std::shared_ptr<TraceLog> getTraceLog();
std::shared_ptr<TextLog> getTextLog();
std::shared_ptr<MetricLog> getMetricLog();
std::shared_ptr<AsynchronousMetricLog> getAsynchronousMetricLog();
/// Returns an object used to log operations with parts, if possible.
/// Provide the table name to make the required checks.

View File

@ -20,6 +20,7 @@
#include <Interpreters/TraceLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Access/ContextAccess.h>
#include <Access/AllowedClientHosts.h>
#include <Databases/IDatabase.h>
@ -306,7 +307,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(); },
[&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); }
[&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); },
[&] () { if (auto asynchronous_metric_log = context.getAsynchronousMetricLog()) asynchronous_metric_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:

View File

@ -5,6 +5,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
@ -75,6 +76,9 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
trace_log = createSystemLog<TraceLog>(global_context, "system", "trace_log", config, "trace_log");
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log");
metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log");
asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>(
global_context, "system", "asynchronous_metric_log", config,
"asynchronous_metric_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -88,6 +92,9 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
logs.emplace_back(text_log.get());
if (metric_log)
logs.emplace_back(metric_log.get());
if (asynchronous_metric_log)
logs.emplace_back(asynchronous_metric_log.get());
try
{

View File

@ -69,6 +69,7 @@ class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
class AsynchronousMetricLog;
class ISystemLog
@ -99,6 +100,8 @@ struct SystemLogs
std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
std::shared_ptr<TextLog> text_log; /// Used to log all text messages.
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
std::vector<ISystemLog *> logs;
};

View File

@ -105,6 +105,7 @@ SRCS(
MarkTableIdentifiersVisitor.cpp
MergeJoin.cpp
MetricLog.cpp
AsynchronousMetricLog.cpp
MutationsInterpreter.cpp
NullableUtils.cpp
OptimizeIfChains.cpp