Metric for the number of detached parts

This commit is contained in:
Sema Checherinda 2022-08-29 22:00:26 +02:00
parent 3ee52eb25f
commit e436b4f4cc
6 changed files with 253 additions and 1 deletions

View File

@ -736,7 +736,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(
global_context, config().getUInt("asynchronous_metrics_update_period_s", 1),
global_context,
config().getUInt("asynchronous_metrics_update_period_s", 1),
config().getUInt("asynchronous_heavy_metrics_update_period_s", 120),
[&]() -> std::vector<ProtocolServerMetrics>
{
std::vector<ProtocolServerMetrics> metrics;

View File

@ -77,9 +77,11 @@ static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::stri
AsynchronousMetrics::AsynchronousMetrics(
ContextPtr global_context_,
int update_period_seconds,
int heavy_metrics_update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
: WithContext(global_context_)
, update_period(update_period_seconds)
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
, protocol_server_metrics_func(protocol_server_metrics_func_)
, log(&Poco::Logger::get("AsynchronousMetrics"))
{
@ -1584,6 +1586,8 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
saveAllArenasMetric<size_t>(new_values, "muzzy_purged");
#endif
update_heavy_metrics(current_time, update_time, new_values);
/// Add more metrics as you wish.
new_values["AsynchronousMetricsCalculationTimeSpent"] = watch.elapsedSeconds();
@ -1601,4 +1605,76 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
values = new_values;
}
void AsynchronousMetrics::update_detached_parts_stats()
{
DetachedPartsStats current_values{};
for (const auto & db : DatabaseCatalog::instance().getDatabases())
{
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
const auto & table = iterator->table();
if (!table)
continue;
if (MergeTreeData * table_merge_tree = dynamic_cast<MergeTreeData *>(table.get()))
{
for (const auto & detached_part: table_merge_tree->getDetachedParts())
{
if (!detached_part.valid_name)
continue;
if (detached_part.prefix.empty())
++current_values.detached_by_user;
++current_values.count;
}
}
}
}
detached_parts_stats = current_values;
}
/// Refreshes the "heavy" metrics when their own update period has elapsed
/// (or on the first run) and always writes the latest known values into `new_values`.
///
/// @param current_time  Compared against the time of the previous heavy update
///                      to decide whether a refresh is due.
/// @param update_time   Stored as the time of this heavy update when a refresh happens.
/// @param new_values    Receives "NumberOfDetachedParts" and "NumberOfDetachedPartsByUser".
///                      Between refreshes the values from the last refresh are written.
void AsynchronousMetrics::update_heavy_metrics(std::chrono::system_clock::time_point current_time, std::chrono::system_clock::time_point update_time, AsynchronousMetricValues & new_values)
{
    const auto time_after_previous_update = current_time - heavy_metric_previous_update_time;
    const bool update_heavy_metric = time_after_previous_update >= heavy_metric_update_period || first_run;

    if (update_heavy_metric)
    {
        heavy_metric_previous_update_time = update_time;

        Stopwatch watch;

        /// A test showed that listing 100000 entries takes around 0.15 sec.
        update_detached_parts_stats();

        watch.stop();
        const double elapsed_seconds = watch.elapsedSeconds();

        /// Normally heavy metrics do not delay the rest of the metrics calculation, so
        /// the message is logged at trace level. It is raised to debug when the
        /// calculation took more than half of the regular update period and to warning
        /// when it took more than three quarters of it. The larger threshold must be
        /// checked first, otherwise the warning branch can never be reached.
        auto log_level = std::make_pair(DB::LogsLevel::trace, Poco::Message::PRIO_TRACE);
        if (elapsed_seconds > (update_period.count() / 4. * 3))
            log_level = std::make_pair(DB::LogsLevel::warning, Poco::Message::PRIO_WARNING);
        else if (elapsed_seconds > (update_period.count() / 2.))
            log_level = std::make_pair(DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG);

        LOG_IMPL(log, log_level.first, log_level.second,
                 "Update heavy metrics. "
                 "Update period {} sec. "
                 "Update heavy metrics period {} sec. "
                 "Heavy metrics calculation elapsed: {} sec.",
                 update_period.count(),
                 heavy_metric_update_period.count(),
                 elapsed_seconds);
    }

    /// Written on every call, refreshed or not, so the metrics are present in every snapshot.
    new_values["NumberOfDetachedParts"] = detached_parts_stats.count;
    new_values["NumberOfDetachedPartsByUser"] = detached_parts_stats.detached_by_user;
}
}

View File

@ -50,6 +50,7 @@ public:
AsynchronousMetrics(
ContextPtr global_context_,
int update_period_seconds,
int heavy_metrics_update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
~AsynchronousMetrics();
@ -64,6 +65,7 @@ public:
private:
const std::chrono::seconds update_period;
const std::chrono::seconds heavy_metric_update_period;
ProtocolServerMetricsFunc protocol_server_metrics_func;
mutable std::mutex mutex;
@ -75,6 +77,15 @@ private:
/// On first run we will only collect the values to subtract later.
bool first_run = true;
std::chrono::system_clock::time_point previous_update_time;
std::chrono::system_clock::time_point heavy_metric_previous_update_time;
/// Totals gathered by one scan of detached parts.
/// Members are zero-initialized by default so that an instance never holds
/// indeterminate counters, even when it is declared without `{}`.
struct DetachedPartsStats
{
    /// Number of detached parts with a valid name.
    size_t count = 0;
    /// Subset of `count` whose prefix is empty (reported as "detached by user").
    size_t detached_by_user = 0;
};
DetachedPartsStats detached_parts_stats{};
#if defined(OS_LINUX) || defined(OS_FREEBSD)
MemoryStatisticsOS memory_stat;
@ -187,6 +198,9 @@ private:
void run();
void update(std::chrono::system_clock::time_point update_time);
void update_detached_parts_stats();
void update_heavy_metrics(std::chrono::system_clock::time_point current_time, std::chrono::system_clock::time_point update_time, AsynchronousMetricValues & new_values);
Poco::Logger * log;
};

View File

@ -0,0 +1,4 @@
<clickhouse>
    <!-- Update period of the regular asynchronous metrics, in seconds. -->
    <asynchronous_metrics_update_period_s>1</asynchronous_metrics_update_period_s>
    <!-- Update period of the heavy asynchronous metrics, in seconds.
         The server falls back to 120 when this key is absent; 1 is used here so that
         changes become visible quickly. -->
    <asynchronous_heavy_metrics_update_period_s>1</asynchronous_heavy_metrics_update_period_s>
</clickhouse>

View File

@ -0,0 +1,156 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
# Cluster with a single instance, "node1".
# NOTE(review): the referenced config presumably sets both asynchronous metrics
# update periods to 1 second - confirm against configs/.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    "node1",
    main_configs=["configs/asynchronous_metrics_update_period_s.xml"],
)
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level cluster once per test module and yield it.

    The cluster is shut down in the ``finally`` clause, so teardown also runs
    when ``cluster.start()`` raises or when a test fails.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def wait_until(call_back, time_to_sleep=0.5, timeout=60):
    """Poll ``call_back`` until it returns a truthy value or ``timeout`` expires.

    :param call_back: zero-argument callable that is polled.
    :param time_to_sleep: pause between two polls, in seconds.
    :param timeout: maximum time to keep polling, in seconds.
    :raises AssertionError: if ``call_back`` is not callable, or if it is still
        falsy when checked one final time after polling stops.
    """
    assert callable(call_back)

    start_time = time.time()
    deadline = time.time() + timeout

    while True:
        if call_back():
            break
        if time.time() >= deadline:
            break
        time.sleep(time_to_sleep)

    # The condition is evaluated once more after the loop; this final result
    # decides whether the wait succeeded.
    assert call_back(), "Elapsed {}".format(time.time() - start_time)
def is_different(a, b):
    """Build a zero-argument predicate that reports whether ``a`` and ``b`` differ.

    Either argument may be a plain value or a callable. Callables are invoked
    each time the predicate runs (``a`` first, then ``b``) and their results
    are compared with ``!=``.
    """

    def _resolve(candidate):
        return candidate() if callable(candidate) else candidate

    def wrap():
        left = _resolve(a)
        right = _resolve(b)
        return left != right

    return wrap
def test_event_time_microseconds_field(started_cluster):
    """Check that the detached-parts asynchronous metrics follow the detached directory.

    The test detaches partitions with ALTER queries and then adds and removes
    directories under ``detached/`` directly. After each step it waits for
    ``NumberOfDetachedParts`` in ``system.asynchronous_metrics`` to change and
    checks it together with ``NumberOfDetachedPartsByUser``.

    The function name does not reflect what is tested (nothing here involves
    ``event_time_microseconds``); it is kept unchanged so the test id stays stable.
    """
    # The cluster is already running: the `started_cluster` fixture starts it,
    # so the test does not call `cluster.start()` again.
    query_create = """
CREATE TABLE t
(
id Int64,
event_time Date
)
Engine=MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;
"""
    node1.query(query_create)

    # gives us 2 partitions with 3 parts in total
    node1.query("INSERT INTO t VALUES (1, toDate('2022-09-01'));")
    node1.query("INSERT INTO t VALUES (2, toDate('2022-08-29'));")
    node1.query("INSERT INTO t VALUES (3, toDate('2022-09-01'));")

    query_number_detached_parts_in_async_metric = """
SELECT value
FROM system.asynchronous_metrics
WHERE metric LIKE 'NumberOfDetachedParts';
"""
    query_number_detached_by_user_parts_in_async_metric = """
SELECT value
FROM system.asynchronous_metrics
WHERE metric LIKE 'NumberOfDetachedPartsByUser';
"""
    query_count_active_parts = """
SELECT count(*) FROM system.parts WHERE table = 't' AND active
"""
    query_count_detached_parts = """
SELECT count(*) FROM system.detached_parts WHERE table = 't'
"""
    query_one_part_name = """
SELECT name FROM system.parts WHERE table = 't' AND active AND partition = '20220829'
"""
    # Name of the single part in partition 20220829; it is used at the end of
    # the test to delete that part's directory from `detached/`.
    part_name = node1.query(query_one_part_name).strip()

    assert 0 == int(node1.query(query_count_detached_parts))
    assert 3 == int(node1.query(query_count_active_parts))
    assert 0 == int(node1.query(query_number_detached_parts_in_async_metric))
    assert 0 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))

    # detach some parts and wait until asynchronous metrics notice it
    node1.query("ALTER TABLE t DETACH PARTITION '20220901';")

    assert 2 == int(node1.query(query_count_detached_parts))
    assert 1 == int(node1.query(query_count_active_parts))

    wait_until(
        is_different(
            0, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
        )
    )
    assert 2 == int(node1.query(query_number_detached_parts_in_async_metric))
    assert 2 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))

    # detach the remaining parts and wait until asynchronous metrics notice it
    node1.query("ALTER TABLE t DETACH PARTITION ALL")

    assert 3 == int(node1.query(query_count_detached_parts))
    assert 0 == int(node1.query(query_count_active_parts))

    wait_until(
        is_different(
            2, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
        )
    )
    assert 3 == int(node1.query(query_number_detached_parts_in_async_metric))
    assert 3 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))

    # inject some data directly and wait until asynchronous metrics notice it;
    # the directory name carries a prefix, so only the total count is expected to grow
    node1.exec_in_container(
        [
            "bash",
            "-c",
            "mkdir /var/lib/clickhouse/data/default/t/detached/unexpected_all_0_0_0",
        ],
        privileged=True,
    )

    assert 4 == int(node1.query(query_count_detached_parts))
    assert 0 == int(node1.query(query_count_active_parts))

    wait_until(
        is_different(
            3, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
        )
    )
    assert 4 == int(node1.query(query_number_detached_parts_in_async_metric))
    assert 3 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))

    # drop some data directly and wait until asynchronous metrics notice it
    node1.exec_in_container(
        [
            "bash",
            "-c",
            "rm -rf /var/lib/clickhouse/data/default/t/detached/{}".format(
                part_name
            ),
        ],
        privileged=True,
    )

    assert 3 == int(node1.query(query_count_detached_parts))
    assert 0 == int(node1.query(query_count_active_parts))

    wait_until(
        is_different(
            4, lambda: int(node1.query(query_number_detached_parts_in_async_metric))
        )
    )
    assert 3 == int(node1.query(query_number_detached_parts_in_async_metric))
    assert 2 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))