This commit is contained in:
Michael Stetsyuk 2024-07-30 18:15:24 +01:00
parent 932033fca9
commit aa26291ff2
5 changed files with 98 additions and 26 deletions

View File

@ -12,6 +12,7 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/PoolId.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
@ -340,31 +341,63 @@ ClusterPtr DatabaseReplicated::getClusterImpl(bool all_groups) const
ReplicasInfo DatabaseReplicated::tryGetReplicasInfo(const ClusterPtr & cluster_) const
{
ReplicasInfo res;
Strings paths_get, paths_exists;
paths_get.emplace_back(fs::path(zookeeper_path) / "max_log_ptr");
auto zookeeper = getZooKeeper();
const auto & addresses_with_failover = cluster_->getShardsAddresses();
const auto & shards_info = cluster_->getShardsInfo();
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
paths_exists.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "active");
paths_get.emplace_back(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr");
}
}
try
{
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(zookeeper_path + "/max_log_ptr"));
auto current_zookeeper = getZooKeeper();
auto get_res = current_zookeeper->get(paths_get);
auto exist_res = current_zookeeper->exists(paths_exists);
chassert(get_res.size() == exist_res.size() + 1);
auto max_log_ptr_zk = get_res[0];
if (max_log_ptr_zk.error != Coordination::Error::ZOK)
throw Coordination::Exception(max_log_ptr_zk.error);
UInt32 max_log_ptr = parse<UInt32>(max_log_ptr_zk.data);
ReplicasInfo replicas_info;
replicas_info.resize(exist_res.size());
size_t global_replica_index = 0;
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
for (const auto & replica : addresses_with_failover[shard_index])
{
String full_name = getFullReplicaName(replica.database_shard_name, replica.database_replica_name);
UInt32 log_ptr = parse<UInt32>(zookeeper->get(fs::path(zookeeper_path) / "replicas" / full_name / "log_ptr"));
bool is_active = zookeeper->exists(fs::path(zookeeper_path) / "replicas" / full_name / "active");
res.push_back(ReplicaInfo{
.is_active = is_active,
.replication_lag = max_log_ptr - log_ptr,
auto replica_active = exist_res[global_replica_index];
auto replica_log_ptr = get_res[global_replica_index + 1];
if (replica_active.error != Coordination::Error::ZOK && replica_active.error != Coordination::Error::ZNONODE)
throw Coordination::Exception(replica_active.error);
if (replica_log_ptr.error != Coordination::Error::ZOK)
throw Coordination::Exception(replica_log_ptr.error);
replicas_info[global_replica_index] = ReplicaInfo{
.is_active = replica_active.error == Coordination::Error::ZOK,
.replication_lag = max_log_ptr - parse<UInt32>(replica_log_ptr.data),
.recovery_time = replica.is_local ? ddl_worker->getCurrentInitializationDurationMs() : 0,
});
};
++global_replica_index;
}
}
return res;
return replicas_info;
} catch (...)
{
tryLogCurrentException(log);

View File

@ -32,8 +32,11 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
bool DatabaseReplicatedDDLWorker::initializeMainThread()
{
initialization_duration_timer.restart();
initializing.store(true, std::memory_order_release);
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.emplace();
initialization_duration_timer->start();
}
while (!stop_flag)
{
@ -72,7 +75,10 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
initializeReplication();
initialized = true;
initializing.store(false, std::memory_order_relaxed);
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return true;
}
catch (...)
@ -82,7 +88,11 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
}
}
initializing.store(false, std::memory_order_relaxed);
{
std::lock_guard lock(initialization_duration_timer_mutex);
initialization_duration_timer.reset();
}
return false;
}
@ -466,7 +476,8 @@ UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
/// Returns the elapsed milliseconds reported by initialization_duration_timer, or 0 when no
/// initialization is currently being timed.
/// NOTE(review): this listing is a diff hunk rendered without +/- markers. The first `return`
/// (guarded by the `initializing` flag) looks like the pre-change line, and the mutex-guarded
/// lines after it look like its replacement - confirm against the commit before treating both as live code.
UInt64 DatabaseReplicatedDDLWorker::getCurrentInitializationDurationMs() const
{
return initializing.load(std::memory_order_acquire) ? initialization_duration_timer.elapsedMilliseconds() : 0;
/// initializeMainThread() emplaces and resets the timer under this same mutex, so take it before reading.
std::lock_guard lock(initialization_duration_timer_mutex);
/// The timer is tested for a value first; when it holds none the result is 0.
return initialization_duration_timer ? initialization_duration_timer->elapsedMilliseconds() : 0;
}
}

View File

@ -59,8 +59,8 @@ private:
/// It will remove "active" node when database is detached
zkutil::EphemeralNodeHolderPtr active_node_holder;
Stopwatch initialization_duration_timer;
std::atomic<bool> initializing = false;
std::optional<Stopwatch> initialization_duration_timer;
mutable std::mutex initialization_duration_timer_mutex;
};
}

View File

@ -71,7 +71,7 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
size_t replica_idx = 0;
size_t global_replica_idx = 0;
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
const auto & shard_info = shards_info[shard_index];
@ -108,7 +108,7 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
}
else
{
const auto & replica_info = replicas_info[replica_idx++];
const auto & replica_info = replicas_info[global_replica_idx];
res_columns[i++]->insert(replica_info.is_active);
res_columns[i++]->insert(replica_info.replication_lag);
if (replica_info.recovery_time != 0)
@ -116,6 +116,8 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
else
res_columns[i++]->insertDefault();
}
++global_replica_idx;
}
}
}

View File

@ -5,7 +5,6 @@ cluster = ClickHouseCluster(__file__)
# The instance used by this test, registered under the name "node".
# It loads configs/config.xml and is created with with_zookeeper=True and stay_alive=True.
# NOTE(review): stay_alive is presumably needed for the node.restart_clickhouse() call in the test below - confirm.
node = cluster.add_instance(
"node",
main_configs=["configs/config.xml"],
with_zookeeper=True,
stay_alive=True,
)
@ -21,9 +20,36 @@ def start_cluster():
# Checks that system.clusters reports a non-zero recovery_time after the node is restarted
# with the local metadata file of table rdb.t removed.
# NOTE(review): this listing is a diff rendered without +/- markers and without indentation.
# Several statements appear twice, in a single-line form and a reformatted multi-line form;
# these look like the pre-change and post-change variants - confirm against the commit.
def test_recovery_time_metric(start_cluster):
# Create the Replicated database `rdb` (shard1 / replica1) under /test/test_recovery_time_metric.
node.query(
"CREATE DATABASE rdb ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1');"
"""
CREATE DATABASE rdb
ENGINE = Replicated('/test/test_recovery_time_metric', 'shard1', 'replica1')
"""
)
# Create a MergeTree table inside the replicated database.
node.query("CREATE TABLE rdb.t (x UInt32) ENGINE = MergeTree ORDER BY x;")
# Delete the table's metadata file from the container's disk.
# Presumably this forces the replica to recover the table after the restart - confirm.
node.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"])
node.query(
"""
CREATE TABLE rdb.t
(
`x` UInt32
)
ENGINE = MergeTree
ORDER BY x
"""
)
node.exec_in_container(
["bash", "-c", "rm /var/lib/clickhouse/metadata/rdb/t.sql"]
)
# Restart the server, then read recovery_time back from system.clusters.
node.restart_clickhouse()
# Single-line form: any recovery_time value across system.clusters must not be "0".
assert node.query("SELECT any(recovery_time) FROM system.clusters;") != "0\n"
# Multi-line form: only the `rdb` cluster row is read, parsed as an integer, and required to be positive.
ret = int(
node.query(
"""
SELECT recovery_time
FROM system.clusters
WHERE cluster = 'rdb'
"""
).strip()
)
assert ret > 0