mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge pull request #40779 from CheSema/detached-parts-metric
Metric for the number of detached parts
This commit is contained in:
commit
7b59fdc042
@ -736,7 +736,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
|||||||
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
|
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
|
||||||
/// This object will periodically calculate some metrics.
|
/// This object will periodically calculate some metrics.
|
||||||
AsynchronousMetrics async_metrics(
|
AsynchronousMetrics async_metrics(
|
||||||
global_context, config().getUInt("asynchronous_metrics_update_period_s", 1),
|
global_context,
|
||||||
|
config().getUInt("asynchronous_metrics_update_period_s", 1),
|
||||||
|
config().getUInt("asynchronous_heavy_metrics_update_period_s", 120),
|
||||||
[&]() -> std::vector<ProtocolServerMetrics>
|
[&]() -> std::vector<ProtocolServerMetrics>
|
||||||
{
|
{
|
||||||
std::vector<ProtocolServerMetrics> metrics;
|
std::vector<ProtocolServerMetrics> metrics;
|
||||||
|
@ -77,9 +77,11 @@ static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::stri
|
|||||||
AsynchronousMetrics::AsynchronousMetrics(
|
AsynchronousMetrics::AsynchronousMetrics(
|
||||||
ContextPtr global_context_,
|
ContextPtr global_context_,
|
||||||
int update_period_seconds,
|
int update_period_seconds,
|
||||||
|
int heavy_metrics_update_period_seconds,
|
||||||
const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
||||||
: WithContext(global_context_)
|
: WithContext(global_context_)
|
||||||
, update_period(update_period_seconds)
|
, update_period(update_period_seconds)
|
||||||
|
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
|
||||||
, protocol_server_metrics_func(protocol_server_metrics_func_)
|
, protocol_server_metrics_func(protocol_server_metrics_func_)
|
||||||
, log(&Poco::Logger::get("AsynchronousMetrics"))
|
, log(&Poco::Logger::get("AsynchronousMetrics"))
|
||||||
{
|
{
|
||||||
@ -563,7 +565,7 @@ AsynchronousMetrics::NetworkInterfaceStatValues::operator-(const AsynchronousMet
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_time)
|
void AsynchronousMetrics::update(TimePoint update_time)
|
||||||
{
|
{
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
|
|
||||||
@ -1584,6 +1586,8 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
|
|||||||
saveAllArenasMetric<size_t>(new_values, "muzzy_purged");
|
saveAllArenasMetric<size_t>(new_values, "muzzy_purged");
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
updateHeavyMetricsIfNeeded(current_time, update_time, new_values);
|
||||||
|
|
||||||
/// Add more metrics as you wish.
|
/// Add more metrics as you wish.
|
||||||
|
|
||||||
new_values["AsynchronousMetricsCalculationTimeSpent"] = watch.elapsedSeconds();
|
new_values["AsynchronousMetricsCalculationTimeSpent"] = watch.elapsedSeconds();
|
||||||
@ -1601,4 +1605,76 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
|
|||||||
values = new_values;
|
values = new_values;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void AsynchronousMetrics::updateDetachedPartsStats()
|
||||||
|
{
|
||||||
|
DetachedPartsStats current_values{};
|
||||||
|
|
||||||
|
for (const auto & db : DatabaseCatalog::instance().getDatabases())
|
||||||
|
{
|
||||||
|
if (!db.second->canContainMergeTreeTables())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
|
||||||
|
{
|
||||||
|
const auto & table = iterator->table();
|
||||||
|
if (!table)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (MergeTreeData * table_merge_tree = dynamic_cast<MergeTreeData *>(table.get()))
|
||||||
|
{
|
||||||
|
for (const auto & detached_part: table_merge_tree->getDetachedParts())
|
||||||
|
{
|
||||||
|
if (!detached_part.valid_name)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (detached_part.prefix.empty())
|
||||||
|
++current_values.detached_by_user;
|
||||||
|
|
||||||
|
++current_values.count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
detached_parts_stats = current_values;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Recalculates the "heavy" metrics when they are due and publishes their latest values.
///
/// The heavy metrics are recalculated on the first run, or when at least `heavy_metric_update_period`
/// has passed between `heavy_metric_previous_update_time` and `current_time`. In that case
/// `update_time` is remembered as the new previous update time.
/// The last known values are written into `new_values` on every call, recalculated or not.
///
/// The time spent is logged at trace level normally, at debug level when it exceeds half of the
/// regular `update_period`, and at warning level when it exceeds three quarters of it.
void AsynchronousMetrics::updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, AsynchronousMetricValues & new_values)
{
    const auto time_after_previous_update = current_time - heavy_metric_previous_update_time;
    const bool update_heavy_metric = first_run || time_after_previous_update >= heavy_metric_update_period;

    if (update_heavy_metric)
    {
        heavy_metric_previous_update_time = update_time;

        Stopwatch watch;

        /// Test shows that listing 100000 entries consumes around 0.15 sec.
        updateDetachedPartsStats();

        watch.stop();

        const double elapsed_seconds = watch.elapsedSeconds();
        const double regular_period_seconds = static_cast<double>(update_period.count());

        /// Normally heavy metrics don't delay the rest of the metrics calculation,
        /// otherwise raise the level of the log message.
        /// The larger threshold must be tested first: anything above 3/4 of the period is also
        /// above 1/2 of it, so the opposite order would make the warning branch unreachable.
        auto log_level = std::make_pair(DB::LogsLevel::trace, Poco::Message::PRIO_TRACE);
        if (elapsed_seconds > (regular_period_seconds / 4. * 3))
            log_level = std::make_pair(DB::LogsLevel::warning, Poco::Message::PRIO_WARNING);
        else if (elapsed_seconds > (regular_period_seconds / 2.))
            log_level = std::make_pair(DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG);

        LOG_IMPL(log, log_level.first, log_level.second,
                 "Update heavy metrics. "
                 "Update period {} sec. "
                 "Update heavy metrics period {} sec. "
                 "Heavy metrics calculation elapsed: {} sec.",
                 update_period.count(),
                 heavy_metric_update_period.count(),
                 elapsed_seconds);
    }

    new_values["NumberOfDetachedParts"] = detached_parts_stats.count;
    new_values["NumberOfDetachedByUserParts"] = detached_parts_stats.detached_by_user;
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -50,6 +50,7 @@ public:
|
|||||||
AsynchronousMetrics(
|
AsynchronousMetrics(
|
||||||
ContextPtr global_context_,
|
ContextPtr global_context_,
|
||||||
int update_period_seconds,
|
int update_period_seconds,
|
||||||
|
int heavy_metrics_update_period_seconds,
|
||||||
const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
|
const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
|
||||||
|
|
||||||
~AsynchronousMetrics();
|
~AsynchronousMetrics();
|
||||||
@ -63,7 +64,11 @@ public:
|
|||||||
AsynchronousMetricValues getValues() const;
|
AsynchronousMetricValues getValues() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const std::chrono::seconds update_period;
|
using Duration = std::chrono::seconds;
|
||||||
|
using TimePoint = std::chrono::system_clock::time_point;
|
||||||
|
|
||||||
|
const Duration update_period;
|
||||||
|
const Duration heavy_metric_update_period;
|
||||||
ProtocolServerMetricsFunc protocol_server_metrics_func;
|
ProtocolServerMetricsFunc protocol_server_metrics_func;
|
||||||
|
|
||||||
mutable std::mutex mutex;
|
mutable std::mutex mutex;
|
||||||
@ -74,7 +79,16 @@ private:
|
|||||||
/// Some values are incremental and we have to calculate the difference.
|
/// Some values are incremental and we have to calculate the difference.
|
||||||
/// On first run we will only collect the values to subtract later.
|
/// On first run we will only collect the values to subtract later.
|
||||||
bool first_run = true;
|
bool first_run = true;
|
||||||
std::chrono::system_clock::time_point previous_update_time;
|
TimePoint previous_update_time;
|
||||||
|
TimePoint heavy_metric_previous_update_time;
|
||||||
|
|
||||||
|
/// Counters produced by updateDetachedPartsStats().
/// The members are zero-initialized so that a default-constructed object never holds garbage.
struct DetachedPartsStats
{
    /// Number of detached parts that have a valid name.
    size_t count = 0;
    /// Of those, the number of parts whose prefix is empty.
    size_t detached_by_user = 0;
};
|
||||||
|
|
||||||
|
DetachedPartsStats detached_parts_stats{};
|
||||||
|
|
||||||
#if defined(OS_LINUX) || defined(OS_FREEBSD)
|
#if defined(OS_LINUX) || defined(OS_FREEBSD)
|
||||||
MemoryStatisticsOS memory_stat;
|
MemoryStatisticsOS memory_stat;
|
||||||
@ -185,7 +199,10 @@ private:
|
|||||||
std::unique_ptr<ThreadFromGlobalPool> thread;
|
std::unique_ptr<ThreadFromGlobalPool> thread;
|
||||||
|
|
||||||
void run();
|
void run();
|
||||||
void update(std::chrono::system_clock::time_point update_time);
|
void update(TimePoint update_time);
|
||||||
|
|
||||||
|
void updateDetachedPartsStats();
|
||||||
|
void updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, AsynchronousMetricValues & new_values);
|
||||||
|
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
};
|
};
|
||||||
|
@ -0,0 +1,4 @@
|
|||||||
|
<clickhouse>
|
||||||
|
<!-- Update period, in seconds, of the regular asynchronous metrics (the server reads this key with a default of 1). -->
<asynchronous_metrics_update_period_s>1</asynchronous_metrics_update_period_s>
|
||||||
|
<!-- Update period, in seconds, of the heavy asynchronous metrics such as the detached parts counters.
     The server reads this key with a default of 120; 1 is used here so changes show up quickly. -->
<asynchronous_heavy_metrics_update_period_s>1</asynchronous_heavy_metrics_update_period_s>
|
||||||
|
</clickhouse>
|
133
tests/integration/test_detached_parts_metrics/test.py
Normal file
133
tests/integration/test_detached_parts_metrics/test.py
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
import time
|
||||||
|
import pytest
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
from helpers.test_tools import assert_eq_with_retry
|
||||||
|
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
node1 = cluster.add_instance(
|
||||||
|
"node1",
|
||||||
|
main_configs=["configs/asynchronous_metrics_update_period_s.xml"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module, yield it to the tests, and always shut it down."""
    try:
        # start() stays inside the try so that shutdown() also runs when startup fails
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_event_time_microseconds_field(started_cluster):
    """Check that NumberOfDetachedParts / NumberOfDetachedByUserParts follow the detached directory.

    The test creates a MergeTree table with three parts, then detaches parts via ALTER, adds a
    directory with a prefix by hand and removes a detached part by hand. After each step it checks
    system.detached_parts, system.parts and both asynchronous metrics.
    """
    # The module-scoped `started_cluster` fixture has already started the cluster,
    # so it is not started again here.
    query_create = """
    CREATE TABLE t
    (
        id Int64,
        event_time Date
    )
    Engine=MergeTree()
    PARTITION BY toYYYYMMDD(event_time)
    ORDER BY id;
    """
    node1.query(query_create)

    # gives us 2 partitions with 3 parts in total
    node1.query("INSERT INTO t VALUES (1, toDate('2022-09-01'));")
    node1.query("INSERT INTO t VALUES (2, toDate('2022-08-29'));")
    node1.query("INSERT INTO t VALUES (3, toDate('2022-09-01'));")

    query_number_detached_parts_in_async_metric = """
    SELECT value
    FROM system.asynchronous_metrics
    WHERE metric LIKE 'NumberOfDetachedParts';
    """
    query_number_detached_by_user_parts_in_async_metric = """
    SELECT value
    FROM system.asynchronous_metrics
    WHERE metric LIKE 'NumberOfDetachedByUserParts';
    """
    query_count_active_parts = """
    SELECT count(*) FROM system.parts WHERE table = 't' AND active
    """
    query_count_detached_parts = """
    SELECT count(*) FROM system.detached_parts WHERE table = 't'
    """

    query_one_partition_name = """
    SELECT name FROM system.parts WHERE table = 't' AND active AND partition = '20220829'
    """
    partition_name = node1.query(query_one_partition_name).strip()

    def check_parts(detached, active, detached_by_user):
        """Verify the system tables immediately, then wait for the asynchronous metrics to catch up."""
        assert detached == int(node1.query(query_count_detached_parts))
        assert active == int(node1.query(query_count_active_parts))

        # The metric is refreshed periodically, so retry until it shows the new value.
        assert_eq_with_retry(
            node1,
            query_number_detached_parts_in_async_metric,
            "{}\n".format(detached),
        )
        # Both metrics are published by the same update, so no retry is needed here.
        assert detached_by_user == int(
            node1.query(query_number_detached_by_user_parts_in_async_metric)
        )

    # initial state: nothing is detached yet
    assert 0 == int(node1.query(query_count_detached_parts))
    assert 3 == int(node1.query(query_count_active_parts))
    assert 0 == int(node1.query(query_number_detached_parts_in_async_metric))
    assert 0 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))

    # detach some parts and wait until asynchronous metrics notice it
    node1.query("ALTER TABLE t DETACH PARTITION '20220901';")
    check_parts(detached=2, active=1, detached_by_user=2)

    # detach the rest parts and wait until asynchronous metrics notice it
    node1.query("ALTER TABLE t DETACH PARTITION ALL")
    check_parts(detached=3, active=0, detached_by_user=3)

    # inject some data directly and wait until asynchronous metrics notice it;
    # the directory name has a prefix, so it must not count as detached by user
    node1.exec_in_container(
        [
            "bash",
            "-c",
            "mkdir /var/lib/clickhouse/data/default/t/detached/unexpected_all_0_0_0",
        ]
    )
    check_parts(detached=4, active=0, detached_by_user=3)

    # drop some data directly and wait asynchronous metrics notice it
    node1.exec_in_container(
        [
            "bash",
            "-c",
            "rm -rf /var/lib/clickhouse/data/default/t/detached/{}".format(
                partition_name
            ),
        ]
    )
    check_parts(detached=3, active=0, detached_by_user=2)
|
Loading…
Reference in New Issue
Block a user