Merge pull request #28526 from ClickHouse/add_test_for_two_nodes_configuration

Add test for keeper 2 node configuration
alesapin 2021-09-06 13:44:38 +03:00 committed by GitHub
commit 8d67a34f93
5 changed files with 242 additions and 0 deletions

@@ -0,0 +1 @@
#!/usr/bin/env python3

@@ -0,0 +1,33 @@
<yandex>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
</raft_configuration>
</keeper_server>
</yandex>
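Both Keeper nodes ship the same raft_configuration: server 1 (node1, priority 3) and server 2 (node2, priority 2, started as a follower); only <server_id> differs and selects which entry is the local instance. Clients speak the ZooKeeper protocol to <tcp_port> 9181, which is exactly what the kazoo-based test below does. A minimal sketch of such a connection, assuming a reachable host named node1 and the kazoo package installed:

#!/usr/bin/env python3
# Sketch only: connect to a ClickHouse Keeper node over its ZooKeeper-compatible
# port (9181 from the config above). The host name "node1" is taken from the
# raft_configuration and is an assumption about where the instance actually runs.
from kazoo.client import KazooClient

client = KazooClient(hosts="node1:9181", timeout=30.0)
client.start()
try:
    client.ensure_path("/sketch")                # create the node if it is missing
    client.set("/sketch", b"hello from keeper")  # write a value
    value, stat = client.get("/sketch")          # read it back
    print(value, stat.version)
finally:
    client.stop()
    client.close()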

@@ -0,0 +1,33 @@
<yandex>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
</raft_configuration>
</keeper_server>
</yandex>

@@ -0,0 +1,12 @@
<yandex>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
</zookeeper>
</yandex>
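The <zookeeper> section above points both ClickHouse servers at the two Keeper endpoints on port 9181, so anything that normally goes to ZooKeeper (including the system.zookeeper table that wait_node below queries) is served by this two-node ensemble. A rough sketch of checking that wiring from the test's point of view, assuming the node1 instance object declared in the test file:

# Sketch only: node1 is the ClickHouseCluster instance defined in the test below;
# the <zookeeper> config above routes system.zookeeper through Keeper on port 9181.
def keeper_is_reachable(node):
    try:
        node.query("SELECT * FROM system.zookeeper WHERE path = '/'")
        return True
    except Exception:
        return False

# Expected usage once the cluster fixture has started: keeper_is_reachable(node1) -> True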

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/use_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/use_keeper.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def smaller_exception(ex):
return '\n'.join(str(ex).split('\n')[0:2])
def wait_node(node):
for _ in range(100):
zk = None
try:
node.query("SELECT * FROM system.zookeeper WHERE path = '/'")
zk = get_fake_zk(node.name, timeout=30.0)
zk.create("/test", sequence=True)
print("node", node.name, "ready")
break
except Exception as ex:
time.sleep(0.2)
print("Waiting until", node.name, "will be ready, exception", ex)
finally:
if zk:
zk.stop()
zk.close()
else:
raise Exception("Can't wait node", node.name, "to become ready")
def wait_nodes():
for node in [node1, node2]:
wait_node(node)
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def test_read_write_two_nodes(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node1_zk.create("/test_read_write_multinode_node1", b"somedata1")
node2_zk.create("/test_read_write_multinode_node2", b"somedata2")
# stale reads are allowed
while node1_zk.exists("/test_read_write_multinode_node2") is None:
time.sleep(0.1)
# stale reads are allowed
while node2_zk.exists("/test_read_write_multinode_node1") is None:
time.sleep(0.1)
assert node2_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
assert node1_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
assert node2_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
assert node1_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
finally:
try:
            for zk_conn in [node1_zk, node2_zk]:
zk_conn.stop()
zk_conn.close()
except:
pass
def test_read_write_two_nodes_with_blocade(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1", timeout=5.0)
node2_zk = get_fake_zk("node2", timeout=5.0)
print("Blocking nodes")
with PartitionManager() as pm:
pm.partition_instances(node2, node1)
            # We will get a connection-loss response, but the query is still processed
            # after the blockade is removed
with pytest.raises(Exception):
node1_zk.create("/test_read_write_blocked_node1", b"somedata1")
            # This node is not the leader and will not process anything
with pytest.raises(Exception):
node2_zk.create("/test_read_write_blocked_node2", b"somedata2")
print("Nodes unblocked")
for i in range(10):
try:
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
break
except:
time.sleep(0.5)
for i in range(100):
try:
node1_zk.create("/test_after_block1", b"somedata12")
break
except:
time.sleep(0.1)
else:
raise Exception("node1 cannot recover after blockade")
print("Node1 created it's value")
for i in range(100):
try:
node2_zk.create("/test_after_block2", b"somedata12")
break
except:
time.sleep(0.1)
else:
raise Exception("node2 cannot recover after blockade")
print("Node2 created it's value")
# stale reads are allowed
while node1_zk.exists("/test_after_block2") is None:
time.sleep(0.1)
# stale reads are allowed
while node2_zk.exists("/test_after_block1") is None:
time.sleep(0.1)
assert node1_zk.exists("/test_after_block1") is not None
assert node1_zk.exists("/test_after_block2") is not None
assert node2_zk.exists("/test_after_block1") is not None
assert node2_zk.exists("/test_after_block2") is not None
finally:
try:
            for zk_conn in [node1_zk, node2_zk]:
zk_conn.stop()
zk_conn.close()
except:
pass
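For completeness, a hedged sketch of invoking these cases directly with pytest; it assumes the ClickHouse integration-test environment (the helpers package and its docker-compose based cluster) is available, as the imports above require:

# Hypothetical runner sketch: the exact paths and environment depend on the
# ClickHouse integration-test setup that this test file assumes.
import sys
import pytest

if __name__ == "__main__":
    # "-k test_read_write_two_nodes" matches both test cases defined above.
    sys.exit(pytest.main(["-v", "-k", "test_read_write_two_nodes", __file__]))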