Merge branch 'master' into test_soundex

commit c527347736
FriendLey, 2023-04-11 19:16:10 +08:00 (committed by GitHub)
9 changed files with 38 additions and 1 deletion

View File

@@ -50,6 +50,7 @@ last_queue_update: 2021-10-12 14:50:08
absolute_delay: 99
total_replicas: 5
active_replicas: 5
lost_part_count: 0
last_queue_update_exception:
zookeeper_exception:
replica_is_active: {'r1':1,'r2':1}
@@ -90,6 +91,7 @@ The next 4 columns have a non-zero value only where there is an active session w
- `absolute_delay` (`UInt64`) - How big a lag, in seconds, the current replica has.
- `total_replicas` (`UInt8`) - The total number of known replicas of this table.
- `active_replicas` (`UInt8`) - The number of replicas of this table that have a session in ClickHouse Keeper (i.e., the number of functioning replicas).
- `lost_part_count` (`UInt64`) - The number of data parts lost in the table by all replicas in total since table creation. The value is persisted in ClickHouse Keeper and can only increase. See the query example after this list.
- `last_queue_update_exception` (`String`) - Set when the queue contains broken entries. Especially important when ClickHouse breaks backward compatibility between versions and log entries written by newer versions aren't parseable by old versions.
- `zookeeper_exception` (`String`) - The message of the last exception, received if an error happened when fetching the info from ClickHouse Keeper.
- `replica_is_active` ([Map(String, UInt8)](../../sql-reference/data-types/map.md)) — Map between replica name and is replica active.
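
A minimal way to read the new column, sketched here with a placeholder table name (`test_table` is an assumption, not part of this change):

```sql
-- Read the accumulated number of lost parts for one replicated table.
-- 'test_table' is a placeholder; substitute your ReplicatedMergeTree table name.
SELECT lost_part_count
FROM system.replicas
WHERE database = currentDatabase() AND table = 'test_table';
```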

View File

@@ -25,6 +25,7 @@ struct ReplicatedTableStatus
UInt64 absolute_delay;
UInt8 total_replicas;
UInt8 active_replicas;
UInt64 lost_part_count;
String last_queue_update_exception;
/// If the error has happened fetching the info from ZooKeeper, this field will be set.
String zookeeper_exception;

View File

@@ -652,6 +652,8 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/alter_partition_version", String(), zkutil::CreateMode::Persistent));
/// For deduplication of async inserts
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/async_blocks", String(), zkutil::CreateMode::Persistent));
/// To track "lost forever" parts count, just for `system.replicas` table
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/lost_part_count", String(), zkutil::CreateMode::Persistent));
/// As for now, "/temp" node must exist, but we want to be able to remove it in future
if (zookeeper->exists(zookeeper_path + "/temp"))
@@ -5968,6 +5970,7 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
res.log_pointer = 0;
res.total_replicas = 0;
res.active_replicas = 0;
res.lost_part_count = 0;
res.last_queue_update_exception = getLastQueueUpdateException();
if (with_zk_fields && !res.is_session_expired)
@@ -5984,6 +5987,7 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
paths.clear();
paths.push_back(fs::path(replica_path) / "log_pointer");
paths.push_back(fs::path(zookeeper_path) / "lost_part_count");
for (const String & replica : all_replicas)
paths.push_back(fs::path(zookeeper_path) / "replicas" / replica / "is_active");
@@ -6001,10 +6005,14 @@ void StorageReplicatedMergeTree::getStatus(ReplicatedTableStatus & res, bool wit
res.log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
res.total_replicas = all_replicas.size();
if (get_result[1].error == Coordination::Error::ZNONODE)
res.lost_part_count = 0;
else
res.lost_part_count = get_result[1].data.empty() ? 0 : parse<UInt64>(get_result[1].data);
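/// Index 0 of get_result is log_pointer and index 1 is the new lost_part_count node, so the per-replica "is_active" entries start at i + 2 below.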
for (size_t i = 0, size = all_replicas.size(); i < size; ++i)
{
bool is_replica_active = get_result[i + 1].error != Coordination::Error::ZNONODE;
bool is_replica_active = get_result[i + 2].error != Coordination::Error::ZNONODE;
res.active_replicas += static_cast<UInt8>(is_replica_active);
res.replica_is_active.emplace(all_replicas[i], is_replica_active);
}
@@ -8869,6 +8877,20 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
getCommitPartOps(ops, new_data_part);
/// Increment lost_part_count
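/// The versioned set below acts as a compare-and-swap: if another replica bumps the counter concurrently, the multi request fails rather than silently dropping an increment.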
auto lost_part_count_path = fs::path(zookeeper_path) / "lost_part_count";
Coordination::Stat lost_part_count_stat;
String lost_part_count_str;
if (zookeeper->tryGet(lost_part_count_path, lost_part_count_str, &lost_part_count_stat))
{
UInt64 lost_part_count = lost_part_count_str.empty() ? 0 : parse<UInt64>(lost_part_count_str);
ops.emplace_back(zkutil::makeSetRequest(lost_part_count_path, toString(lost_part_count + 1), lost_part_count_stat.version));
}
else
{
ops.emplace_back(zkutil::makeCreateRequest(lost_part_count_path, "1", zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;
if (auto code = zookeeper->tryMulti(ops, responses); code == Coordination::Error::ZOK)
{

View File

@@ -60,6 +60,7 @@ StorageSystemReplicas::StorageSystemReplicas(const StorageID & table_id_)
{ "absolute_delay", std::make_shared<DataTypeUInt64>() },
{ "total_replicas", std::make_shared<DataTypeUInt8>() },
{ "active_replicas", std::make_shared<DataTypeUInt8>() },
{ "lost_part_count", std::make_shared<DataTypeUInt64>() },
{ "last_queue_update_exception", std::make_shared<DataTypeString>() },
{ "zookeeper_exception", std::make_shared<DataTypeString>() },
{ "replica_is_active", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt8>()) }
@@ -113,6 +114,7 @@ Pipe StorageSystemReplicas::read(
|| column_name == "log_pointer"
|| column_name == "total_replicas"
|| column_name == "active_replicas"
|| column_name == "lost_part_count"
|| column_name == "zookeeper_exception"
|| column_name == "replica_is_active")
{
@@ -213,6 +215,7 @@ Pipe StorageSystemReplicas::read(
res_columns[col_num++]->insert(status.absolute_delay);
res_columns[col_num++]->insert(status.total_replicas);
res_columns[col_num++]->insert(status.active_replicas);
res_columns[col_num++]->insert(status.lost_part_count);
res_columns[col_num++]->insert(status.last_queue_update_exception);
res_columns[col_num++]->insert(status.zookeeper_exception);
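
The `read()` change above adds `lost_part_count` to the set of columns whose values are only available from [Zoo]Keeper, so requesting it costs a Keeper round trip per matching table. A rough sketch of the difference, with placeholder database and table names:

```sql
-- Cheap: these columns are served from in-memory replica status.
SELECT database, table, is_leader, queue_size
FROM system.replicas
WHERE database = 'db' AND table = 't';

-- Requesting lost_part_count (like total_replicas or zookeeper_exception)
-- additionally fetches values from [Zoo]Keeper.
SELECT database, table, lost_part_count
FROM system.replicas
WHERE database = 'db' AND table = 't';
```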

View File

@@ -10,6 +10,7 @@ insert into rmt1 values (now(), rand());
drop table rmt1;
system sync replica rmt2;
select lost_part_count from system.replicas where database = currentDatabase() and table = 'rmt2';
drop table rmt2;
@@ -21,6 +22,7 @@ insert into rmt1 values (now(), rand());
drop table rmt1;
system sync replica rmt2;
select lost_part_count from system.replicas where database = currentDatabase() and table = 'rmt2';
drop table rmt2;

View File

@@ -867,6 +867,7 @@ CREATE TABLE system.replicas
`absolute_delay` UInt64,
`total_replicas` UInt8,
`active_replicas` UInt8,
`lost_part_count` UInt64,
`last_queue_update_exception` String,
`zookeeper_exception` String,
`replica_is_active` Map(String, UInt8)

View File

@@ -36,6 +36,8 @@ log
log
log_pointer
log_pointer
lost_part_count
lost_part_count
max_processed_insert_time
max_processed_insert_time
metadata

View File

@@ -17,6 +17,7 @@ leader_election
leader_election-0
log
log_pointer
lost_part_count
max_processed_insert_time
metadata
metadata
@@ -58,6 +59,7 @@ leader_election
leader_election-0
log
log_pointer
lost_part_count
max_processed_insert_time
metadata
metadata
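
The reference listings above appear to enumerate the znodes under a replicated table's ZooKeeper path (and a replica's path), which now include `lost_part_count`. Such a listing can be reproduced with the `system.zookeeper` table; the path below is purely hypothetical:

```sql
-- List znodes under a replicated table's ZooKeeper path.
-- '/clickhouse/tables/default/rmt' is a made-up example path; use the table's actual zookeeper_path.
SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/tables/default/rmt'
ORDER BY name;
```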