mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 07:01:59 +00:00
Merge pull request #27180 from kitaisreal/storage-system-replicas-added-column-replica-is-active
Storage system replicas added column replica is active
This commit is contained in:
commit
7fdf3cc263
@ -82,6 +82,7 @@ The next 4 columns have a non-zero value only where there is an active session w
|
||||
- `absolute_delay` (`UInt64`) - How big lag in seconds the current replica has.
|
||||
- `total_replicas` (`UInt8`) - The total number of known replicas of this table.
|
||||
- `active_replicas` (`UInt8`) - The number of replicas of this table that have a session in ZooKeeper (i.e., the number of functioning replicas).
|
||||
- `replica_is_active` ([Map(String, UInt8)](../../sql-reference/data-types/map.md)) — Map between replica name and is replica active.
|
||||
|
||||
If you request all the columns, the table may work a bit slowly, since several reads from ZooKeeper are made for each row.
|
||||
If you do not request the last 4 columns (log_max_index, log_pointer, total_replicas, active_replicas), the table works quickly.
|
||||
|
@ -5582,8 +5582,11 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
|
||||
res.total_replicas = all_replicas.size();
|
||||
|
||||
for (const String & replica : all_replicas)
|
||||
if (zookeeper->exists(fs::path(zookeeper_path) / "replicas" / replica / "is_active"))
|
||||
++res.active_replicas;
|
||||
{
|
||||
bool is_replica_active = zookeeper->exists(fs::path(zookeeper_path) / "replicas" / replica / "is_active");
|
||||
res.active_replicas += static_cast<UInt8>(is_replica_active);
|
||||
res.replica_is_active.emplace(replica, is_replica_active);
|
||||
}
|
||||
}
|
||||
catch (const Coordination::Exception &)
|
||||
{
|
||||
|
@ -176,6 +176,7 @@ public:
|
||||
UInt8 active_replicas;
|
||||
/// If the error has happened fetching the info from ZooKeeper, this field will be set.
|
||||
String zookeeper_exception;
|
||||
std::unordered_map<std::string, bool> replica_is_active;
|
||||
};
|
||||
|
||||
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeMap.h>
|
||||
#include <Storages/System/StorageSystemReplicas.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
@ -51,6 +52,7 @@ StorageSystemReplicas::StorageSystemReplicas(const StorageID & table_id_)
|
||||
{ "total_replicas", std::make_shared<DataTypeUInt8>() },
|
||||
{ "active_replicas", std::make_shared<DataTypeUInt8>() },
|
||||
{ "zookeeper_exception", std::make_shared<DataTypeString>() },
|
||||
{ "replica_is_active", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt8>()) }
|
||||
}));
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
@ -101,7 +103,8 @@ Pipe StorageSystemReplicas::read(
|
||||
|| column_name == "log_pointer"
|
||||
|| column_name == "total_replicas"
|
||||
|| column_name == "active_replicas"
|
||||
|| column_name == "zookeeper_exception")
|
||||
|| column_name == "zookeeper_exception"
|
||||
|| column_name == "replica_is_active")
|
||||
{
|
||||
with_zk_fields = true;
|
||||
break;
|
||||
@ -184,6 +187,18 @@ Pipe StorageSystemReplicas::read(
|
||||
res_columns[col_num++]->insert(status.total_replicas);
|
||||
res_columns[col_num++]->insert(status.active_replicas);
|
||||
res_columns[col_num++]->insert(status.zookeeper_exception);
|
||||
|
||||
Map replica_is_active_values;
|
||||
for (const auto & [name, is_active] : status.replica_is_active)
|
||||
{
|
||||
Tuple is_replica_active_value;
|
||||
is_replica_active_value.emplace_back(name);
|
||||
is_replica_active_value.emplace_back(is_active);
|
||||
|
||||
replica_is_active_values.emplace_back(std::move(is_replica_active_value));
|
||||
}
|
||||
|
||||
res_columns[col_num++]->insert(std::move(replica_is_active_values));
|
||||
}
|
||||
|
||||
Block header = metadata_snapshot->getSampleBlock();
|
||||
|
41
tests/integration/test_replica_is_active/test.py
Normal file
41
tests/integration/test_replica_is_active/test.py
Normal file
@ -0,0 +1,41 @@
|
||||
import pytest
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance('node1', with_zookeeper=True)
|
||||
node2 = cluster.add_instance('node2', with_zookeeper=True)
|
||||
node3 = cluster.add_instance('node3', with_zookeeper=True)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
for i, node in enumerate((node1, node2, node3)):
|
||||
node_name = 'node' + str(i + 1)
|
||||
node.query(
|
||||
'''
|
||||
CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_table', '{}')
|
||||
PARTITION BY date ORDER BY id
|
||||
'''.format(node_name)
|
||||
)
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_replica_is_active(start_cluster):
|
||||
query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
|
||||
assert query_result == '{\'node1\':1,\'node2\':1,\'node3\':1}\n'
|
||||
|
||||
node3.stop()
|
||||
query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
|
||||
assert query_result == '{\'node1\':1,\'node2\':1,\'node3\':0}\n'
|
||||
|
||||
node2.stop()
|
||||
query_result = node1.query("select replica_is_active from system.replicas where table = 'test_table'")
|
||||
assert query_result == '{\'node1\':1,\'node2\':0,\'node3\':0}\n'
|
Loading…
Reference in New Issue
Block a user