mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-14 11:33:46 +00:00
164 lines
5.0 KiB
Python
164 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
import random
|
|
import string
|
|
import os
|
|
import time
|
|
from multiprocessing.dummy import Pool
|
|
from helpers.network import PartitionManager
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/use_keeper.xml'], stay_alive=True)
|
|
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/use_keeper.xml'], stay_alive=True)
|
|
|
|
from kazoo.client import KazooClient, KazooState
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
def smaller_exception(ex):
|
|
return '\n'.join(str(ex).split('\n')[0:2])
|
|
|
|
def wait_node(node):
|
|
for _ in range(100):
|
|
zk = None
|
|
try:
|
|
node.query("SELECT * FROM system.zookeeper WHERE path = '/'")
|
|
zk = get_fake_zk(node.name, timeout=30.0)
|
|
zk.create("/test", sequence=True)
|
|
print("node", node.name, "ready")
|
|
break
|
|
except Exception as ex:
|
|
time.sleep(0.2)
|
|
print("Waiting until", node.name, "will be ready, exception", ex)
|
|
finally:
|
|
if zk:
|
|
zk.stop()
|
|
zk.close()
|
|
else:
|
|
raise Exception("Can't wait node", node.name, "to become ready")
|
|
|
|
def wait_nodes():
|
|
for node in [node1, node2]:
|
|
wait_node(node)
|
|
|
|
|
|
def get_fake_zk(nodename, timeout=30.0):
|
|
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
|
|
_fake_zk_instance.start()
|
|
return _fake_zk_instance
|
|
|
|
def test_read_write_two_nodes(started_cluster):
|
|
try:
|
|
wait_nodes()
|
|
node1_zk = get_fake_zk("node1")
|
|
node2_zk = get_fake_zk("node2")
|
|
|
|
node1_zk.create("/test_read_write_multinode_node1", b"somedata1")
|
|
node2_zk.create("/test_read_write_multinode_node2", b"somedata2")
|
|
|
|
# stale reads are allowed
|
|
while node1_zk.exists("/test_read_write_multinode_node2") is None:
|
|
time.sleep(0.1)
|
|
|
|
# stale reads are allowed
|
|
while node2_zk.exists("/test_read_write_multinode_node1") is None:
|
|
time.sleep(0.1)
|
|
|
|
assert node2_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
|
|
assert node1_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
|
|
|
|
assert node2_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
|
|
assert node1_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
|
|
|
|
finally:
|
|
try:
|
|
for zk_conn in [node1_zk, node2_zk]:
|
|
zk_conn.stop()
|
|
zk_conn.close()
|
|
except:
|
|
pass
|
|
|
|
def test_read_write_two_nodes_with_blocade(started_cluster):
|
|
try:
|
|
wait_nodes()
|
|
node1_zk = get_fake_zk("node1", timeout=5.0)
|
|
node2_zk = get_fake_zk("node2", timeout=5.0)
|
|
|
|
print("Blocking nodes")
|
|
with PartitionManager() as pm:
|
|
pm.partition_instances(node2, node1)
|
|
|
|
# We will respond conection loss but process this query
|
|
# after blocade will be removed
|
|
with pytest.raises(Exception):
|
|
node1_zk.create("/test_read_write_blocked_node1", b"somedata1")
|
|
|
|
# This node is not leader and will not process anything
|
|
with pytest.raises(Exception):
|
|
node2_zk.create("/test_read_write_blocked_node2", b"somedata2")
|
|
|
|
|
|
print("Nodes unblocked")
|
|
for i in range(10):
|
|
try:
|
|
node1_zk = get_fake_zk("node1")
|
|
node2_zk = get_fake_zk("node2")
|
|
break
|
|
except:
|
|
time.sleep(0.5)
|
|
|
|
|
|
for i in range(100):
|
|
try:
|
|
node1_zk.create("/test_after_block1", b"somedata12")
|
|
break
|
|
except:
|
|
time.sleep(0.1)
|
|
else:
|
|
raise Exception("node1 cannot recover after blockade")
|
|
|
|
print("Node1 created it's value")
|
|
|
|
for i in range(100):
|
|
try:
|
|
node2_zk.create("/test_after_block2", b"somedata12")
|
|
break
|
|
except:
|
|
time.sleep(0.1)
|
|
else:
|
|
raise Exception("node2 cannot recover after blockade")
|
|
|
|
print("Node2 created it's value")
|
|
|
|
# stale reads are allowed
|
|
while node1_zk.exists("/test_after_block2") is None:
|
|
time.sleep(0.1)
|
|
|
|
# stale reads are allowed
|
|
while node2_zk.exists("/test_after_block1") is None:
|
|
time.sleep(0.1)
|
|
|
|
assert node1_zk.exists("/test_after_block1") is not None
|
|
assert node1_zk.exists("/test_after_block2") is not None
|
|
assert node2_zk.exists("/test_after_block1") is not None
|
|
assert node2_zk.exists("/test_after_block2") is not None
|
|
|
|
finally:
|
|
try:
|
|
for zk_conn in [node1_zk, node2_zk]:
|
|
zk_conn.stop()
|
|
zk_conn.close()
|
|
except:
|
|
pass
|