add replication lag and recovery time metrics

This commit is contained in:
Michael Stetsyuk 2024-07-18 12:32:16 +02:00
parent 471de5e481
commit 7fc8ee726e
9 changed files with 129 additions and 35 deletions

View File

@ -338,42 +338,40 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, params);
}
std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr & cluster_) const
ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const
{
Strings paths;
ReplicasInfo res;
auto zookeeper = getZooKeeper();
const auto & addresses_with_failover = cluster_->getShardsAddresses();
const auto & shards_info = cluster_->getShardsInfo();
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
}
}
try
{
auto current_zookeeper = getZooKeeper();
auto res = current_zookeeper->exists(paths);
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(zookeeper_path + "/max_log_ptr"));
std::vector<UInt8> statuses;
statuses.resize(paths.size());
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZOK)
statuses[i] = 1;
return statuses;
}
catch (...)
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
UInt32 log_ptr = parse<UInt32>(zookeeper->get(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr"));
bool is_active = zookeeper->exists(fs::path(zookeeper_path) / "replicas" / full_name / "active");
res.push_back(ReplicaInfo{
.is_active = is_active,
.replication_lag = max_log_ptr - log_ptr,
.recovery_time = replica.is_local ? ddl_worker->getCurrentInitializationDurationMs() : 0,
});
}
}
return res;
} catch (...)
{
tryLogCurrentException(log);
return {};
}
}
void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref)
{
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);

View File

@ -17,6 +17,14 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct ReplicaInfo
{
bool is_active;
UInt32 replication_lag;
UInt64 recovery_time;
};
using ReplicasInfo = std::vector<ReplicaInfo>;
class DatabaseReplicated : public DatabaseAtomic
{
public:
@ -84,7 +92,7 @@ public:
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
ReplicasInfo tryGetReplicasInfo(const ClusterPtr & cluster_) const;
void renameDatabase(ContextPtr query_context, const String & new_name) override;

View File

@ -32,6 +32,8 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
bool DatabaseReplicatedDDLWorker::initializeMainThread()
{
initialization_duration_timer.emplace();
while (!stop_flag)
{
try
@ -69,6 +71,7 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
initializeReplication();
initialized = true;
initialization_duration_timer.reset();
return true;
}
catch (...)
@ -78,6 +81,7 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
}
}
initialization_duration_timer.reset();
return false;
}

View File

@ -36,6 +36,8 @@ public:
DatabaseReplicated * const database, bool committed = false); /// NOLINT
UInt32 getLogPointer() const;
UInt64 getCurrentInitializationDurationMs() const { return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0; }
private:
bool initializeMainThread() override;
void initializeReplication();
@ -56,6 +58,8 @@ private:
ZooKeeperPtr active_node_holder_zookeeper;
/// It will remove "active" node when database is detached
zkutil::EphemeralNodeHolderPtr active_node_holder;
std::optional<Stopwatch> initialization_duration_timer;
};
}

View File

@ -31,6 +31,8 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription()
{"database_shard_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database shard (for clusters that belong to a `Replicated` database)."},
{"database_replica_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database replica (for clusters that belong to a `Replicated` database)."},
{"is_active", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The status of the Replicated database replica (for clusters that belong to a Replicated database): 1 means 'replica is online', 0 means 'replica is offline', NULL means 'unknown'."},
{"replication_lag", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt32>()), "The replication lag of the `Replicated` database replica (for clusters that belong to a Replicated database)."},
{"recovery_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The recovery time of the `Replicated` database replica (for clusters that belong to a Replicated database), in milliseconds."},
};
description.setAliases({
@ -46,24 +48,23 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co
writeCluster(res_columns, name_and_cluster, {});
const auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & name_and_database : databases)
for (const auto & [database_name, database] : databases)
{
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(name_and_database.second.get()))
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(database.get()))
{
if (auto database_cluster = replicated->tryGetCluster())
writeCluster(res_columns, {name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
writeCluster(res_columns, {database_name, database_cluster},
replicated->tryGetReplicasInfo(database_cluster));
if (auto database_cluster = replicated->tryGetAllGroupsCluster())
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + database_name, database_cluster},
replicated->tryGetReplicasInfo(database_cluster));
}
}
}
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster,
const std::vector<UInt8> & is_active)
const ReplicasInfo & replicas_info)
{
const String & cluster_name = name_and_cluster.first;
const ClusterPtr & cluster = name_and_cluster.second;
@ -99,10 +100,22 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
res_columns[i++]->insert(address.database_shard_name);
res_columns[i++]->insert(address.database_replica_name);
if (is_active.empty())
if (replicas_info.empty())
{
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
else
res_columns[i++]->insert(is_active[replica_idx++]);
{
const auto & replica_info = replicas_info[replica_idx++];
res_columns[i++]->insert(replica_info.is_active);
res_columns[i++]->insert(replica_info.replication_lag);
if (replica_info.recovery_time != 0)
res_columns[i++]->insert(replica_info.recovery_time);
else
res_columns[i++]->insertDefault();
}
}
}
}

View File

@ -27,7 +27,7 @@ protected:
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector<UInt8> & is_active);
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const ReplicasInfo & replicas_info);
};
}

View File

@ -0,0 +1,41 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<profile>default</profile>
<no_password></no_password>
</default>
</users>
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<session_timeout_ms>20000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>9444</port>
</server>
</raft_configuration>
</keeper_server>
<zookeeper>
<node index="1">
<host>localhost</host>
<port>2181</port>
</node>
<session_timeout_ms>20000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,26 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/config.xml"], with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_recovery_time_metric(start_cluster):
node.query("CREATE DATABASE rdb ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1');")
node.query("CREATE TABLE rdb.t (x UInt32) ENGINE = MergeTree ORDER BY x;")
node.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"])
node.restart_clickhouse()
assert (
node.query("SELECT any(recovery_time) FROM system.clusters;")
!= "0\n"
)