ClickHouse/tests/integration/test_keeper_session/test.py

168 lines
4.4 KiB
Python
Raw Normal View History

2021-12-29 13:25:55 +00:00
import pytest
from helpers.cluster import ClickHouseCluster
import time
2022-01-07 07:53:20 +00:00
import socket
import struct
from kazoo.client import KazooClient
2022-01-07 07:53:20 +00:00
# from kazoo.protocol.serialization import Connect, read_buffer, write_buffer
2021-12-29 13:25:55 +00:00
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/keeper_config.xml"], stay_alive=True
)
2021-12-29 13:25:55 +00:00
bool_struct = struct.Struct("B")
int_struct = struct.Struct("!i")
int_int_struct = struct.Struct("!ii")
int_int_long_struct = struct.Struct("!iiq")
2022-01-07 07:53:20 +00:00
int_long_int_long_struct = struct.Struct("!iqiq")
long_struct = struct.Struct("!q")
multiheader_struct = struct.Struct("!iBi")
reply_header_struct = struct.Struct("!iqi")
stat_struct = struct.Struct("!qqqqiiiqiiq")
2021-12-29 13:25:55 +00:00
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def destroy_zk_client(zk):
try:
if zk:
zk.stop()
zk.close()
except:
pass
def wait_node(node):
for _ in range(100):
zk = None
try:
zk = get_fake_zk(node.name, timeout=30.0)
print("node", node.name, "ready")
break
except Exception as ex:
time.sleep(0.2)
print("Waiting until", node.name, "will be ready, exception", ex)
finally:
destroy_zk_client(zk)
else:
raise Exception("Can't wait node", node.name, "to become ready")
def wait_nodes():
for n in [node1]:
wait_node(n)
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
)
2021-12-29 13:25:55 +00:00
_fake_zk_instance.start()
return _fake_zk_instance
2022-01-07 07:53:20 +00:00
def get_keeper_socket(node_name):
hosts = cluster.get_instance_ip(node_name)
client = socket.socket()
client.settimeout(10)
client.connect((hosts, 9181))
return client
def close_keeper_socket(cli):
if cli is not None:
cli.close()
def write_buffer(bytes):
if bytes is None:
return int_struct.pack(-1)
else:
return int_struct.pack(len(bytes)) + bytes
2021-12-29 13:25:55 +00:00
2022-01-07 07:53:20 +00:00
def read_buffer(bytes, offset):
length = int_struct.unpack_from(bytes, offset)[0]
offset += int_struct.size
if length < 0:
return None, offset
else:
index = offset
offset += length
return bytes[index : index + length], offset
2021-12-29 13:25:55 +00:00
2022-01-07 07:53:20 +00:00
def handshake(node_name=node1.name, session_timeout=1000, session_id=0):
client = None
try:
client = get_keeper_socket(node_name)
protocol_version = 0
last_zxid_seen = 0
session_passwd = b"\x00" * 16
2022-01-07 07:53:20 +00:00
read_only = 0
# Handshake serialize and deserialize code is from 'kazoo.protocol.serialization'.
# serialize handshake
req = bytearray()
req.extend(
int_long_int_long_struct.pack(
protocol_version, last_zxid_seen, session_timeout, session_id
)
)
2022-01-07 07:53:20 +00:00
req.extend(write_buffer(session_passwd))
req.extend([1 if read_only else 0])
# add header
req = int_struct.pack(45) + req
print("handshake request - len:", req.hex(), len(req))
# send request
client.send(req)
data = client.recv(1_000)
# deserialize response
print("handshake response - len:", data.hex(), len(data))
# ignore header
offset = 4
proto_version, negotiated_timeout, session_id = int_int_long_struct.unpack_from(
data, offset
)
2022-01-07 07:53:20 +00:00
offset += int_int_long_struct.size
password, offset = read_buffer(data, offset)
try:
read_only = bool_struct.unpack_from(data, offset)[0] == 1
offset += bool_struct.size
except struct.error:
read_only = False
print("negotiated_timeout - session_id", negotiated_timeout, session_id)
return negotiated_timeout, session_id
2021-12-29 13:25:55 +00:00
finally:
2022-01-07 07:53:20 +00:00
if client is not None:
client.close()
def test_session_timeout(started_cluster):
wait_nodes()
negotiated_timeout, _ = handshake(node1.name, session_timeout=1000, session_id=0)
assert negotiated_timeout == 5000
negotiated_timeout, _ = handshake(node1.name, session_timeout=8000, session_id=0)
assert negotiated_timeout == 8000
negotiated_timeout, _ = handshake(node1.name, session_timeout=20000, session_id=0)
assert negotiated_timeout == 10000