diff --git a/docker/test/performance-comparison/compare.sh b/docker/test/performance-comparison/compare.sh
index f7986689020..a2760907cb3 100755
--- a/docker/test/performance-comparison/compare.sh
+++ b/docker/test/performance-comparison/compare.sh
@@ -198,12 +198,14 @@ function get_profiles
clickhouse-client --port 9001 --query "select * from system.trace_log format TSVWithNamesAndTypes" > left-trace-log.tsv ||: &
clickhouse-client --port 9001 --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > left-addresses.tsv ||: &
clickhouse-client --port 9001 --query "select * from system.metric_log format TSVWithNamesAndTypes" > left-metric-log.tsv ||: &
+ clickhouse-client --port 9001 --query "select * from system.asynchronous_metric_log format TSVWithNamesAndTypes" > left-async-metric-log.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.query_log where type = 2 format TSVWithNamesAndTypes" > right-query-log.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.query_thread_log format TSVWithNamesAndTypes" > right-query-thread-log.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.trace_log format TSVWithNamesAndTypes" > right-trace-log.tsv ||: &
clickhouse-client --port 9002 --query "select arrayJoin(trace) addr, concat(splitByChar('/', addressToLine(addr))[-1], '#', demangle(addressToSymbol(addr)) ) name from system.trace_log group by addr format TSVWithNamesAndTypes" > right-addresses.tsv ||: &
clickhouse-client --port 9002 --query "select * from system.metric_log format TSVWithNamesAndTypes" > right-metric-log.tsv ||: &
+ clickhouse-client --port 9002 --query "select * from system.asynchronous_metric_log format TSVWithNamesAndTypes" > right-async-metric-log.tsv ||: &
wait
diff --git a/docs/en/operations/system-tables.md b/docs/en/operations/system-tables.md
index 7b76f737824..28f448b632c 100644
--- a/docs/en/operations/system-tables.md
+++ b/docs/en/operations/system-tables.md
@@ -83,6 +83,10 @@ SELECT * FROM system.asynchronous_metrics LIMIT 10
- [system.events](#system_tables-events) — Contains a number of events that have occurred.
- [system.metric\_log](#system_tables-metric_log) — Contains a history of metrics values from tables `system.metrics` и `system.events`.
+## system.asynchronous_metric_log {#system-tables-async-log}
+
+Contains the historical values for `system.asynchronous_log` (see [system.asynchronous_metrics](#system_tables-asynchronous_metrics))
+
## system.clusters {#system-clusters}
Contains information about clusters available in the config file and the servers in them.
diff --git a/programs/server/config.xml b/programs/server/config.xml
index ba870d8a8ea..0ceba85593a 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -481,6 +481,20 @@
1000
+
+
+ system
+
+
+ 60000
+
+
diff --git a/src/Interpreters/AsynchronousMetricLog.cpp b/src/Interpreters/AsynchronousMetricLog.cpp
new file mode 100644
index 00000000000..e4415773655
--- /dev/null
+++ b/src/Interpreters/AsynchronousMetricLog.cpp
@@ -0,0 +1,64 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+Block AsynchronousMetricLogElement::createBlock()
+{
+ ColumnsWithTypeAndName columns;
+
+ columns.emplace_back(std::make_shared(), "event_date");
+ columns.emplace_back(std::make_shared(), "event_time");
+ columns.emplace_back(std::make_shared(), "name");
+ columns.emplace_back(std::make_shared(), "value");
+
+ return Block(columns);
+}
+
+
+void AsynchronousMetricLogElement::appendToBlock(MutableColumns & columns) const
+{
+ size_t column_idx = 0;
+
+ columns[column_idx++]->insert(event_date);
+ columns[column_idx++]->insert(event_time);
+ columns[column_idx++]->insert(metric_name);
+ columns[column_idx++]->insert(value);
+}
+
+
+inline UInt64 time_in_milliseconds(std::chrono::time_point timepoint)
+{
+ return std::chrono::duration_cast(timepoint.time_since_epoch()).count();
+}
+
+
+inline UInt64 time_in_seconds(std::chrono::time_point timepoint)
+{
+ return std::chrono::duration_cast(timepoint.time_since_epoch()).count();
+}
+
+void AsynchronousMetricLog::addValues(const AsynchronousMetricValues & values)
+{
+ AsynchronousMetricLogElement element;
+
+ const auto now = std::chrono::system_clock::now();
+ element.event_time = time_in_seconds(now);
+ element.event_date = DateLUT::instance().toDayNum(element.event_time);
+
+ for (const auto & [key, value] : values)
+ {
+ element.metric_name = key;
+ element.value = value;
+
+ add(element);
+ }
+}
+
+}
diff --git a/src/Interpreters/AsynchronousMetricLog.h b/src/Interpreters/AsynchronousMetricLog.h
new file mode 100644
index 00000000000..b7d6aab95b6
--- /dev/null
+++ b/src/Interpreters/AsynchronousMetricLog.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+typedef double AsynchronousMetricValue;
+typedef std::unordered_map AsynchronousMetricValues;
+
+/** AsynchronousMetricLog is a log of metric values measured at regular time interval.
+ */
+
+struct AsynchronousMetricLogElement
+{
+ UInt16 event_date;
+ time_t event_time;
+ std::string metric_name;
+ double value;
+
+ static std::string name() { return "AsynchronousMetricLog"; }
+ static Block createBlock();
+ void appendToBlock(MutableColumns & columns) const;
+};
+
+class AsynchronousMetricLog : public SystemLog
+{
+public:
+ using SystemLog::SystemLog;
+
+ void addValues(const AsynchronousMetricValues &);
+};
+
+}
diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp
index 09622302893..6cd8fafa2a8 100644
--- a/src/Interpreters/AsynchronousMetrics.cpp
+++ b/src/Interpreters/AsynchronousMetrics.cpp
@@ -1,4 +1,5 @@
#include
+#include
#include
#include
#include
@@ -37,7 +38,7 @@ AsynchronousMetrics::~AsynchronousMetrics()
try
{
{
- std::lock_guard lock{wait_mutex};
+ std::lock_guard lock{mutex};
quit = true;
}
@@ -51,17 +52,10 @@ AsynchronousMetrics::~AsynchronousMetrics()
}
-AsynchronousMetrics::Container AsynchronousMetrics::getValues() const
+AsynchronousMetricValues AsynchronousMetrics::getValues() const
{
- std::lock_guard lock{container_mutex};
- return container;
-}
-
-
-void AsynchronousMetrics::set(const std::string & name, Value value)
-{
- std::lock_guard lock{container_mutex};
- container[name] = value;
+ std::lock_guard lock{mutex};
+ return values;
}
@@ -69,8 +63,6 @@ void AsynchronousMetrics::run()
{
setThreadName("AsyncMetrics");
- std::unique_lock lock{wait_mutex};
-
/// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter.
const auto get_next_minute = []
{
@@ -89,6 +81,7 @@ void AsynchronousMetrics::run()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
+ std::unique_lock lock{mutex};
if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
}
@@ -113,41 +106,43 @@ static void calculateMaxAndSum(Max & max, Sum & sum, T x)
void AsynchronousMetrics::update()
{
+ AsynchronousMetricValues new_values;
+
{
if (auto mark_cache = context.getMarkCache())
{
- set("MarkCacheBytes", mark_cache->weight());
- set("MarkCacheFiles", mark_cache->count());
+ new_values["MarkCacheBytes"] = mark_cache->weight();
+ new_values["MarkCacheFiles"] = mark_cache->count();
}
}
{
if (auto uncompressed_cache = context.getUncompressedCache())
{
- set("UncompressedCacheBytes", uncompressed_cache->weight());
- set("UncompressedCacheCells", uncompressed_cache->count());
+ new_values["UncompressedCacheBytes"] = uncompressed_cache->weight();
+ new_values["UncompressedCacheCells"] = uncompressed_cache->count();
}
}
#if USE_EMBEDDED_COMPILER
{
if (auto compiled_expression_cache = context.getCompiledExpressionCache())
- set("CompiledExpressionCacheCount", compiled_expression_cache->count());
+ new_values["CompiledExpressionCacheCount"] = compiled_expression_cache->count();
}
#endif
- set("Uptime", context.getUptimeSeconds());
+ new_values["Uptime"] = context.getUptimeSeconds();
/// Process memory usage according to OS
#if defined(OS_LINUX)
{
MemoryStatisticsOS::Data data = memory_stat.get();
- set("MemoryVirtual", data.virt);
- set("MemoryResident", data.resident);
- set("MemoryShared", data.shared);
- set("MemoryCode", data.code);
- set("MemoryDataAndStack", data.data_and_stack);
+ new_values["MemoryVirtual"] = data.virt;
+ new_values["MemoryResident"] = data.resident;
+ new_values["MemoryShared"] = data.shared;
+ new_values["MemoryCode"] = data.code;
+ new_values["MemoryDataAndStack"] = data.data_and_stack;
/// We must update the value of total_memory_tracker periodically.
/// Otherwise it might be calculated incorrectly - it can include a "drift" of memory amount.
@@ -228,21 +223,21 @@ void AsynchronousMetrics::update()
}
}
- set("ReplicasMaxQueueSize", max_queue_size);
- set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
- set("ReplicasMaxMergesInQueue", max_merges_in_queue);
+ new_values["ReplicasMaxQueueSize"] = max_queue_size;
+ new_values["ReplicasMaxInsertsInQueue"] = max_inserts_in_queue;
+ new_values["ReplicasMaxMergesInQueue"] = max_merges_in_queue;
- set("ReplicasSumQueueSize", sum_queue_size);
- set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
- set("ReplicasSumMergesInQueue", sum_merges_in_queue);
+ new_values["ReplicasSumQueueSize"] = sum_queue_size;
+ new_values["ReplicasSumInsertsInQueue"] = sum_inserts_in_queue;
+ new_values["ReplicasSumMergesInQueue"] = sum_merges_in_queue;
- set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
- set("ReplicasMaxRelativeDelay", max_relative_delay);
+ new_values["ReplicasMaxAbsoluteDelay"] = max_absolute_delay;
+ new_values["ReplicasMaxRelativeDelay"] = max_relative_delay;
- set("MaxPartCountForPartition", max_part_count_for_partition);
+ new_values["MaxPartCountForPartition"] = max_part_count_for_partition;
- set("NumberOfDatabases", number_of_databases);
- set("NumberOfTables", total_number_of_tables);
+ new_values["NumberOfDatabases"] = number_of_databases;
+ new_values["NumberOfTables"] = total_number_of_tables;
}
#if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 4
@@ -265,7 +260,7 @@ void AsynchronousMetrics::update()
TYPE value{}; \
size_t size = sizeof(value); \
mallctl("stats." NAME, &value, &size, nullptr, 0); \
- set("jemalloc." NAME, value); \
+ new_values["jemalloc." NAME] = value; \
} while (false);
FOR_EACH_METRIC(GET_METRIC)
@@ -276,6 +271,16 @@ void AsynchronousMetrics::update()
#endif
/// Add more metrics as you wish.
+
+ // Log the new metrics.
+ if (auto log = context.getAsynchronousMetricLog())
+ {
+ log->addValues(new_values);
+ }
+
+ // Finally, update the current metrics.
+ std::lock_guard lock(mutex);
+ values = new_values;
}
}
diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h
index ce6c0aae552..6817f545c8f 100644
--- a/src/Interpreters/AsynchronousMetrics.h
+++ b/src/Interpreters/AsynchronousMetrics.h
@@ -14,6 +14,9 @@ namespace DB
class Context;
+typedef double AsynchronousMetricValue;
+typedef std::unordered_map AsynchronousMetricValues;
+
/** Periodically (each minute, starting at 30 seconds offset)
* calculates and updates some metrics,
@@ -29,21 +32,17 @@ public:
~AsynchronousMetrics();
- using Value = double;
- using Container = std::unordered_map;
/// Returns copy of all values.
- Container getValues() const;
+ AsynchronousMetricValues getValues() const;
private:
Context & context;
- bool quit {false};
- std::mutex wait_mutex;
+ mutable std::mutex mutex;
std::condition_variable wait_cond;
-
- Container container;
- mutable std::mutex container_mutex;
+ bool quit {false};
+ AsynchronousMetricValues values;
#if defined(OS_LINUX)
MemoryStatisticsOS memory_stat;
@@ -53,8 +52,6 @@ private:
void run();
void update();
-
- void set(const std::string & name, Value value);
};
}
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 7b636b84c68..1431f3fd62c 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -1678,6 +1678,17 @@ std::shared_ptr Context::getMetricLog()
}
+std::shared_ptr Context::getAsynchronousMetricLog()
+{
+ auto lock = getLock();
+
+ if (!shared->system_logs)
+ return {};
+
+ return shared->system_logs->asynchronous_metric_log;
+}
+
+
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index 1d46049fb92..5a4e959229f 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -80,6 +80,7 @@ class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
+class AsynchronousMetricLog;
struct MergeTreeSettings;
class StorageS3Settings;
class IDatabase;
@@ -526,6 +527,7 @@ public:
std::shared_ptr getTraceLog();
std::shared_ptr getTextLog();
std::shared_ptr getMetricLog();
+ std::shared_ptr getAsynchronousMetricLog();
/// Returns an object used to log opertaions with parts if it possible.
/// Provide table name to make required cheks.
diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index 9ebdb155643..f35bafbe25a 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -20,6 +20,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -306,7 +307,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(); },
- [&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); }
+ [&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); },
+ [&] () { if (auto asynchronous_metric_log = context.getAsynchronousMetricLog()) asynchronous_metric_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:
diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp
index 9ce2f7a4d0e..d79edde7052 100644
--- a/src/Interpreters/SystemLog.cpp
+++ b/src/Interpreters/SystemLog.cpp
@@ -5,6 +5,7 @@
#include
#include
#include
+#include
#include
#include
@@ -75,6 +76,9 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
trace_log = createSystemLog(global_context, "system", "trace_log", config, "trace_log");
text_log = createSystemLog(global_context, "system", "text_log", config, "text_log");
metric_log = createSystemLog(global_context, "system", "metric_log", config, "metric_log");
+ asynchronous_metric_log = createSystemLog(
+ global_context, "system", "asynchronous_metric_log", config,
+ "asynchronous_metric_log");
if (query_log)
logs.emplace_back(query_log.get());
@@ -88,6 +92,9 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
logs.emplace_back(text_log.get());
if (metric_log)
logs.emplace_back(metric_log.get());
+ if (asynchronous_metric_log)
+ logs.emplace_back(asynchronous_metric_log.get());
+
try
{
diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h
index dd2f815ce92..e49ce574478 100644
--- a/src/Interpreters/SystemLog.h
+++ b/src/Interpreters/SystemLog.h
@@ -69,6 +69,7 @@ class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
+class AsynchronousMetricLog;
class ISystemLog
@@ -99,6 +100,8 @@ struct SystemLogs
std::shared_ptr trace_log; /// Used to log traces from query profiler
std::shared_ptr text_log; /// Used to log all text messages.
std::shared_ptr metric_log; /// Used to log all metrics.
+ /// Metrics from system.asynchronous_metrics.
+ std::shared_ptr asynchronous_metric_log;
std::vector logs;
};
diff --git a/src/Interpreters/ya.make b/src/Interpreters/ya.make
index 178c3ee3125..29be5d3c216 100644
--- a/src/Interpreters/ya.make
+++ b/src/Interpreters/ya.make
@@ -105,6 +105,7 @@ SRCS(
MarkTableIdentifiersVisitor.cpp
MergeJoin.cpp
MetricLog.cpp
+ AsynchronousMetricLog.cpp
MutationsInterpreter.cpp
NullableUtils.cpp
OptimizeIfChains.cpp