StorageSystemReplicas added column replica_is_active

This commit is contained in:
Maksim Kita 2021-08-04 16:19:42 +03:00
parent 5cd12892e9
commit 3f48c85722
6 changed files with 64 additions and 3 deletions

View File

@ -82,6 +82,7 @@ The next 4 columns have a non-zero value only where there is an active session w
- `absolute_delay` (`UInt64`) - How big lag in seconds the current replica has.
- `total_replicas` (`UInt8`) - The total number of known replicas of this table.
- `active_replicas` (`UInt8`) - The number of replicas of this table that have a session in ZooKeeper (i.e., the number of functioning replicas).
- `replica_is_active` ([Map(String, UInt8)](../../sql-reference/data-types/map.md)) — Map between replica name and is replica active.
If you request all the columns, the table may work a bit slowly, since several reads from ZooKeeper are made for each row.
If you do not request the last 4 columns (log_max_index, log_pointer, total_replicas, active_replicas), the table works quickly.

View File

@ -5575,8 +5575,11 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
res.total_replicas = all_replicas.size();
for (const String & replica : all_replicas)
if (zookeeper->exists(fs::path(zookeeper_path) / "replicas" / replica / "is_active"))
++res.active_replicas;
{
bool is_replica_active = zookeeper->exists(fs::path(zookeeper_path) / "replicas" / replica / "is_active");
res.active_replicas += static_cast<UInt8>(is_replica_active);
res.replica_is_active.emplace(replica, is_replica_active);
}
}
catch (const Coordination::Exception &)
{

View File

@ -176,6 +176,7 @@ public:
UInt8 active_replicas;
/// If the error has happened fetching the info from ZooKeeper, this field will be set.
String zookeeper_exception;
std::unordered_map<std::string, bool> replica_is_active;
};
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeMap.h>
#include <Storages/System/StorageSystemReplicas.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
@ -51,6 +52,7 @@ StorageSystemReplicas::StorageSystemReplicas(const StorageID & table_id_)
{ "total_replicas", std::make_shared<DataTypeUInt8>() },
{ "active_replicas", std::make_shared<DataTypeUInt8>() },
{ "zookeeper_exception", std::make_shared<DataTypeString>() },
{ "replica_is_active", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt8>()) }
}));
setInMemoryMetadata(storage_metadata);
}
@ -101,7 +103,8 @@ Pipe StorageSystemReplicas::read(
|| column_name == "log_pointer"
|| column_name == "total_replicas"
|| column_name == "active_replicas"
|| column_name == "zookeeper_exception")
|| column_name == "zookeeper_exception"
|| column_name == "replica_is_active")
{
with_zk_fields = true;
break;
@ -184,6 +187,18 @@ Pipe StorageSystemReplicas::read(
res_columns[col_num++]->insert(status.total_replicas);
res_columns[col_num++]->insert(status.active_replicas);
res_columns[col_num++]->insert(status.zookeeper_exception);
Map replica_is_active_values;
for (const auto & [name, is_active] : status.replica_is_active)
{
Tuple is_replica_active_value;
is_replica_active_value.emplace_back(name);
is_replica_active_value.emplace_back(is_active);
replica_is_active_values.emplace_back(std::move(is_replica_active_value));
}
res_columns[col_num++]->insert(std::move(replica_is_active_values));
}
Block header = metadata_snapshot->getSampleBlock();

View File

@ -0,0 +1,41 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
node3 = cluster.add_instance('node3', with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
for i, node in enumerate((node1, node2, node3)):
node_name = 'node' + str(i + 1)
node.query(
'''
CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_table', '{}')
PARTITION BY date ORDER BY id
'''.format(node_name)
)
yield cluster
finally:
cluster.shutdown()
def test_replica_is_active(start_cluster):
query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
assert query_result == '{\'node1\':1,\'node2\':1,\'node3\':1}\n'
node3.stop()
query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
assert query_result == '{\'node1\':1,\'node2\':1,\'node3\':0}\n'
node2.stop()
query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
assert query_result == '{\'node1\':1,\'node2\':0,\'node3\':0}\n'