diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index f127ccbc224..d2dee9b5994 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -338,9 +339,12 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const return std::make_shared(getContext()->getSettingsRef(), shards, params); } -std::vector DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr & cluster_) const +ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const { - Strings paths; + Strings paths_get, paths_exists; + + paths_get.emplace_back(fs::path(zookeeper_path) / "max_log_ptr"); + const auto & addresses_with_failover = cluster_->getShardsAddresses(); const auto & shards_info = cluster_->getShardsInfo(); for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) @@ -348,32 +352,59 @@ std::vector DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr for (const auto & replica : addresses_with_failover[shard_index]) { String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name); - paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active"); + paths_exists.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active"); + paths_get.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr"); } } try { auto current_zookeeper = getZooKeeper(); - auto res = current_zookeeper->exists(paths); + auto get_res = current_zookeeper->get(paths_get); + auto exist_res = current_zookeeper->exists(paths_exists); + chassert(get_res.size() == exist_res.size() + 1); - std::vector statuses; - statuses.resize(paths.size()); + auto max_log_ptr_zk = get_res[0]; + if (max_log_ptr_zk.error != Coordination::Error::ZOK) + throw Coordination::Exception(max_log_ptr_zk.error); - for (size_t i = 0; i < res.size(); ++i) - if (res[i].error == Coordination::Error::ZOK) - statuses[i] = 1; + UInt32 max_log_ptr = parse(max_log_ptr_zk.data); - return statuses; - } - catch (...) + ReplicasInfo replicas_info; + replicas_info.resize(exist_res.size()); + + size_t global_replica_index = 0; + for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) + { + for (const auto & replica : addresses_with_failover[shard_index]) + { + auto replica_active = exist_res[global_replica_index]; + auto replica_log_ptr = get_res[global_replica_index + 1]; + + if (replica_active.error != Coordination::Error::ZOK && replica_active.error != Coordination::Error::ZNONODE) + throw Coordination::Exception(replica_active.error); + + if (replica_log_ptr.error != Coordination::Error::ZOK) + throw Coordination::Exception(replica_log_ptr.error); + + replicas_info[global_replica_index] = ReplicaInfo{ + .is_active = replica_active.error == Coordination::Error::ZOK, + .replication_lag = max_log_ptr - parse(replica_log_ptr.data), + .recovery_time = replica.is_local ? ddl_worker->getCurrentInitializationDurationMs() : 0, + }; + + ++global_replica_index; + } + } + + return replicas_info; + } catch (...) { tryLogCurrentException(log); return {}; } } - void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref) { const auto & config_prefix = fmt::format("named_collections.{}", collection_name); diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index 27ab262d1f1..5a1570ae2e2 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -17,6 +17,14 @@ using ZooKeeperPtr = std::shared_ptr; class Cluster; using ClusterPtr = std::shared_ptr; +struct ReplicaInfo +{ + bool is_active; + UInt32 replication_lag; + UInt64 recovery_time; +}; +using ReplicasInfo = std::vector; + class DatabaseReplicated : public DatabaseAtomic { public: @@ -84,7 +92,7 @@ public: static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop); - std::vector tryGetAreReplicasActive(const ClusterPtr & cluster_) const; + ReplicasInfo tryGetReplicasInfo(const ClusterPtr & cluster_) const; void renameDatabase(ContextPtr query_context, const String & new_name) override; diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 1ef88dc03bc..4e7408aa96e 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -32,6 +32,12 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db bool DatabaseReplicatedDDLWorker::initializeMainThread() { + { + std::lock_guard lock(initialization_duration_timer_mutex); + initialization_duration_timer.emplace(); + initialization_duration_timer->start(); + } + while (!stop_flag) { try @@ -69,6 +75,10 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread() initializeReplication(); initialized = true; + { + std::lock_guard lock(initialization_duration_timer_mutex); + initialization_duration_timer.reset(); + } return true; } catch (...) @@ -78,6 +88,11 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread() } } + { + std::lock_guard lock(initialization_duration_timer_mutex); + initialization_duration_timer.reset(); + } + return false; } @@ -459,4 +474,10 @@ UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const return max_id.load(); } +UInt64 DatabaseReplicatedDDLWorker::getCurrentInitializationDurationMs() const +{ + std::lock_guard lock(initialization_duration_timer_mutex); + return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0; +} + } diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index 41edf2221b8..2309c831839 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -36,6 +36,8 @@ public: DatabaseReplicated * const database, bool committed = false); /// NOLINT UInt32 getLogPointer() const; + + UInt64 getCurrentInitializationDurationMs() const; private: bool initializeMainThread() override; void initializeReplication(); @@ -56,6 +58,9 @@ private: ZooKeeperPtr active_node_holder_zookeeper; /// It will remove "active" node when database is detached zkutil::EphemeralNodeHolderPtr active_node_holder; + + std::optional initialization_duration_timer; + mutable std::mutex initialization_duration_timer_mutex; }; } diff --git a/src/Storages/System/StorageSystemClusters.cpp b/src/Storages/System/StorageSystemClusters.cpp index 160c8d6270e..d03b600b6ef 100644 --- a/src/Storages/System/StorageSystemClusters.cpp +++ b/src/Storages/System/StorageSystemClusters.cpp @@ -31,6 +31,8 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription() {"database_shard_name", std::make_shared(), "The name of the `Replicated` database shard (for clusters that belong to a `Replicated` database)."}, {"database_replica_name", std::make_shared(), "The name of the `Replicated` database replica (for clusters that belong to a `Replicated` database)."}, {"is_active", std::make_shared(std::make_shared()), "The status of the Replicated database replica (for clusters that belong to a Replicated database): 1 means 'replica is online', 0 means 'replica is offline', NULL means 'unknown'."}, + {"replication_lag", std::make_shared(std::make_shared()), "The replication lag of the `Replicated` database replica (for clusters that belong to a Replicated database)."}, + {"recovery_time", std::make_shared(std::make_shared()), "The recovery time of the `Replicated` database replica (for clusters that belong to a Replicated database), in milliseconds."}, }; description.setAliases({ @@ -46,31 +48,30 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co writeCluster(res_columns, name_and_cluster, {}); const auto databases = DatabaseCatalog::instance().getDatabases(); - for (const auto & name_and_database : databases) + for (const auto & [database_name, database] : databases) { - if (const auto * replicated = typeid_cast(name_and_database.second.get())) + if (const auto * replicated = typeid_cast(database.get())) { - if (auto database_cluster = replicated->tryGetCluster()) - writeCluster(res_columns, {name_and_database.first, database_cluster}, - replicated->tryGetAreReplicasActive(database_cluster)); + writeCluster(res_columns, {database_name, database_cluster}, + replicated->tryGetReplicasInfo(database_cluster)); if (auto database_cluster = replicated->tryGetAllGroupsCluster()) - writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster}, - replicated->tryGetAreReplicasActive(database_cluster)); + writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + database_name, database_cluster}, + replicated->tryGetReplicasInfo(database_cluster)); } } } void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, - const std::vector & is_active) + const ReplicasInfo & replicas_info) { const String & cluster_name = name_and_cluster.first; const ClusterPtr & cluster = name_and_cluster.second; const auto & shards_info = cluster->getShardsInfo(); const auto & addresses_with_failover = cluster->getShardsAddresses(); - size_t replica_idx = 0; + size_t global_replica_idx = 0; for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) { const auto & shard_info = shards_info[shard_index]; @@ -99,10 +100,24 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count()); res_columns[i++]->insert(address.database_shard_name); res_columns[i++]->insert(address.database_replica_name); - if (is_active.empty()) + if (replicas_info.empty()) + { res_columns[i++]->insertDefault(); + res_columns[i++]->insertDefault(); + res_columns[i++]->insertDefault(); + } else - res_columns[i++]->insert(is_active[replica_idx++]); + { + const auto & replica_info = replicas_info[global_replica_idx]; + res_columns[i++]->insert(replica_info.is_active); + res_columns[i++]->insert(replica_info.replication_lag); + if (replica_info.recovery_time != 0) + res_columns[i++]->insert(replica_info.recovery_time); + else + res_columns[i++]->insertDefault(); + } + + ++global_replica_idx; } } } diff --git a/src/Storages/System/StorageSystemClusters.h b/src/Storages/System/StorageSystemClusters.h index 0f7c792261d..f6e08734896 100644 --- a/src/Storages/System/StorageSystemClusters.h +++ b/src/Storages/System/StorageSystemClusters.h @@ -1,10 +1,10 @@ #pragma once +#include #include #include #include - namespace DB { @@ -27,7 +27,7 @@ protected: using NameAndCluster = std::pair>; void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const override; - static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector & is_active); + static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const ReplicasInfo & replicas_info); }; } diff --git a/tests/integration/test_recovery_time_metric/__init__.py b/tests/integration/test_recovery_time_metric/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_recovery_time_metric/configs/config.xml b/tests/integration/test_recovery_time_metric/configs/config.xml new file mode 100644 index 00000000000..bad9b1fa9ea --- /dev/null +++ b/tests/integration/test_recovery_time_metric/configs/config.xml @@ -0,0 +1,41 @@ + + 9000 + + + + + + + + + default + + + + + + 2181 + 1 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + 20000 + + + + 1 + localhost + 9444 + + + + + + + localhost + 2181 + + 20000 + + + diff --git a/tests/integration/test_recovery_time_metric/test.py b/tests/integration/test_recovery_time_metric/test.py new file mode 100644 index 00000000000..6fcf2fad423 --- /dev/null +++ b/tests/integration/test_recovery_time_metric/test.py @@ -0,0 +1,61 @@ +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance( + "node", + main_configs=["configs/config.xml"], + stay_alive=True, +) + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_recovery_time_metric(start_cluster): + node.query( + """ + DROP DATABASE IF EXISTS rdb; + CREATE DATABASE rdb + ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1') + """ + ) + + node.query( + """ + DROP TABLE IF EXISTS rdb.t; + CREATE TABLE rdb.t + ( + `x` UInt32 + ) + ENGINE = MergeTree + ORDER BY x + """ + ) + + node.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"]) + + node.restart_clickhouse() + + ret = int( + node.query( + """ + SELECT recovery_time + FROM system.clusters + WHERE cluster = 'rdb' + """ + ).strip() + ) + assert ret > 0 + + node.query( + """ + DROP DATABASE rdb + """ + ) diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index cfae4fee6c2..32e8b2f4312 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -52,6 +52,8 @@ CREATE TABLE system.clusters `database_shard_name` String, `database_replica_name` String, `is_active` Nullable(UInt8), + `replication_lag` Nullable(UInt32), + `recovery_time` Nullable(UInt64), `name` String ALIAS cluster ) ENGINE = SystemClusters diff --git a/tests/queries/0_stateless/03206_replication_lag_metric.reference b/tests/queries/0_stateless/03206_replication_lag_metric.reference new file mode 100644 index 00000000000..02f4a7264b1 --- /dev/null +++ b/tests/queries/0_stateless/03206_replication_lag_metric.reference @@ -0,0 +1,4 @@ +0 +2 +0 +2 diff --git a/tests/queries/0_stateless/03206_replication_lag_metric.sql b/tests/queries/0_stateless/03206_replication_lag_metric.sql new file mode 100644 index 00000000000..998c332a11c --- /dev/null +++ b/tests/queries/0_stateless/03206_replication_lag_metric.sql @@ -0,0 +1,11 @@ +-- Tags: no-parallel + +CREATE DATABASE rdb1 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica1'); +CREATE DATABASE rdb2 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica2'); + +SET distributed_ddl_task_timeout = 0; +CREATE TABLE rdb1.t (id UInt32) ENGINE = ReplicatedMergeTree ORDER BY id; +SELECT replication_lag FROM system.clusters WHERE cluster IN ('rdb1', 'rdb2') ORDER BY cluster ASC, replica_num ASC; + +DROP DATABASE rdb1; +DROP DATABASE rdb2;