ClickHouse/tests/integration/test_testkeeper_multinode_blocade_leader/test.py
2021-02-23 11:37:27 +03:00

374 lines
16 KiB
Python

import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)

# Three instances; each one loads its own enable_test_keeper<N>.xml next to the
# two shared config files. stay_alive=True is passed for every instance.
node1 = cluster.add_instance(
    'node1',
    main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'],
    stay_alive=True,
)
node2 = cluster.add_instance(
    'node2',
    main_configs=['configs/enable_test_keeper2.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'],
    stay_alive=True,
)
node3 = cluster.add_instance(
    'node3',
    main_configs=['configs/enable_test_keeper3.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'],
    stay_alive=True,
)
from kazoo.client import KazooClient, KazooState
"""
In this test, we blockade the RAFT leader and check that the whole system is
able to recover. It's not a good test because we use ClickHouse's replicated
tables to check connectivity, but they may require special operations (or a long
wait) after session expiration. We don't use kazoo for these checks, because that
client pretends to be very smart: it SUSPENDs sessions, tries to recover them,
and so on. The test would be even less predictable than with ClickHouse tables.
TODO: find (or write) a not-so-smart Python client.
TODO: remove this once Jepsen tests are written.
"""
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, hand it to the tests, shut it down.

    ``cluster.start()`` is inside the ``try`` so that ``cluster.shutdown()``
    runs both after the tests finish and when start-up itself raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # Always runs: on normal teardown and on a failed start.
        cluster.shutdown()
def smaller_exception(ex):
    """Return at most the first two lines of ``str(ex)``, joined by a newline.

    Used to keep retry-loop log output short.
    """
    # Splitting at most twice yields up to three pieces; only the first two
    # (the first two lines of the message) are kept.
    leading_lines = str(ex).split('\n', 2)[:2]
    return '\n'.join(leading_lines)
def wait_node(node):
    """Wait until ``node`` answers a SQL query and accepts a keeper write.

    Makes up to 100 attempts. Each attempt runs a ``system.zookeeper`` query
    on the node, opens a client with ``get_fake_zk`` and creates ``/test``
    with ``sequence=True``. A failed attempt is followed by a 0.2 s sleep.

    Args:
        node: cluster instance exposing ``query()`` and ``name``.

    Raises:
        Exception: if no attempt succeeded.
    """
    for _ in range(100):
        zk = None
        try:
            node.query("SELECT * FROM system.zookeeper WHERE path = '/'")
            zk = get_fake_zk(node.name, timeout=30.0)
            zk.create("/test", sequence=True)
            print("node", node.name, "ready")
            break
        except Exception as ex:
            time.sleep(0.2)
            print("Waiting until", node.name, "will be ready, exception", ex)
        finally:
            # Release the client on success and on failure alike; it stays
            # None when the failure happened before the client was created.
            if zk is not None:
                zk.stop()
                zk.close()
    else:
        # Build one message string: passing several positional arguments to
        # Exception makes str(exc) render as a tuple repr.
        raise Exception("Can't wait node {} to become ready".format(node.name))
def wait_nodes():
    """Wait, one node after another, until node1, node2 and node3 are ready."""
    for instance in (node1, node2, node3):
        wait_node(instance)
def get_fake_zk(nodename, timeout=30.0):
    """Create and start a KazooClient connected to port 9181 of ``nodename``.

    Args:
        nodename: instance name, resolved through ``cluster.get_instance_ip``.
        timeout: forwarded to ``KazooClient`` as ``timeout``.

    Returns:
        The started client. The caller is responsible for stopping and
        closing it.
    """
    address = cluster.get_instance_ip(nodename) + ":9181"
    client = KazooClient(hosts=address, timeout=timeout)

    def reset_listener(state):
        # On any state other than CONNECTED, call the client's _reset().
        if state != KazooState.CONNECTED:
            client._reset()

    client.add_listener(reset_listener)
    client.start()
    return client
# In extremely rare cases this can take more than 5 minutes in a debug build with sanitizers.
@pytest.mark.timeout(600)
def test_blocade_leader(started_cluster):
    """Cut node1 off from node2 and node3 and check that replication recovers.

    Steps:
      1. Create ``ordinary.t1`` on all three nodes and replicate 10 rows.
      2. Partition node1 away from node2 and node3; node2 and node3 must each
         manage to insert 100 rows.
      3. Restart the replica on every node, insert 100 rows through node1,
         sync all replicas and expect 310 rows everywhere.

    NOTE(review): the file-level description calls this "blockading the RAFT
    leader"; that node1 is the leader is presumably arranged by the keeper
    configs, which are not visible here.
    """
    replicas = [node1, node2, node3]

    def dump_everything():
        # Print diagnostic state from every node before failing the test.
        for number, replica in enumerate(replicas, start=1):
            dump_zk(replica, '/clickhouse/t1', '/clickhouse/t1/replicas/{}'.format(number))

    def reattach_and_report(replica, prefix, error):
        # restart_replica_for_sure detaches the table before re-attaching it,
        # so after a failure make a best-effort ATTACH, log both errors and
        # pause before the next attempt.
        try:
            replica.query("ATTACH TABLE ordinary.t1")
        except Exception as attach_ex:
            print(prefix, smaller_exception(attach_ex))
        print(prefix, smaller_exception(error))
        time.sleep(0.5)

    # Table creation is retried as a whole until it succeeds on all nodes.
    for _ in range(100):
        wait_nodes()
        try:
            for number, replica in enumerate(replicas, start=1):
                replica.query("CREATE DATABASE IF NOT EXISTS ordinary ENGINE=Ordinary")
                replica.query("CREATE TABLE IF NOT EXISTS ordinary.t1 (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t1', '{}') ORDER BY tuple()".format(number))
            break
        except Exception as ex:
            print("Got exception from node", smaller_exception(ex))
            time.sleep(0.1)

    node2.query("INSERT INTO ordinary.t1 SELECT number FROM numbers(10)")
    node1.query("SYSTEM SYNC REPLICA ordinary.t1", timeout=10)
    node3.query("SYSTEM SYNC REPLICA ordinary.t1", timeout=10)
    for replica in replicas:
        assert_eq_with_retry(replica, "SELECT COUNT() FROM ordinary.t1", "10")

    with PartitionManager() as pm:
        pm.partition_instances(node2, node1)
        pm.partition_instances(node3, node1)

        # node2 first, then node3: each must reconnect and insert while
        # node1 is unreachable.
        isolated_writers = (
            (node2, "/clickhouse/t1/replicas/2", "Got exception node2", "Cannot insert anything node2"),
            (node3, "/clickhouse/t1/replicas/3", "Got exception node3", "Cannot insert anything node3"),
        )
        for replica, replica_path, prefix, failure in isolated_writers:
            for _ in range(100):
                try:
                    restart_replica_for_sure(replica, "ordinary.t1", replica_path)
                    replica.query("INSERT INTO ordinary.t1 SELECT rand() FROM numbers(100)")
                    break
                except Exception as ex:
                    reattach_and_report(replica, prefix, ex)
            else:
                dump_everything()
                assert False, failure

    # NOTE(review): indentation was lost in the reviewed text. The partition is
    # taken to end here because the steps below need node1 to be reachable;
    # confirm against the repository copy.
    for number, replica in enumerate(replicas, start=1):
        prefix = "Got exception node{}".format(number)
        for _ in range(100):
            try:
                restart_replica_for_sure(replica, "ordinary.t1", "/clickhouse/t1/replicas/{}".format(number))
                break
            except Exception as ex:
                reattach_and_report(replica, prefix, ex)
        else:
            assert False, "Cannot reconnect for node{}".format(number)

    for _ in range(100):
        try:
            node1.query("INSERT INTO ordinary.t1 SELECT rand() FROM numbers(100)")
            break
        except Exception as ex:
            print("Got exception node1", smaller_exception(ex))
            time.sleep(0.5)
    else:
        dump_everything()
        assert False, "Cannot insert anything node1"

    for number, replica in enumerate(replicas, start=1):
        prefix = "Got exception node{}".format(number)
        for _ in range(100):
            try:
                restart_replica_for_sure(replica, "ordinary.t1", "/clickhouse/t1/replicas/{}".format(number))
                replica.query("SYSTEM SYNC REPLICA ordinary.t1", timeout=10)
                break
            except Exception as ex:
                reattach_and_report(replica, prefix, ex)
        else:
            dump_everything()
            assert False, "Cannot sync replica node{}".format(number)

    # 10 initial rows + 100 from each of node2, node3 and node1.
    if node1.query("SELECT COUNT() FROM ordinary.t1") != "310\n":
        dump_everything()

    for replica in replicas:
        assert_eq_with_retry(replica, "SELECT COUNT() FROM ordinary.t1", "310")
def dump_zk(node, zk_path, replica_path):
    """Print replication diagnostics obtained from ``node``.

    Runs six queries on ``node`` in a fixed order and prints every result;
    all but the first are preceded by a caption line.

    Args:
        node: object with a ``query(sql)`` method.
        zk_path: table path, used for the path itself and its ``/log`` child.
        replica_path: replica path, used for its ``/queue`` and ``/parts``
            children.
    """
    zookeeper_listing = "SELECT * FROM system.zookeeper WHERE path = '{}' FORMAT Vertical"
    zookeeper_queue = "SELECT * FROM system.zookeeper WHERE path = '{}/queue' FORMAT Vertical"
    zookeeper_log = "SELECT * FROM system.zookeeper WHERE path = '{}/log' FORMAT Vertical"
    zookeeper_parts = "SELECT name FROM system.zookeeper WHERE path = '{}/parts' FORMAT Vertical"

    # (caption, query) pairs in output order; a caption of None prints nothing.
    sections = (
        (None, "SELECT * FROM system.replication_queue FORMAT Vertical"),
        ("Replicas", "SELECT * FROM system.replicas FORMAT Vertical"),
        ("Replica 2 info", zookeeper_listing.format(zk_path)),
        ("Queue", zookeeper_queue.format(replica_path)),
        ("Log", zookeeper_log.format(zk_path)),
        ("Parts", zookeeper_parts.format(replica_path)),
    )
    for caption, sql in sections:
        if caption is not None:
            print(caption)
        print(node.query(sql))
def restart_replica_for_sure(node, table_name, zk_replica_path):
    """Detach ``table_name``, drop the replica's ``is_active`` node, re-attach.

    Args:
        node: cluster instance exposing ``query()`` and ``name``.
        table_name: table to detach and attach again.
        zk_replica_path: replica path whose ``/is_active`` child is deleted
            when it exists.

    Raises:
        Exception: any error from the steps above is printed and re-raised.
            The table may be left detached in that case.
    """
    zk_client = None
    try:
        node.query("DETACH TABLE {}".format(table_name))
        zk_client = get_fake_zk(node.name)
        is_active_path = zk_replica_path + "/is_active"
        if zk_client.exists(is_active_path) is not None:
            zk_client.delete(zk_replica_path + "/is_active")
        node.query("ATTACH TABLE {}".format(table_name))
    except Exception as ex:
        print("Exception", ex)
        raise ex
    finally:
        # The client is closed whether or not the restart succeeded.
        if zk_client:
            zk_client.stop()
            zk_client.close()
# In extremely rare cases this can take more than 5 minutes in a debug build with sanitizers.
@pytest.mark.timeout(600)
def test_blocade_leader_twice(started_cluster):
    """Isolate node1, then split node2 from node3 too, and check recovery.

    Steps:
      1. Create ``ordinary.t2`` on all three nodes and replicate 10 rows.
      2. Partition node1 away from node2 and node3; node2 and node3 each
         insert 100 rows and must both see 210 rows.
      3. Partition node3 from node2 as well (total partition); inserts on
         node3 and node2 must now fail on every one of 10 attempts.
      4. Lift the partitions, restart every replica, insert 100 rows through
         each node, sync all replicas and expect 510 rows everywhere.

    NOTE(review): indentation was lost in the reviewed text. The ``with``
    block is taken to end after step 3 because ``pm`` is used there and the
    later steps need all nodes connected; confirm against the repository copy.
    """
    replicas = [node1, node2, node3]

    def dump_everything():
        # Print diagnostic state from every node before failing the test.
        for number, replica in enumerate(replicas, start=1):
            dump_zk(replica, '/clickhouse/t2', '/clickhouse/t2/replicas/{}'.format(number))

    def reattach_and_report(replica, prefix, error):
        # restart_replica_for_sure detaches the table before re-attaching it,
        # so after a failure make a best-effort ATTACH, log both errors and
        # pause before the next attempt.
        try:
            replica.query("ATTACH TABLE ordinary.t2")
        except Exception as attach_ex:
            print(prefix, smaller_exception(attach_ex))
        print(prefix, smaller_exception(error))
        time.sleep(0.5)

    # Table creation is retried as a whole until it succeeds on all nodes.
    # The attempt counter is not reused as the node index.
    for _ in range(100):
        wait_nodes()
        try:
            for number, replica in enumerate(replicas, start=1):
                replica.query("CREATE DATABASE IF NOT EXISTS ordinary ENGINE=Ordinary")
                replica.query("CREATE TABLE IF NOT EXISTS ordinary.t2 (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t2', '{}') ORDER BY tuple()".format(number))
            break
        except Exception as ex:
            print("Got exception from node", smaller_exception(ex))
            time.sleep(0.1)

    node2.query("INSERT INTO ordinary.t2 SELECT number FROM numbers(10)")
    node1.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
    node3.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
    for replica in replicas:
        assert_eq_with_retry(replica, "SELECT COUNT() FROM ordinary.t2", "10")

    with PartitionManager() as pm:
        pm.partition_instances(node2, node1)
        pm.partition_instances(node3, node1)

        for _ in range(100):
            try:
                restart_replica_for_sure(node2, "ordinary.t2", "/clickhouse/t2/replicas/2")
                node2.query("INSERT INTO ordinary.t2 SELECT rand() FROM numbers(100)")
                break
            except Exception as ex:
                reattach_and_report(node2, "Got exception node2", ex)
        else:
            dump_everything()
            assert False, "Cannot reconnect for node2"

        for _ in range(100):
            try:
                restart_replica_for_sure(node3, "ordinary.t2", "/clickhouse/t2/replicas/3")
                node3.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
                node3.query("INSERT INTO ordinary.t2 SELECT rand() FROM numbers(100)")
                break
            except Exception as ex:
                reattach_and_report(node3, "Got exception node3", ex)
        else:
            dump_everything()
            assert False, "Cannot reconnect for node3"

        node2.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)

        assert_eq_with_retry(node2, "SELECT COUNT() FROM ordinary.t2", "210")
        assert_eq_with_retry(node3, "SELECT COUNT() FROM ordinary.t2", "210")

        # Total network partition
        pm.partition_instances(node3, node2)

        # Every insert must fail now. The assertion lives in the `else`
        # clause: AssertionError is a subclass of Exception, so raising it
        # inside the `try` would be swallowed by the handler and the check
        # could never fail.
        must_not_write = (
            (node3, "Node3 became leader?"),
            (node2, "Node2 became leader?"),
        )
        for replica, failure in must_not_write:
            for _ in range(10):
                try:
                    replica.query("INSERT INTO ordinary.t2 SELECT rand() FROM numbers(100)")
                except Exception:
                    time.sleep(0.5)
                else:
                    assert False, failure

    for number, replica in enumerate(replicas, start=1):
        prefix = "Got exception node{}".format(number)
        for _ in range(100):
            try:
                restart_replica_for_sure(replica, "ordinary.t2", "/clickhouse/t2/replicas/{}".format(number))
                break
            except Exception as ex:
                reattach_and_report(replica, prefix, ex)
        else:
            dump_everything()
            assert False, "Cannot reconnect for node{}".format(number)

    for number, replica in enumerate(replicas, start=1):
        for _ in range(100):
            try:
                replica.query("INSERT INTO ordinary.t2 SELECT rand() FROM numbers(100)")
                break
            except Exception as ex:
                print("Got exception node{}".format(number), smaller_exception(ex))
                time.sleep(0.5)
        else:
            dump_everything()
            assert False, "Cannot reconnect for node{}".format(number)

    # Restart and sync ALL replicas in each round; repeat the round while any
    # of them failed. There is deliberately no `break` after a successful
    # node: it would end the round at the first success and leave the
    # remaining nodes unsynced while `all_done` still read True.
    for attempt in range(100):
        all_done = True
        for number, replica in enumerate(replicas, start=1):
            try:
                restart_replica_for_sure(replica, "ordinary.t2", "/clickhouse/t2/replicas/{}".format(number))
                replica.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
            except Exception as ex:
                all_done = False
                reattach_and_report(replica, "Got exception node{}".format(number), ex)
        if all_done:
            break
    else:
        dump_everything()
        assert False, "Cannot reconnect in {} retries".format(attempt + 1)

    # 10 initial rows + 100 (node2) + 100 (node3) + 100 from each of the
    # three nodes after recovery.
    assert_eq_with_retry(node1, "SELECT COUNT() FROM ordinary.t2", "510")
    if node2.query("SELECT COUNT() FROM ordinary.t2") != "510\n":
        dump_everything()
    assert_eq_with_retry(node2, "SELECT COUNT() FROM ordinary.t2", "510")
    assert_eq_with_retry(node3, "SELECT COUNT() FROM ordinary.t2", "510")