mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #66703 from ClickHouse/add-replication-lag-and-recovery-time-metrics
Add replication lag and recovery time metrics
This commit is contained in:
commit
f1df59d6c9
@ -12,6 +12,7 @@
|
|||||||
#include <Common/ZooKeeper/KeeperException.h>
|
#include <Common/ZooKeeper/KeeperException.h>
|
||||||
#include <Common/ZooKeeper/Types.h>
|
#include <Common/ZooKeeper/Types.h>
|
||||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||||
|
#include <Common/ZooKeeper/IKeeper.h>
|
||||||
#include <Common/PoolId.h>
|
#include <Common/PoolId.h>
|
||||||
#include <Core/ServerSettings.h>
|
#include <Core/ServerSettings.h>
|
||||||
#include <Core/Settings.h>
|
#include <Core/Settings.h>
|
||||||
@ -338,9 +339,12 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const
|
|||||||
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, params);
|
return std::make_shared<Cluster>(getContext()->getSettingsRef(), shards, params);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr & cluster_) const
|
ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const
|
||||||
{
|
{
|
||||||
Strings paths;
|
Strings paths_get, paths_exists;
|
||||||
|
|
||||||
|
paths_get.emplace_back(fs::path(zookeeper_path) / "max_log_ptr");
|
||||||
|
|
||||||
const auto & addresses_with_failover = cluster_->getShardsAddresses();
|
const auto & addresses_with_failover = cluster_->getShardsAddresses();
|
||||||
const auto & shards_info = cluster_->getShardsInfo();
|
const auto & shards_info = cluster_->getShardsInfo();
|
||||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||||
@ -348,32 +352,59 @@ std::vector<UInt8> DatabaseReplicated::tryGetAreReplicasActive(const ClusterPtr
|
|||||||
for (const auto & replica : addresses_with_failover[shard_index])
|
for (const auto & replica : addresses_with_failover[shard_index])
|
||||||
{
|
{
|
||||||
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
|
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
|
||||||
paths.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
|
paths_exists.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
|
||||||
|
paths_get.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto current_zookeeper = getZooKeeper();
|
auto current_zookeeper = getZooKeeper();
|
||||||
auto res = current_zookeeper->exists(paths);
|
auto get_res = current_zookeeper->get(paths_get);
|
||||||
|
auto exist_res = current_zookeeper->exists(paths_exists);
|
||||||
|
chassert(get_res.size() == exist_res.size() + 1);
|
||||||
|
|
||||||
std::vector<UInt8> statuses;
|
auto max_log_ptr_zk = get_res[0];
|
||||||
statuses.resize(paths.size());
|
if (max_log_ptr_zk.error != Coordination::Error::ZOK)
|
||||||
|
throw Coordination::Exception(max_log_ptr_zk.error);
|
||||||
|
|
||||||
for (size_t i = 0; i < res.size(); ++i)
|
UInt32 max_log_ptr = parse<UInt32>(max_log_ptr_zk.data);
|
||||||
if (res[i].error == Coordination::Error::ZOK)
|
|
||||||
statuses[i] = 1;
|
|
||||||
|
|
||||||
return statuses;
|
ReplicasInfo replicas_info;
|
||||||
}
|
replicas_info.resize(exist_res.size());
|
||||||
catch (...)
|
|
||||||
|
size_t global_replica_index = 0;
|
||||||
|
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||||
|
{
|
||||||
|
for (const auto & replica : addresses_with_failover[shard_index])
|
||||||
|
{
|
||||||
|
auto replica_active = exist_res[global_replica_index];
|
||||||
|
auto replica_log_ptr = get_res[global_replica_index + 1];
|
||||||
|
|
||||||
|
if (replica_active.error != Coordination::Error::ZOK && replica_active.error != Coordination::Error::ZNONODE)
|
||||||
|
throw Coordination::Exception(replica_active.error);
|
||||||
|
|
||||||
|
if (replica_log_ptr.error != Coordination::Error::ZOK)
|
||||||
|
throw Coordination::Exception(replica_log_ptr.error);
|
||||||
|
|
||||||
|
replicas_info[global_replica_index] = ReplicaInfo{
|
||||||
|
.is_active = replica_active.error == Coordination::Error::ZOK,
|
||||||
|
.replication_lag = max_log_ptr - parse<UInt32>(replica_log_ptr.data),
|
||||||
|
.recovery_time = replica.is_local ? ddl_worker->getCurrentInitializationDurationMs() : 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
++global_replica_index;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return replicas_info;
|
||||||
|
} catch (...)
|
||||||
{
|
{
|
||||||
tryLogCurrentException(log);
|
tryLogCurrentException(log);
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref)
|
void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco::Util::AbstractConfiguration & config_ref)
|
||||||
{
|
{
|
||||||
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);
|
const auto & config_prefix = fmt::format("named_collections.{}", collection_name);
|
||||||
|
@ -17,6 +17,14 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
|
|||||||
class Cluster;
|
class Cluster;
|
||||||
using ClusterPtr = std::shared_ptr<Cluster>;
|
using ClusterPtr = std::shared_ptr<Cluster>;
|
||||||
|
|
||||||
|
struct ReplicaInfo
|
||||||
|
{
|
||||||
|
bool is_active;
|
||||||
|
UInt32 replication_lag;
|
||||||
|
UInt64 recovery_time;
|
||||||
|
};
|
||||||
|
using ReplicasInfo = std::vector<ReplicaInfo>;
|
||||||
|
|
||||||
class DatabaseReplicated : public DatabaseAtomic
|
class DatabaseReplicated : public DatabaseAtomic
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -84,7 +92,7 @@ public:
|
|||||||
|
|
||||||
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
|
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & shard, const String & replica, bool throw_if_noop);
|
||||||
|
|
||||||
std::vector<UInt8> tryGetAreReplicasActive(const ClusterPtr & cluster_) const;
|
ReplicasInfo tryGetReplicasInfo(const ClusterPtr & cluster_) const;
|
||||||
|
|
||||||
void renameDatabase(ContextPtr query_context, const String & new_name) override;
|
void renameDatabase(ContextPtr query_context, const String & new_name) override;
|
||||||
|
|
||||||
|
@ -32,6 +32,12 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
|
|||||||
|
|
||||||
bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||||
|
initialization_duration_timer.emplace();
|
||||||
|
initialization_duration_timer->start();
|
||||||
|
}
|
||||||
|
|
||||||
while (!stop_flag)
|
while (!stop_flag)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
@ -69,6 +75,10 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
|||||||
|
|
||||||
initializeReplication();
|
initializeReplication();
|
||||||
initialized = true;
|
initialized = true;
|
||||||
|
{
|
||||||
|
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||||
|
initialization_duration_timer.reset();
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
@ -78,6 +88,11 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||||
|
initialization_duration_timer.reset();
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -459,4 +474,10 @@ UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
|
|||||||
return max_id.load();
|
return max_id.load();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
UInt64 DatabaseReplicatedDDLWorker::getCurrentInitializationDurationMs() const
|
||||||
|
{
|
||||||
|
std::lock_guard lock(initialization_duration_timer_mutex);
|
||||||
|
return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,8 @@ public:
|
|||||||
DatabaseReplicated * const database, bool committed = false); /// NOLINT
|
DatabaseReplicated * const database, bool committed = false); /// NOLINT
|
||||||
|
|
||||||
UInt32 getLogPointer() const;
|
UInt32 getLogPointer() const;
|
||||||
|
|
||||||
|
UInt64 getCurrentInitializationDurationMs() const;
|
||||||
private:
|
private:
|
||||||
bool initializeMainThread() override;
|
bool initializeMainThread() override;
|
||||||
void initializeReplication();
|
void initializeReplication();
|
||||||
@ -56,6 +58,9 @@ private:
|
|||||||
ZooKeeperPtr active_node_holder_zookeeper;
|
ZooKeeperPtr active_node_holder_zookeeper;
|
||||||
/// It will remove "active" node when database is detached
|
/// It will remove "active" node when database is detached
|
||||||
zkutil::EphemeralNodeHolderPtr active_node_holder;
|
zkutil::EphemeralNodeHolderPtr active_node_holder;
|
||||||
|
|
||||||
|
std::optional<Stopwatch> initialization_duration_timer;
|
||||||
|
mutable std::mutex initialization_duration_timer_mutex;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,8 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription()
|
|||||||
{"database_shard_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database shard (for clusters that belong to a `Replicated` database)."},
|
{"database_shard_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database shard (for clusters that belong to a `Replicated` database)."},
|
||||||
{"database_replica_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database replica (for clusters that belong to a `Replicated` database)."},
|
{"database_replica_name", std::make_shared<DataTypeString>(), "The name of the `Replicated` database replica (for clusters that belong to a `Replicated` database)."},
|
||||||
{"is_active", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The status of the Replicated database replica (for clusters that belong to a Replicated database): 1 means 'replica is online', 0 means 'replica is offline', NULL means 'unknown'."},
|
{"is_active", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "The status of the Replicated database replica (for clusters that belong to a Replicated database): 1 means 'replica is online', 0 means 'replica is offline', NULL means 'unknown'."},
|
||||||
|
{"replication_lag", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt32>()), "The replication lag of the `Replicated` database replica (for clusters that belong to a Replicated database)."},
|
||||||
|
{"recovery_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The recovery time of the `Replicated` database replica (for clusters that belong to a Replicated database), in milliseconds."},
|
||||||
};
|
};
|
||||||
|
|
||||||
description.setAliases({
|
description.setAliases({
|
||||||
@ -46,31 +48,30 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co
|
|||||||
writeCluster(res_columns, name_and_cluster, {});
|
writeCluster(res_columns, name_and_cluster, {});
|
||||||
|
|
||||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||||
for (const auto & name_and_database : databases)
|
for (const auto & [database_name, database] : databases)
|
||||||
{
|
{
|
||||||
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(name_and_database.second.get()))
|
if (const auto * replicated = typeid_cast<const DatabaseReplicated *>(database.get()))
|
||||||
{
|
{
|
||||||
|
|
||||||
if (auto database_cluster = replicated->tryGetCluster())
|
if (auto database_cluster = replicated->tryGetCluster())
|
||||||
writeCluster(res_columns, {name_and_database.first, database_cluster},
|
writeCluster(res_columns, {database_name, database_cluster},
|
||||||
replicated->tryGetAreReplicasActive(database_cluster));
|
replicated->tryGetReplicasInfo(database_cluster));
|
||||||
|
|
||||||
if (auto database_cluster = replicated->tryGetAllGroupsCluster())
|
if (auto database_cluster = replicated->tryGetAllGroupsCluster())
|
||||||
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster},
|
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + database_name, database_cluster},
|
||||||
replicated->tryGetAreReplicasActive(database_cluster));
|
replicated->tryGetReplicasInfo(database_cluster));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster,
|
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster,
|
||||||
const std::vector<UInt8> & is_active)
|
const ReplicasInfo & replicas_info)
|
||||||
{
|
{
|
||||||
const String & cluster_name = name_and_cluster.first;
|
const String & cluster_name = name_and_cluster.first;
|
||||||
const ClusterPtr & cluster = name_and_cluster.second;
|
const ClusterPtr & cluster = name_and_cluster.second;
|
||||||
const auto & shards_info = cluster->getShardsInfo();
|
const auto & shards_info = cluster->getShardsInfo();
|
||||||
const auto & addresses_with_failover = cluster->getShardsAddresses();
|
const auto & addresses_with_failover = cluster->getShardsAddresses();
|
||||||
|
|
||||||
size_t replica_idx = 0;
|
size_t global_replica_idx = 0;
|
||||||
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
|
||||||
{
|
{
|
||||||
const auto & shard_info = shards_info[shard_index];
|
const auto & shard_info = shards_info[shard_index];
|
||||||
@ -99,10 +100,24 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
|
|||||||
res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
|
res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
|
||||||
res_columns[i++]->insert(address.database_shard_name);
|
res_columns[i++]->insert(address.database_shard_name);
|
||||||
res_columns[i++]->insert(address.database_replica_name);
|
res_columns[i++]->insert(address.database_replica_name);
|
||||||
if (is_active.empty())
|
if (replicas_info.empty())
|
||||||
|
{
|
||||||
res_columns[i++]->insertDefault();
|
res_columns[i++]->insertDefault();
|
||||||
|
res_columns[i++]->insertDefault();
|
||||||
|
res_columns[i++]->insertDefault();
|
||||||
|
}
|
||||||
else
|
else
|
||||||
res_columns[i++]->insert(is_active[replica_idx++]);
|
{
|
||||||
|
const auto & replica_info = replicas_info[global_replica_idx];
|
||||||
|
res_columns[i++]->insert(replica_info.is_active);
|
||||||
|
res_columns[i++]->insert(replica_info.replication_lag);
|
||||||
|
if (replica_info.recovery_time != 0)
|
||||||
|
res_columns[i++]->insert(replica_info.recovery_time);
|
||||||
|
else
|
||||||
|
res_columns[i++]->insertDefault();
|
||||||
|
}
|
||||||
|
|
||||||
|
++global_replica_idx;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <Databases/DatabaseReplicated.h>
|
||||||
#include <DataTypes/DataTypeString.h>
|
#include <DataTypes/DataTypeString.h>
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
#include <Storages/System/IStorageSystemOneBlock.h>
|
#include <Storages/System/IStorageSystemOneBlock.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -27,7 +27,7 @@ protected:
|
|||||||
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
|
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
|
||||||
|
|
||||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||||
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector<UInt8> & is_active);
|
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const ReplicasInfo & replicas_info);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,41 @@
|
|||||||
|
<clickhouse>
|
||||||
|
<tcp_port>9000</tcp_port>
|
||||||
|
|
||||||
|
<profiles>
|
||||||
|
<default>
|
||||||
|
</default>
|
||||||
|
</profiles>
|
||||||
|
|
||||||
|
<users>
|
||||||
|
<default>
|
||||||
|
<profile>default</profile>
|
||||||
|
<no_password></no_password>
|
||||||
|
</default>
|
||||||
|
</users>
|
||||||
|
|
||||||
|
<keeper_server>
|
||||||
|
<tcp_port>2181</tcp_port>
|
||||||
|
<server_id>1</server_id>
|
||||||
|
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||||
|
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
|
||||||
|
<coordination_settings>
|
||||||
|
<session_timeout_ms>20000</session_timeout_ms>
|
||||||
|
</coordination_settings>
|
||||||
|
<raft_configuration>
|
||||||
|
<server>
|
||||||
|
<id>1</id>
|
||||||
|
<hostname>localhost</hostname>
|
||||||
|
<port>9444</port>
|
||||||
|
</server>
|
||||||
|
</raft_configuration>
|
||||||
|
</keeper_server>
|
||||||
|
|
||||||
|
<zookeeper>
|
||||||
|
<node index="1">
|
||||||
|
<host>localhost</host>
|
||||||
|
<port>2181</port>
|
||||||
|
</node>
|
||||||
|
<session_timeout_ms>20000</session_timeout_ms>
|
||||||
|
</zookeeper>
|
||||||
|
|
||||||
|
</clickhouse>
|
61
tests/integration/test_recovery_time_metric/test.py
Normal file
61
tests/integration/test_recovery_time_metric/test.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
import pytest
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
node = cluster.add_instance(
|
||||||
|
"node",
|
||||||
|
main_configs=["configs/config.xml"],
|
||||||
|
stay_alive=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def start_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
yield cluster
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_recovery_time_metric(start_cluster):
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
DROP DATABASE IF EXISTS rdb;
|
||||||
|
CREATE DATABASE rdb
|
||||||
|
ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1')
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
DROP TABLE IF EXISTS rdb.t;
|
||||||
|
CREATE TABLE rdb.t
|
||||||
|
(
|
||||||
|
`x` UInt32
|
||||||
|
)
|
||||||
|
ENGINE = MergeTree
|
||||||
|
ORDER BY x
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
node.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"])
|
||||||
|
|
||||||
|
node.restart_clickhouse()
|
||||||
|
|
||||||
|
ret = int(
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
SELECT recovery_time
|
||||||
|
FROM system.clusters
|
||||||
|
WHERE cluster = 'rdb'
|
||||||
|
"""
|
||||||
|
).strip()
|
||||||
|
)
|
||||||
|
assert ret > 0
|
||||||
|
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
DROP DATABASE rdb
|
||||||
|
"""
|
||||||
|
)
|
@ -52,6 +52,8 @@ CREATE TABLE system.clusters
|
|||||||
`database_shard_name` String,
|
`database_shard_name` String,
|
||||||
`database_replica_name` String,
|
`database_replica_name` String,
|
||||||
`is_active` Nullable(UInt8),
|
`is_active` Nullable(UInt8),
|
||||||
|
`replication_lag` Nullable(UInt32),
|
||||||
|
`recovery_time` Nullable(UInt64),
|
||||||
`name` String ALIAS cluster
|
`name` String ALIAS cluster
|
||||||
)
|
)
|
||||||
ENGINE = SystemClusters
|
ENGINE = SystemClusters
|
||||||
|
@ -0,0 +1,4 @@
|
|||||||
|
0
|
||||||
|
2
|
||||||
|
0
|
||||||
|
2
|
11
tests/queries/0_stateless/03206_replication_lag_metric.sql
Normal file
11
tests/queries/0_stateless/03206_replication_lag_metric.sql
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
-- Tags: no-parallel
|
||||||
|
|
||||||
|
CREATE DATABASE rdb1 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica1');
|
||||||
|
CREATE DATABASE rdb2 ENGINE = Replicated('/test/test_replication_lag_metric', 'shard1', 'replica2');
|
||||||
|
|
||||||
|
SET distributed_ddl_task_timeout = 0;
|
||||||
|
CREATE TABLE rdb1.t (id UInt32) ENGINE = ReplicatedMergeTree ORDER BY id;
|
||||||
|
SELECT replication_lag FROM system.clusters WHERE cluster IN ('rdb1', 'rdb2') ORDER BY cluster ASC, replica_num ASC;
|
||||||
|
|
||||||
|
DROP DATABASE rdb1;
|
||||||
|
DROP DATABASE rdb2;
|
Loading…
Reference in New Issue
Block a user