diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index b621a897035..d788270ecf9 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -736,7 +736,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
     std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
     /// This object will periodically calculate some metrics.
     AsynchronousMetrics async_metrics(
-        global_context, config().getUInt("asynchronous_metrics_update_period_s", 1),
+        global_context,
+        config().getUInt("asynchronous_metrics_update_period_s", 1),
+        config().getUInt("asynchronous_heavy_metrics_update_period_s", 120),
         [&]() -> std::vector<ProtocolServerMetrics>
         {
             std::vector<ProtocolServerMetrics> metrics;
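The second config().getUInt call wires in the new asynchronous_heavy_metrics_update_period_s setting (default 120 s): the expensive detached-parts scan runs on this longer period, while its cached result is still republished by the regular (default 1 s) metrics loop. A minimal standalone sketch of that refresh-and-cache pattern, with illustrative names throughout (this is not ClickHouse code):

    #include <chrono>
    #include <cstddef>
    #include <cstdio>

    // Hypothetical cache: recompute an expensive value at most once per `period`,
    // and hand back the cached value on every call in between.
    struct HeavyMetricCache
    {
        std::chrono::seconds period{120};
        std::chrono::steady_clock::time_point last_update{};
        bool first_run = true;
        std::size_t cached = 0;

        template <typename F>
        std::size_t get(F && recompute)
        {
            auto now = std::chrono::steady_clock::now();
            if (first_run || now - last_update >= period)
            {
                cached = recompute();  // the expensive part, e.g. a filesystem scan
                last_update = now;
                first_run = false;
            }
            return cached;  // possibly stale between refreshes, but cheap
        }
    };

    int main()
    {
        HeavyMetricCache cache;
        std::printf("%zu\n", cache.get([] { return std::size_t{42}; }));  // computed
        std::printf("%zu\n", cache.get([] { return std::size_t{43}; }));  // cached: still 42
    }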
diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp
index f9bc22dd110..59f97384626 100644
--- a/src/Interpreters/AsynchronousMetrics.cpp
+++ b/src/Interpreters/AsynchronousMetrics.cpp
@@ -77,9 +77,11 @@ static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::stri
 AsynchronousMetrics::AsynchronousMetrics(
     ContextPtr global_context_,
     int update_period_seconds,
+    int heavy_metrics_update_period_seconds,
     const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
     : WithContext(global_context_)
     , update_period(update_period_seconds)
+    , heavy_metric_update_period(heavy_metrics_update_period_seconds)
     , protocol_server_metrics_func(protocol_server_metrics_func_)
     , log(&Poco::Logger::get("AsynchronousMetrics"))
 {
@@ -1584,6 +1586,8 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
     saveAllArenasMetric(new_values, "muzzy_purged");
 #endif
 
+    update_heavy_metrics(current_time, update_time, new_values);
+
     /// Add more metrics as you wish.
 
     new_values["AsynchronousMetricsCalculationTimeSpent"] = watch.elapsedSeconds();
@@ -1601,4 +1605,76 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
     values = new_values;
 }
 
+void AsynchronousMetrics::update_detached_parts_stats()
+{
+    DetachedPartsStats current_values{};
+
+    for (const auto & db : DatabaseCatalog::instance().getDatabases())
+    {
+        if (!db.second->canContainMergeTreeTables())
+            continue;
+
+        for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
+        {
+            const auto & table = iterator->table();
+            if (!table)
+                continue;
+
+            if (MergeTreeData * table_merge_tree = dynamic_cast<MergeTreeData *>(table.get()))
+            {
+                for (const auto & detached_part : table_merge_tree->getDetachedParts())
+                {
+                    if (!detached_part.valid_name)
+                        continue;
+
+                    if (detached_part.prefix.empty())
+                        ++current_values.detached_by_user;
+
+                    ++current_values.count;
+                }
+            }
+        }
+    }
+
+    detached_parts_stats = current_values;
+}
+
+void AsynchronousMetrics::update_heavy_metrics(std::chrono::system_clock::time_point current_time, std::chrono::system_clock::time_point update_time, AsynchronousMetricValues & new_values)
+{
+    const auto time_after_previous_update = current_time - heavy_metric_previous_update_time;
+    const bool update_heavy_metric = time_after_previous_update >= heavy_metric_update_period || first_run;
+
+    if (update_heavy_metric)
+    {
+        heavy_metric_previous_update_time = update_time;
+
+        Stopwatch watch;
+
+        // Tests show that listing 100000 entries takes around 0.15 sec.
+        update_detached_parts_stats();
+
+        watch.stop();
+
+        // Normally heavy metrics don't delay the rest of the metrics calculation;
+        // otherwise, log a warning. Check the most severe threshold first,
+        // because the if/else chain stops at the first match.
+        auto log_level = std::make_pair(DB::LogsLevel::trace, Poco::Message::PRIO_TRACE);
+        if (watch.elapsedSeconds() > (update_period.count() / 4. * 3))
+            log_level = std::make_pair(DB::LogsLevel::warning, Poco::Message::PRIO_WARNING);
+        else if (watch.elapsedSeconds() > (update_period.count() / 2.))
+            log_level = std::make_pair(DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG);
+        LOG_IMPL(log, log_level.first, log_level.second,
+            "Update heavy metrics. "
+            "Update period {} sec. "
+            "Update heavy metrics period {} sec. "
+            "Heavy metrics calculation elapsed: {} sec.",
+            update_period.count(),
+            heavy_metric_update_period.count(),
+            watch.elapsedSeconds());
+    }
+
+    new_values["NumberOfDetachedParts"] = detached_parts_stats.count;
+    new_values["NumberOfDetachedPartsByUser"] = detached_parts_stats.detached_by_user;
+}
+
 }
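The escalation above logs at trace normally, at debug when the heavy scan ate more than half of the update period, and at warning past three quarters of it. The most severe threshold has to be tested first: an if/else chain stops at the first match, so checking "> 1/2" before "> 3/4" would make the warning branch unreachable. A standalone sketch of the intended selection logic (illustrative names, not ClickHouse code):

    #include <cstdio>

    enum class Level { Trace, Debug, Warning };

    // Pick a log level from how much of the update period the heavy scan consumed.
    // Testing the `> 3/4` threshold before `> 1/2` keeps every branch reachable.
    Level level_for(double elapsed_seconds, double update_period_seconds)
    {
        if (elapsed_seconds > update_period_seconds / 4. * 3)
            return Level::Warning;
        if (elapsed_seconds > update_period_seconds / 2.)
            return Level::Debug;
        return Level::Trace;
    }

    int main()
    {
        // With a 1 s update period: 0.4 s -> Trace (0), 0.6 s -> Debug (1), 0.9 s -> Warning (2).
        std::printf("%d %d %d\n",
                    static_cast<int>(level_for(0.4, 1.)),
                    static_cast<int>(level_for(0.6, 1.)),
                    static_cast<int>(level_for(0.9, 1.)));
    }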
diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h
index e4bcb2890f3..813dd7ec608 100644
--- a/src/Interpreters/AsynchronousMetrics.h
+++ b/src/Interpreters/AsynchronousMetrics.h
@@ -50,6 +50,7 @@ public:
     AsynchronousMetrics(
         ContextPtr global_context_,
         int update_period_seconds,
+        int heavy_metrics_update_period_seconds,
         const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
 
     ~AsynchronousMetrics();
@@ -64,6 +65,7 @@ public:
 
 private:
     const std::chrono::seconds update_period;
+    const std::chrono::seconds heavy_metric_update_period;
     ProtocolServerMetricsFunc protocol_server_metrics_func;
 
     mutable std::mutex mutex;
@@ -75,6 +77,15 @@ private:
     /// On first run we will only collect the values to subtract later.
     bool first_run = true;
     std::chrono::system_clock::time_point previous_update_time;
+    std::chrono::system_clock::time_point heavy_metric_previous_update_time;
+
+    struct DetachedPartsStats
+    {
+        size_t count;
+        size_t detached_by_user;
+    };
+
+    DetachedPartsStats detached_parts_stats{};
 
 #if defined(OS_LINUX) || defined(OS_FREEBSD)
     MemoryStatisticsOS memory_stat;
@@ -187,6 +198,9 @@ private:
     void run();
     void update(std::chrono::system_clock::time_point update_time);
 
+    void update_detached_parts_stats();
+    void update_heavy_metrics(std::chrono::system_clock::time_point current_time, std::chrono::system_clock::time_point update_time, AsynchronousMetricValues & new_values);
+
     Poco::Logger * log;
 };
diff --git a/tests/integration/test_detached_parts_metrics/__init__.py b/tests/integration/test_detached_parts_metrics/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_detached_parts_metrics/configs/asynchronous_metrics_update_period_s.xml b/tests/integration/test_detached_parts_metrics/configs/asynchronous_metrics_update_period_s.xml
new file mode 100644
index 00000000000..0a56d734805
--- /dev/null
+++ b/tests/integration/test_detached_parts_metrics/configs/asynchronous_metrics_update_period_s.xml
@@ -0,0 +1,4 @@
+<clickhouse>
+    <asynchronous_metrics_update_period_s>1</asynchronous_metrics_update_period_s>
+    <asynchronous_heavy_metrics_update_period_s>1</asynchronous_heavy_metrics_update_period_s>
+</clickhouse>
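This override drops both periods from their production defaults (1 s light, 120 s heavy) to 1 s, so the test's polling assertions below converge within seconds instead of minutes. Server.cpp reads these keys through Poco's configuration API; a rough standalone sketch of how such a key resolves against a default, assuming Poco::Util::XMLConfiguration and a local copy of the file (illustrative, not part of the test harness):

    #include <Poco/AutoPtr.h>
    #include <Poco/Util/XMLConfiguration.h>
    #include <cstdio>

    int main()
    {
        // Load the XML override and read the two period settings the server
        // consults, falling back to the same defaults Server.cpp uses
        // (1 s light, 120 s heavy) when a key is absent.
        Poco::AutoPtr<Poco::Util::XMLConfiguration> config(
            new Poco::Util::XMLConfiguration("asynchronous_metrics_update_period_s.xml"));

        unsigned light = config->getUInt("asynchronous_metrics_update_period_s", 1);
        unsigned heavy = config->getUInt("asynchronous_heavy_metrics_update_period_s", 120);

        std::printf("light=%u heavy=%u\n", light, heavy);  // both 1 with the test config
    }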
diff --git a/tests/integration/test_detached_parts_metrics/test.py b/tests/integration/test_detached_parts_metrics/test.py
new file mode 100644
index 00000000000..1e3172787be
--- /dev/null
+++ b/tests/integration/test_detached_parts_metrics/test.py
@@ -0,0 +1,156 @@
+import time
+import pytest
+from helpers.cluster import ClickHouseCluster
+
+
+cluster = ClickHouseCluster(__file__)
+node1 = cluster.add_instance(
+    "node1",
+    main_configs=["configs/asynchronous_metrics_update_period_s.xml"],
+)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def wait_until(call_back, time_to_sleep=0.5, timeout=60):
+    assert callable(call_back)
+    start_time = time.time()
+    deadline = start_time + timeout
+    while not call_back() and time.time() < deadline:
+        time.sleep(time_to_sleep)
+    assert call_back(), "Elapsed {}".format(time.time() - start_time)
+
+
+def is_different(a, b):
+    def wrap():
+        res_a = a() if callable(a) else a
+        res_b = b() if callable(b) else b
+        return res_a != res_b
+
+    return wrap
+
+
+def test_detached_parts_metrics(started_cluster):
+    query_create = """
+        CREATE TABLE t
+        (
+            id Int64,
+            event_time Date
+        )
+        Engine=MergeTree()
+        PARTITION BY toYYYYMMDD(event_time)
+        ORDER BY id;
+        """
+    node1.query(query_create)
+
+    # gives us 2 partitions with 3 parts in total
+    node1.query("INSERT INTO t VALUES (1, toDate('2022-09-01'));")
+    node1.query("INSERT INTO t VALUES (2, toDate('2022-08-29'));")
+    node1.query("INSERT INTO t VALUES (3, toDate('2022-09-01'));")
+
+    query_number_detached_parts_in_async_metric = """
+        SELECT value
+        FROM system.asynchronous_metrics
+        WHERE metric LIKE 'NumberOfDetachedParts';
+        """
+    query_number_detached_by_user_parts_in_async_metric = """
+        SELECT value
+        FROM system.asynchronous_metrics
+        WHERE metric LIKE 'NumberOfDetachedPartsByUser';
+        """
+    query_count_active_parts = """
+        SELECT count(*) FROM system.parts WHERE table = 't' AND active
+        """
+    query_count_detached_parts = """
+        SELECT count(*) FROM system.detached_parts WHERE table = 't'
+        """
+
+    query_one_partition_name = """
+        SELECT name FROM system.parts WHERE table = 't' AND active AND partition = '20220829'
+        """
+    partition_name = node1.query(query_one_partition_name).strip()
+
+    assert 0 == int(node1.query(query_count_detached_parts))
+    assert 3 == int(node1.query(query_count_active_parts))
+    assert 0 == int(node1.query(query_number_detached_parts_in_async_metric))
+    assert 0 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
+
+    # detach some parts and wait until the asynchronous metrics notice it
+    node1.query("ALTER TABLE t DETACH PARTITION '20220901';")
+
+    assert 2 == int(node1.query(query_count_detached_parts))
+    assert 1 == int(node1.query(query_count_active_parts))
+
+    wait_until(
+        is_different(
+            0, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
+        )
+    )
+    assert 2 == int(node1.query(query_number_detached_parts_in_async_metric))
+    assert 2 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
+
+    # detach the remaining parts and wait until the asynchronous metrics notice it
+    node1.query("ALTER TABLE t DETACH PARTITION ALL")
+
+    assert 3 == int(node1.query(query_count_detached_parts))
+    assert 0 == int(node1.query(query_count_active_parts))
+
+    wait_until(
+        is_different(
+            2, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
+        )
+    )
+    assert 3 == int(node1.query(query_number_detached_parts_in_async_metric))
+    assert 3 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
+
+    # inject a detached directory directly and wait until the asynchronous metrics notice it
+    node1.exec_in_container(
+        [
+            "bash",
+            "-c",
+            "mkdir /var/lib/clickhouse/data/default/t/detached/unexpected_all_0_0_0",
+        ],
+        privileged=True,
+    )
+
+    assert 4 == int(node1.query(query_count_detached_parts))
+    assert 0 == int(node1.query(query_count_active_parts))
+
+    wait_until(
+        is_different(
+            3, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
+        )
+    )
+    assert 4 == int(node1.query(query_number_detached_parts_in_async_metric))
+    assert 3 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
+
+    # drop a detached part directly and wait until the asynchronous metrics notice it
+    node1.exec_in_container(
+        [
+            "bash",
+            "-c",
+            "rm -rf /var/lib/clickhouse/data/default/t/detached/{}".format(
+                partition_name
+            ),
+        ],
+        privileged=True,
+    )
+
+    assert 3 == int(node1.query(query_count_detached_parts))
+    assert 0 == int(node1.query(query_count_active_parts))
+
+    wait_until(
+        is_different(
+            4, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
+        )
+    )
+    assert 3 == int(node1.query(query_number_detached_parts_in_async_metric))
+    assert 2 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
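The closing assertions pin down how the two metrics differ: a part detached via ALTER TABLE ... DETACH is stored without a prefix and counts toward both metrics, while a directory the server itself sets aside (such as the injected unexpected_all_0_0_0) carries a prefix and counts only toward the total. A standalone sketch of that classification, mirroring update_detached_parts_stats() on the test's final state (illustrative, not ClickHouse code):

    #include <cstdio>
    #include <string>
    #include <vector>

    struct DetachedPartsStats { std::size_t count = 0; std::size_t detached_by_user = 0; };

    // Classify detached parts by prefix: an empty prefix means the part was
    // detached by a user; anything else (unexpected, broken, ignored, ...)
    // was set aside by the server and is excluded from the by-user count.
    DetachedPartsStats classify(const std::vector<std::string> & prefixes)
    {
        DetachedPartsStats stats;
        for (const auto & prefix : prefixes)
        {
            if (prefix.empty())
                ++stats.detached_by_user;
            ++stats.count;
        }
        return stats;
    }

    int main()
    {
        // The test's final state: two user-detached parts remain after one was
        // removed, plus the injected "unexpected_all_0_0_0" server-side entry.
        auto stats = classify({"", "", "unexpected"});
        std::printf("count=%zu by_user=%zu\n", stats.count, stats.detached_by_user);  // 3 and 2
    }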