ClickHouse/tests/integration/test_keeper_four_word_command/test.py
2021-11-19 16:03:01 +03:00

612 lines
17 KiB
Python

import socket
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from io import StringIO
import csv
import re
# Cluster of three instances; each one loads its own Keeper config file
# (enable_keeper1.xml .. enable_keeper3.xml) and is created with
# stay_alive=True.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    'node1',
    main_configs=['configs/enable_keeper1.xml'],
    stay_alive=True,
)
node2 = cluster.add_instance(
    'node2',
    main_configs=['configs/enable_keeper2.xml'],
    stay_alive=True,
)
node3 = cluster.add_instance(
    'node3',
    main_configs=['configs/enable_keeper3.xml'],
    stay_alive=True,
)
from kazoo.client import KazooClient, KazooState
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that starts the cluster and yields it.

    The cluster is shut down when the fixture is torn down, and also when
    ``cluster.start()`` itself raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def destroy_zk_client(zk):
    """Best-effort shutdown of a ZooKeeper client.

    Args:
        zk: Object exposing ``stop()`` and ``close()``, or a falsy value
            (for example ``None``), in which case nothing is done.

    Ordinary errors raised by ``stop()``/``close()`` are reported and
    suppressed, so calling this from a ``finally`` block never masks the
    original test failure. ``KeyboardInterrupt`` and ``SystemExit`` are
    deliberately allowed to propagate.
    """
    if not zk:
        return
    try:
        zk.stop()
        zk.close()
    except Exception as ex:
        # Cleanup is best-effort: report the problem but do not fail the test.
        print("Failed to destroy zk client, exception", ex)
def clear_znodes():
    """Delete every child of ``/`` whose name contains ``test_4lw_``.

    Connects through node3 and always releases the client afterwards.
    """
    zk = None
    try:
        zk = get_fake_zk(node3.name, timeout=30.0)
        for child in zk.get_children('/'):
            if 'test_4lw_' not in child:
                continue
            zk.delete('/' + child)
    finally:
        destroy_zk_client(zk)
def wait_node(node):
    """Wait until a client can be connected to ``node``.

    Makes up to 100 connection attempts through ``get_fake_zk``; after each
    failed attempt it sleeps 0.2 seconds and prints the error. The client
    opened by an attempt is always released again.

    Args:
        node: Cluster instance; only its ``name`` attribute is used.

    Raises:
        Exception: If none of the attempts succeeds.
    """
    for _ in range(100):
        zk = None
        try:
            zk = get_fake_zk(node.name, timeout=30.0)
            print("node", node.name, "ready")
            break
        except Exception as ex:
            time.sleep(0.2)
            print("Waiting until", node.name, "will be ready, exception", ex)
        finally:
            destroy_zk_client(zk)
    else:
        # Build one readable message; passing several positional arguments
        # to Exception renders as a tuple in the traceback.
        raise Exception(f"Can't wait node {node.name} to become ready")
def wait_nodes():
    """Wait for node1, node2 and node3 (in that order) to become ready."""
    for instance in (node1, node2, node3):
        wait_node(instance)
def get_fake_zk(nodename, timeout=30.0):
    """Create and start a ``KazooClient`` connected to ``nodename``.

    Args:
        nodename: Name of the cluster instance; its IP is looked up via
            ``cluster.get_instance_ip`` and port 9181 is used.
        timeout: Passed through to ``KazooClient`` as ``timeout``.

    Returns:
        The started client. The caller is responsible for releasing it
        (see ``destroy_zk_client``).

    Raises:
        Whatever ``KazooClient.start()`` raises; in that case the client is
        released before the error is re-raised.
    """
    hosts = cluster.get_instance_ip(nodename) + ":9181"
    client = KazooClient(hosts=hosts, timeout=timeout)
    try:
        client.start()
    except Exception:
        # Do not leak a half-started client: callers such as wait_node()
        # retry many times and never see this object.
        destroy_zk_client(client)
        raise
    return client
def get_keeper_socket(node_name):
    """Open a TCP connection to port 9181 of ``node_name``.

    Args:
        node_name: Name of the cluster instance; its IP is looked up via
            ``cluster.get_instance_ip``.

    Returns:
        A connected ``socket.socket`` with a 10 second timeout. The caller
        must close it.

    Raises:
        OSError: If the connection cannot be established; the socket is
            closed before the error propagates.
    """
    host = cluster.get_instance_ip(node_name)
    client = socket.socket()
    try:
        client.settimeout(10)
        client.connect((host, 9181))
    except Exception:
        # The caller never receives the socket on failure, so close it here.
        client.close()
        raise
    return client
def close_keeper_socket(cli):
    """Close ``cli`` unless it is ``None``."""
    if cli is None:
        return
    cli.close()
def reset_node_stats(node_name=node1.name):
    """Send the ``srst`` command to ``node_name``.

    Up to 10 bytes of the reply are read and discarded; the socket is
    always closed afterwards.
    """
    sock = None
    try:
        sock = get_keeper_socket(node_name)
        sock.send(b'srst')
        sock.recv(10)
    finally:
        close_keeper_socket(sock)
def send_4lw_cmd(node_name=node1.name, cmd='ruok'):
    """Send a four-letter-word command to a node and return its reply.

    Args:
        node_name: Name of the cluster instance to contact.
        cmd: Command text to send, for example ``'ruok'`` or ``'mntr'``.

    Returns:
        The complete reply decoded to ``str``.

    Raises:
        socket.timeout: If nothing at all is received before the socket
            timeout expires.
    """
    client = None
    try:
        client = get_keeper_socket(node_name)
        # sendall() keeps writing until the whole command has been sent.
        client.sendall(cmd.encode())

        # A single recv() may return only the first part of a long reply,
        # so keep reading until the peer signals end-of-stream.
        chunks = []
        while True:
            try:
                chunk = client.recv(100_000)
            except socket.timeout:
                # NOTE(review): assumes the server closes the connection
                # after answering; if it does not, fall back to whatever
                # has been received so far instead of failing.
                if chunks:
                    break
                raise
            if not chunk:
                break
            chunks.append(chunk)

        # Decode once at the end so multi-byte characters split across
        # chunks are handled correctly.
        return b''.join(chunks).decode()
    finally:
        if client is not None:
            client.close()
def reset_conn_stats(node_name=node1.name):
    """Send the ``crst`` command to ``node_name``.

    Up to 10_000 bytes of the reply are read and discarded; the socket is
    always closed afterwards.
    """
    sock = None
    try:
        sock = get_keeper_socket(node_name)
        sock.send(b'crst')
        sock.recv(10_000)
    finally:
        close_keeper_socket(sock)
def test_cmd_ruok(started_cluster):
    """The ``ruok`` command must be answered with ``imok``."""
    # send_4lw_cmd() opens and closes its own socket, so no extra cleanup
    # is needed here.
    wait_nodes()
    data = send_4lw_cmd(cmd='ruok')
    assert data == 'imok'
def do_some_action(zk, create_cnt=0, get_cnt=0, set_cnt=0, ephemeral_cnt=0, watch_cnt=0, delete_cnt=0):
    """Run a fixed sequence of operations against ``zk``.

    In order: create ``create_cnt`` normal nodes, read the first ``get_cnt``
    of them, set the first ``set_cnt`` of them to ``b"new-value"``, create
    ``ephemeral_cnt`` ephemeral nodes, put an ``exists`` watch on the first
    ``watch_cnt`` normal nodes and finally delete the last ``delete_cnt``
    normal nodes.

    Every other counter must not exceed ``create_cnt``, and
    ``delete_cnt + watch_cnt`` must not exceed it either, so that a watched
    node is never deleted. Violations raise ``AssertionError``.
    """
    for dependent_cnt in (get_cnt, set_cnt, watch_cnt, delete_cnt):
        assert create_cnt >= dependent_cnt
    # ensure not delete watched node
    assert create_cnt >= (delete_cnt + watch_cnt)

    normal_prefix = "/test_4lw_normal_node_"
    ephemeral_prefix = "/test_4lw_ephemeral_node_"

    for idx in range(create_cnt):
        zk.create(normal_prefix + str(idx), b"")

    for idx in range(get_cnt):
        zk.get(normal_prefix + str(idx))

    for idx in range(set_cnt):
        zk.set(normal_prefix + str(idx), b"new-value")

    for idx in range(ephemeral_cnt):
        zk.create(ephemeral_prefix + str(idx), ephemeral=True)

    last_event = None

    def on_watch(event):
        # Only records the event; nothing reads it afterwards.
        print("Fake watch triggered")
        nonlocal last_event
        last_event = event

    for idx in range(watch_cnt):
        zk.exists(normal_prefix + str(idx), watch=on_watch)

    first_deleted = create_cnt - delete_cnt
    for idx in range(first_deleted, create_cnt):
        zk.delete(normal_prefix + str(idx))
def test_cmd_mntr(started_cluster):
    """Check the values reported by ``mntr`` after a known workload on node1."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()

        # reset stat first
        reset_node_stats(node1.name)

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=10, get_cnt=10, set_cnt=5, ephemeral_cnt=2, watch_cnt=2, delete_cnt=2)

        data = send_4lw_cmd(cmd='mntr')

        # Every non-empty line is parsed as "<key>\t<value>".
        rows = csv.reader(data.split('\n'), delimiter='\t')
        result = {row[0]: row[1] for row in rows if row}

        def as_int(key):
            return int(result[key])

        assert len(result["zk_version"]) != 0

        assert as_int("zk_avg_latency") >= 0
        assert as_int("zk_max_latency") >= 0
        assert as_int("zk_min_latency") >= 0
        assert as_int("zk_min_latency") <= as_int("zk_avg_latency")
        assert as_int("zk_max_latency") >= as_int("zk_avg_latency")

        assert as_int("zk_num_alive_connections") == 1
        assert as_int("zk_outstanding_requests") == 0

        assert result["zk_server_state"] == "leader"

        # The workload leaves 8 normal nodes (10 created, 2 deleted) and
        # 2 ephemeral nodes; the expected total of 11 presumably also
        # counts the root node.
        assert as_int("zk_znode_count") == 11
        assert as_int("zk_watch_count") == 2
        assert as_int("zk_ephemerals_count") == 2
        assert as_int("zk_approximate_data_size") > 0

        assert as_int("zk_open_file_descriptor_count") > 0
        assert as_int("zk_max_file_descriptor_count") > 0

        assert as_int("zk_followers") == 2
        assert as_int("zk_synced_followers") == 2

        # The workload issues 31 requests (10 + 10 + 5 + 2 + 2 + 2); more
        # packets than that may have been exchanged.
        assert as_int("zk_packets_sent") >= 31
        assert as_int("zk_packets_received") >= 31
    finally:
        destroy_zk_client(zk)
def test_cmd_srst(started_cluster):
    """``srst`` must confirm the reset and zero the packet counters in ``mntr``."""
    # send_4lw_cmd() manages its own socket, so there is nothing to clean
    # up in this test.
    wait_nodes()
    clear_znodes()

    data = send_4lw_cmd(cmd='srst')
    assert data.strip() == "Server stats reset."

    data = send_4lw_cmd(cmd='mntr')
    assert len(data) != 0

    # Every non-empty line is parsed as "<key>\t<value>".
    result = {}
    for row in csv.reader(data.split('\n'), delimiter='\t'):
        if row:
            result[row[0]] = row[1]

    assert int(result["zk_packets_received"]) == 0
    assert int(result["zk_packets_sent"]) == 0
def test_cmd_conf(started_cluster):
    """Check the ``key=value`` settings printed by ``conf`` on node1."""
    # send_4lw_cmd() manages its own socket, so there is nothing to clean
    # up in this test.
    wait_nodes()
    clear_znodes()

    data = send_4lw_cmd(cmd='conf')

    # Every non-empty line is parsed as "<key>=<value>".
    result = {}
    for row in csv.reader(data.split('\n'), delimiter='='):
        if row:
            print(row)
            result[row[0]] = row[1]

    expected = {
        "server_id": "1",
        "tcp_port": "9181",
        "four_letter_word_white_list": "*",
        "log_storage_path": "/var/lib/clickhouse/coordination/log",
        "snapshot_storage_path": "/var/lib/clickhouse/coordination/snapshots",
        "session_timeout_ms": "30000",
        "operation_timeout_ms": "5000",
        "dead_session_check_period_ms": "500",
        "heart_beat_interval_ms": "500",
        "election_timeout_lower_bound_ms": "1000",
        "election_timeout_upper_bound_ms": "2000",
        "reserved_log_items": "100000",
        "snapshot_distance": "75",
        "auto_forwarding": "true",
        "shutdown_timeout": "5000",
        "startup_timeout": "180000",
        "raft_logs_level": "trace",
        "rotate_log_storage_interval": "100000",
        "snapshots_to_keep": "3",
        "stale_log_gap": "10000",
        "fresh_log_gap": "200",
        "max_requests_batch_size": "100",
        "quorum_reads": "false",
        "force_sync": "true",
        "compress_logs": "true",
        "compress_snapshots_with_zstd_format": "true",
        "configuration_change_tries_count": "20",
    }
    for key, value in expected.items():
        # The message names the offending setting when a check fails.
        assert result[key] == value, key

    # Settings that must not be printed at all.
    for absent_key in ("tcp_port_secure", "superdigest"):
        assert absent_key not in result, absent_key
def test_cmd_isro(started_cluster):
    """``isro`` must answer ``rw`` on node1 and ``ro`` on node2."""
    wait_nodes()
    for instance, expected_reply in ((node1, 'rw'), (node2, 'ro')):
        assert send_4lw_cmd(instance.name, 'isro') == expected_reply
def test_cmd_srvr(started_cluster):
    """Check the ``srvr`` report of node1 after creating ten nodes."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_node_stats(node1.name)

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=10)

        data = send_4lw_cmd(cmd='srvr')

        print("srvr output -------------------------------------")
        print(data)

        # Every non-empty line is parsed as "<key>:<value>", both stripped.
        rows = csv.reader(data.split('\n'), delimiter=':')
        result = {row[0].strip(): row[1].strip() for row in rows if row}

        for required_key in ('ClickHouse Keeper version', 'Latency min/avg/max'):
            assert required_key in result
        assert result['Received'] == '10'
        assert result['Sent'] == '10'
        assert int(result['Connections']) == 1
        assert int(result['Zxid']) > 14
        assert result['Mode'] == 'leader'
        assert result['Node count'] == '11'
    finally:
        destroy_zk_client(zk)
def test_cmd_stat(started_cluster):
    """Check both parts of the ``stat`` report of node1.

    Lines without ``=`` are treated as server statistics, lines with ``=``
    as per-connection statistics.
    """
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_node_stats(node1.name)
        reset_conn_stats(node1.name)

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=10)

        data = send_4lw_cmd(cmd='stat')

        print("stat output -------------------------------------")
        print(data)

        lines = data.split('\n')

        # keeper statistics: "<key>:<value>" lines
        stats = [line for line in lines if '=' not in line]
        result = {}
        for row in csv.reader(stats, delimiter=':'):
            if row:
                result[row[0].strip()] = row[1].strip()

        assert 'ClickHouse Keeper version' in result
        assert 'Latency min/avg/max' in result
        assert result['Received'] == '10'
        assert result['Sent'] == '10'
        assert int(result['Connections']) == 1
        assert int(result['Zxid']) > 14
        assert result['Mode'] == 'leader'
        assert result['Node count'] == '11'

        # Connection statistics: keep lines containing '=' and drop the
        # connection that has received nothing (the one opened for the
        # 'stat' command itself).
        cons = [line for line in lines if '=' in line and 'recved=0' not in line]
        assert len(cons) == 1

        # Check the match before dereferencing it so that an unexpected
        # line format fails with a readable assertion.
        match = re.match(r'(.*?)[:].*[(](.*?)[)].*', cons[0].strip(), re.S)
        assert match is not None, cons[0]
        conn_stat = match.group(2)

        result = {}
        for col in conn_stat.split(','):
            col = col.strip().split('=')
            result[col[0]] = col[1]

        assert result['recved'] == '10'
        assert result['sent'] == '10'
    finally:
        destroy_zk_client(zk)
def test_cmd_cons(started_cluster):
    """Check the per-connection fields printed by ``cons`` on node1."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_conn_stats()

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=10)

        data = send_4lw_cmd(cmd='cons')

        print("cons output -------------------------------------")
        print(data)

        # filter connection created by 'cons'
        cons = [n for n in data.split('\n') if 'recved=0' not in n and len(n) > 0]
        assert len(cons) == 1

        # Check the match before dereferencing it so that an unexpected
        # line format fails with a readable assertion.
        match = re.match(r'(.*?)[:].*[(](.*?)[)].*', cons[0].strip(), re.S)
        assert match is not None, cons[0]
        conn_stat = match.group(2)

        # The parenthesised part is a comma-separated list of key=value.
        result = {}
        for col in conn_stat.split(','):
            col = col.strip().split('=')
            result[col[0]] = col[1]

        assert result['recved'] == '10'
        assert result['sent'] == '10'
        assert 'sid' in result
        assert result['lop'] == 'Create'
        assert 'est' in result
        assert result['to'] == '30000'
        assert result['lcxid'] == '0x000000000000000a'
        assert 'lzxid' in result
        assert 'lresp' in result
        assert int(result['llat']) >= 0
        assert int(result['minlat']) >= 0
        assert int(result['avglat']) >= 0
        assert int(result['maxlat']) >= 0
    finally:
        destroy_zk_client(zk)
def test_cmd_crst(started_cluster):
    """After ``crst`` the first connection listed by ``cons`` must show reset counters."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_conn_stats()

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=10)

        data = send_4lw_cmd(cmd='crst')

        print("crst output -------------------------------------")
        print(data)

        data = send_4lw_cmd(cmd='cons')

        # 2 connections, 1 for 'cons' command, 1 for zk
        cons = [n for n in data.split('\n') if len(n) > 0]
        assert len(cons) == 2

        # Check the match before dereferencing it so that an unexpected
        # line format fails with a readable assertion.
        match = re.match(r'(.*?)[:].*[(](.*?)[)].*', cons[0].strip(), re.S)
        assert match is not None, cons[0]
        conn_stat = match.group(2)

        # The parenthesised part is a comma-separated list of key=value.
        result = {}
        for col in conn_stat.split(','):
            col = col.strip().split('=')
            result[col[0]] = col[1]

        assert result['recved'] == '0'
        assert result['sent'] == '0'
        assert 'sid' in result
        assert result['lop'] == 'NA'
        assert 'est' in result
        assert result['to'] == '30000'
        assert 'lcxid' not in result
        assert result['lzxid'] == '0xffffffffffffffff'
        assert result['lresp'] == '0'
        assert int(result['llat']) == 0
        assert int(result['minlat']) == 0
        assert int(result['avglat']) == 0
        assert int(result['maxlat']) == 0
    finally:
        destroy_zk_client(zk)
def test_cmd_dump(started_cluster):
    """``dump`` must report one session and list both ephemeral nodes."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_node_stats()

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, ephemeral_cnt=2)

        data = send_4lw_cmd(cmd='dump')

        print("dump output -------------------------------------")
        print(data)

        list_data = data.split('\n')

        # The number in parentheses on the first line is the session count.
        header_match = re.match(r'.*[(](.*?)[)].*', list_data[0], re.S)
        session_count = int(header_match.group(1))
        assert session_count == 1

        for ephemeral_path in ('/test_4lw_ephemeral_node_0', '/test_4lw_ephemeral_node_1'):
            assert '\t' + ephemeral_path in list_data
    finally:
        destroy_zk_client(zk)
def test_cmd_wchs(started_cluster):
    """``wchs`` must report one connection watching two paths, two watches in total."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_node_stats()

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=2, watch_cnt=2)

        data = send_4lw_cmd(cmd='wchs')

        print("wchs output -------------------------------------")
        print(data)

        list_data = [n for n in data.split('\n') if len(n.strip()) > 0]

        # Example of the two lines being parsed:
        # 37 connections watching 632141 paths
        # Total watches:632141
        summary = re.match(r'([0-9].*) connections watching ([0-9].*) paths', list_data[0], re.S)
        conn_count = int(summary.group(1))
        watch_path_count = int(summary.group(2))

        total = re.match(r'Total watches:([0-9].*)', list_data[1], re.S)
        watch_count = int(total.group(1))

        assert conn_count == 1
        assert watch_path_count == 2
        assert watch_count == 2
    finally:
        destroy_zk_client(zk)
def test_cmd_wchc(started_cluster):
    """``wchc`` must print three non-blank lines including both watched paths, tab-prefixed."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_node_stats()

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=2, watch_cnt=2)

        data = send_4lw_cmd(cmd='wchc')

        print("wchc output -------------------------------------")
        print(data)

        list_data = [n for n in data.split('\n') if len(n.strip()) > 0]

        assert len(list_data) == 3
        for watched_path in ('/test_4lw_normal_node_0', '/test_4lw_normal_node_1'):
            assert '\t' + watched_path in list_data
    finally:
        destroy_zk_client(zk)
def test_cmd_wchp(started_cluster):
    """``wchp`` must print four non-blank lines including both watched paths."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()
        reset_node_stats()

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=2, watch_cnt=2)

        data = send_4lw_cmd(cmd='wchp')

        print("wchp output -------------------------------------")
        print(data)

        list_data = [n for n in data.split('\n') if len(n.strip()) > 0]

        assert len(list_data) == 4
        for watched_path in ('/test_4lw_normal_node_0', '/test_4lw_normal_node_1'):
            assert watched_path in list_data
    finally:
        destroy_zk_client(zk)