mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
More reliable test keeper tests
This commit is contained in:
parent
acf843a01a
commit
9396bae2e2
@ -31,7 +31,7 @@ struct ChangelogDirTest
|
||||
{
|
||||
std::string path;
|
||||
bool drop;
|
||||
ChangelogDirTest(std::string path_, bool drop_ = true)
|
||||
explicit ChangelogDirTest(std::string path_, bool drop_ = true)
|
||||
: path(path_)
|
||||
, drop(drop_)
|
||||
{
|
||||
|
@ -8,32 +8,23 @@ from multiprocessing.dummy import Pool
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml'], with_zookeeper=True)
|
||||
from kazoo.client import KazooClient, KazooState
|
||||
|
||||
_genuine_zk_instance = None
|
||||
_fake_zk_instance = None
|
||||
from kazoo.client import KazooClient, KazooState, KeeperState
|
||||
|
||||
def get_genuine_zk():
|
||||
global _genuine_zk_instance
|
||||
if not _genuine_zk_instance:
|
||||
print("Zoo1", cluster.get_instance_ip("zoo1"))
|
||||
_genuine_zk_instance = cluster.get_kazoo_client('zoo1')
|
||||
return _genuine_zk_instance
|
||||
|
||||
print("Zoo1", cluster.get_instance_ip("zoo1"))
|
||||
return cluster.get_kazoo_client('zoo1')
|
||||
|
||||
def get_fake_zk():
|
||||
global _fake_zk_instance
|
||||
if not _fake_zk_instance:
|
||||
print("node", cluster.get_instance_ip("node"))
|
||||
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip("node") + ":9181", timeout=30.0)
|
||||
def reset_last_zxid_listener(state):
|
||||
print("Fake zk callback called for state", state)
|
||||
global _fake_zk_instance
|
||||
if state != KazooState.CONNECTED:
|
||||
_fake_zk_instance._reset()
|
||||
print("node", cluster.get_instance_ip("node"))
|
||||
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip("node") + ":9181", timeout=30.0)
|
||||
def reset_last_zxid_listener(state):
|
||||
print("Fake zk callback called for state", state)
|
||||
nonlocal _fake_zk_instance
|
||||
if state != KazooState.CONNECTED:
|
||||
_fake_zk_instance._reset()
|
||||
|
||||
_fake_zk_instance.add_listener(reset_last_zxid_listener)
|
||||
_fake_zk_instance.start()
|
||||
_fake_zk_instance.add_listener(reset_last_zxid_listener)
|
||||
_fake_zk_instance.start()
|
||||
return _fake_zk_instance
|
||||
|
||||
def random_string(length):
|
||||
@ -44,6 +35,15 @@ def create_random_path(prefix="", depth=1):
|
||||
return prefix
|
||||
return create_random_path(os.path.join(prefix, random_string(3)), depth - 1)
|
||||
|
||||
def stop_zk(zk):
|
||||
try:
|
||||
if zk:
|
||||
zk.stop()
|
||||
zk.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
@ -53,44 +53,46 @@ def started_cluster():
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
if _genuine_zk_instance:
|
||||
_genuine_zk_instance.stop()
|
||||
_genuine_zk_instance.close()
|
||||
if _fake_zk_instance:
|
||||
_fake_zk_instance.stop()
|
||||
_fake_zk_instance.close()
|
||||
|
||||
|
||||
def test_simple_commands(started_cluster):
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
try:
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
zk.create("/test_simple_commands", b"")
|
||||
zk.create("/test_simple_commands/somenode1", b"hello")
|
||||
zk.set("/test_simple_commands/somenode1", b"world")
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
zk.create("/test_simple_commands", b"")
|
||||
zk.create("/test_simple_commands/somenode1", b"hello")
|
||||
zk.set("/test_simple_commands/somenode1", b"world")
|
||||
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
assert zk.exists("/test_simple_commands")
|
||||
assert zk.exists("/test_simple_commands/somenode1")
|
||||
print(zk.get("/test_simple_commands/somenode1"))
|
||||
assert zk.get("/test_simple_commands/somenode1")[0] == b"world"
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
assert zk.exists("/test_simple_commands")
|
||||
assert zk.exists("/test_simple_commands/somenode1")
|
||||
print(zk.get("/test_simple_commands/somenode1"))
|
||||
assert zk.get("/test_simple_commands/somenode1")[0] == b"world"
|
||||
finally:
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
stop_zk(zk)
|
||||
|
||||
|
||||
def test_sequential_nodes(started_cluster):
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
genuine_zk.create("/test_sequential_nodes")
|
||||
fake_zk.create("/test_sequential_nodes")
|
||||
for i in range(1, 11):
|
||||
genuine_zk.create("/test_sequential_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
genuine_zk.create("/test_sequential_nodes/" + ("b" * i))
|
||||
fake_zk.create("/test_sequential_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
fake_zk.create("/test_sequential_nodes/" + ("b" * i))
|
||||
try:
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
genuine_zk.create("/test_sequential_nodes")
|
||||
fake_zk.create("/test_sequential_nodes")
|
||||
for i in range(1, 11):
|
||||
genuine_zk.create("/test_sequential_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
genuine_zk.create("/test_sequential_nodes/" + ("b" * i))
|
||||
fake_zk.create("/test_sequential_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
fake_zk.create("/test_sequential_nodes/" + ("b" * i))
|
||||
|
||||
genuine_childs = list(sorted(genuine_zk.get_children("/test_sequential_nodes")))
|
||||
fake_childs = list(sorted(fake_zk.get_children("/test_sequential_nodes")))
|
||||
assert genuine_childs == fake_childs
|
||||
genuine_childs = list(sorted(genuine_zk.get_children("/test_sequential_nodes")))
|
||||
fake_childs = list(sorted(fake_zk.get_children("/test_sequential_nodes")))
|
||||
assert genuine_childs == fake_childs
|
||||
finally:
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
stop_zk(zk)
|
||||
|
||||
|
||||
def assert_eq_stats(stat1, stat2):
|
||||
@ -102,130 +104,141 @@ def assert_eq_stats(stat1, stat2):
|
||||
assert stat1.numChildren == stat2.numChildren
|
||||
|
||||
def test_stats(started_cluster):
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
genuine_zk.create("/test_stats_nodes")
|
||||
fake_zk.create("/test_stats_nodes")
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
for i in range(1, 11):
|
||||
genuine_zk.create("/test_stats_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
genuine_zk.create("/test_stats_nodes/" + ("b" * i))
|
||||
fake_zk.create("/test_stats_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
fake_zk.create("/test_stats_nodes/" + ("b" * i))
|
||||
try:
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
genuine_zk.create("/test_stats_nodes")
|
||||
fake_zk.create("/test_stats_nodes")
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
for i in range(1, 11):
|
||||
genuine_zk.create("/test_stats_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
genuine_zk.create("/test_stats_nodes/" + ("b" * i))
|
||||
fake_zk.create("/test_stats_nodes/" + ("a" * i) + "-", sequence=True)
|
||||
fake_zk.create("/test_stats_nodes/" + ("b" * i))
|
||||
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
for i in range(1, 11):
|
||||
print("/test_stats_nodes/" + ("a" * i) + "-" + "{:010d}".format((i - 1) * 2))
|
||||
genuine_zk.delete("/test_stats_nodes/" + ("a" * i) + "-" + "{:010d}".format((i - 1) * 2))
|
||||
genuine_zk.delete("/test_stats_nodes/" + ("b" * i))
|
||||
fake_zk.delete("/test_stats_nodes/" + ("a" * i) + "-" + "{:010d}".format((i - 1) * 2))
|
||||
fake_zk.delete("/test_stats_nodes/" + ("b" * i))
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
for i in range(1, 11):
|
||||
print("/test_stats_nodes/" + ("a" * i) + "-" + "{:010d}".format((i - 1) * 2))
|
||||
genuine_zk.delete("/test_stats_nodes/" + ("a" * i) + "-" + "{:010d}".format((i - 1) * 2))
|
||||
genuine_zk.delete("/test_stats_nodes/" + ("b" * i))
|
||||
fake_zk.delete("/test_stats_nodes/" + ("a" * i) + "-" + "{:010d}".format((i - 1) * 2))
|
||||
fake_zk.delete("/test_stats_nodes/" + ("b" * i))
|
||||
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
print(genuine_stats)
|
||||
print(fake_stats)
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
for i in range(100):
|
||||
genuine_zk.set("/test_stats_nodes", ("q" * i).encode())
|
||||
fake_zk.set("/test_stats_nodes", ("q" * i).encode())
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
print(genuine_stats)
|
||||
print(fake_stats)
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
for i in range(100):
|
||||
genuine_zk.set("/test_stats_nodes", ("q" * i).encode())
|
||||
fake_zk.set("/test_stats_nodes", ("q" * i).encode())
|
||||
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
print(genuine_stats)
|
||||
print(fake_stats)
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
genuine_stats = genuine_zk.exists("/test_stats_nodes")
|
||||
fake_stats = fake_zk.exists("/test_stats_nodes")
|
||||
print(genuine_stats)
|
||||
print(fake_stats)
|
||||
assert_eq_stats(genuine_stats, fake_stats)
|
||||
finally:
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
stop_zk(zk)
|
||||
|
||||
def test_watchers(started_cluster):
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
genuine_zk.create("/test_data_watches")
|
||||
fake_zk.create("/test_data_watches")
|
||||
genuine_data_watch_data = None
|
||||
try:
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
genuine_zk.create("/test_data_watches")
|
||||
fake_zk.create("/test_data_watches")
|
||||
genuine_data_watch_data = None
|
||||
|
||||
def genuine_callback(event):
|
||||
print("Genuine data watch called")
|
||||
nonlocal genuine_data_watch_data
|
||||
genuine_data_watch_data = event
|
||||
def genuine_callback(event):
|
||||
print("Genuine data watch called")
|
||||
nonlocal genuine_data_watch_data
|
||||
genuine_data_watch_data = event
|
||||
|
||||
fake_data_watch_data = None
|
||||
def fake_callback(event):
|
||||
print("Fake data watch called")
|
||||
nonlocal fake_data_watch_data
|
||||
fake_data_watch_data = event
|
||||
fake_data_watch_data = None
|
||||
def fake_callback(event):
|
||||
print("Fake data watch called")
|
||||
nonlocal fake_data_watch_data
|
||||
fake_data_watch_data = event
|
||||
|
||||
genuine_zk.get("/test_data_watches", watch=genuine_callback)
|
||||
fake_zk.get("/test_data_watches", watch=fake_callback)
|
||||
genuine_zk.get("/test_data_watches", watch=genuine_callback)
|
||||
fake_zk.get("/test_data_watches", watch=fake_callback)
|
||||
|
||||
print("Calling set genuine")
|
||||
genuine_zk.set("/test_data_watches", b"a")
|
||||
print("Calling set fake")
|
||||
fake_zk.set("/test_data_watches", b"a")
|
||||
time.sleep(3)
|
||||
print("Calling set genuine")
|
||||
genuine_zk.set("/test_data_watches", b"a")
|
||||
print("Calling set fake")
|
||||
fake_zk.set("/test_data_watches", b"a")
|
||||
time.sleep(3)
|
||||
|
||||
print("Genuine data", genuine_data_watch_data)
|
||||
print("Fake data", fake_data_watch_data)
|
||||
assert genuine_data_watch_data == fake_data_watch_data
|
||||
print("Genuine data", genuine_data_watch_data)
|
||||
print("Fake data", fake_data_watch_data)
|
||||
assert genuine_data_watch_data == fake_data_watch_data
|
||||
|
||||
genuine_children = None
|
||||
def genuine_child_callback(event):
|
||||
print("Genuine child watch called")
|
||||
nonlocal genuine_children
|
||||
genuine_children = event
|
||||
genuine_children = None
|
||||
def genuine_child_callback(event):
|
||||
print("Genuine child watch called")
|
||||
nonlocal genuine_children
|
||||
genuine_children = event
|
||||
|
||||
fake_children = None
|
||||
def fake_child_callback(event):
|
||||
print("Fake child watch called")
|
||||
nonlocal fake_children
|
||||
fake_children = event
|
||||
fake_children = None
|
||||
def fake_child_callback(event):
|
||||
print("Fake child watch called")
|
||||
nonlocal fake_children
|
||||
fake_children = event
|
||||
|
||||
genuine_zk.get_children("/test_data_watches", watch=genuine_child_callback)
|
||||
fake_zk.get_children("/test_data_watches", watch=fake_child_callback)
|
||||
genuine_zk.get_children("/test_data_watches", watch=genuine_child_callback)
|
||||
fake_zk.get_children("/test_data_watches", watch=fake_child_callback)
|
||||
|
||||
print("Calling genuine child")
|
||||
genuine_zk.create("/test_data_watches/child", b"b")
|
||||
print("Calling fake child")
|
||||
fake_zk.create("/test_data_watches/child", b"b")
|
||||
print("Calling genuine child")
|
||||
genuine_zk.create("/test_data_watches/child", b"b")
|
||||
print("Calling fake child")
|
||||
fake_zk.create("/test_data_watches/child", b"b")
|
||||
|
||||
time.sleep(3)
|
||||
time.sleep(3)
|
||||
|
||||
print("Genuine children", genuine_children)
|
||||
print("Fake children", fake_children)
|
||||
assert genuine_children == fake_children
|
||||
print("Genuine children", genuine_children)
|
||||
print("Fake children", fake_children)
|
||||
assert genuine_children == fake_children
|
||||
finally:
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
stop_zk(zk)
|
||||
|
||||
def test_multitransactions(started_cluster):
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
zk.create('/test_multitransactions')
|
||||
t = zk.transaction()
|
||||
t.create('/test_multitransactions/freddy')
|
||||
t.create('/test_multitransactions/fred', ephemeral=True)
|
||||
t.create('/test_multitransactions/smith', sequence=True)
|
||||
results = t.commit()
|
||||
assert len(results) == 3
|
||||
assert results[0] == '/test_multitransactions/freddy'
|
||||
assert results[2].startswith('/test_multitransactions/smith0') is True
|
||||
|
||||
from kazoo.exceptions import RolledBackError, NoNodeError
|
||||
for i, zk in enumerate([genuine_zk, fake_zk]):
|
||||
print("Processing ZK", i)
|
||||
t = zk.transaction()
|
||||
t.create('/test_multitransactions/q')
|
||||
t.delete('/test_multitransactions/a')
|
||||
t.create('/test_multitransactions/x')
|
||||
results = t.commit()
|
||||
print("Results", results)
|
||||
assert results[0].__class__ == RolledBackError
|
||||
assert results[1].__class__ == NoNodeError
|
||||
assert zk.exists('/test_multitransactions/q') is None
|
||||
assert zk.exists('/test_multitransactions/a') is None
|
||||
assert zk.exists('/test_multitransactions/x') is None
|
||||
try:
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
zk.create('/test_multitransactions')
|
||||
t = zk.transaction()
|
||||
t.create('/test_multitransactions/freddy')
|
||||
t.create('/test_multitransactions/fred', ephemeral=True)
|
||||
t.create('/test_multitransactions/smith', sequence=True)
|
||||
results = t.commit()
|
||||
assert len(results) == 3
|
||||
assert results[0] == '/test_multitransactions/freddy'
|
||||
assert results[2].startswith('/test_multitransactions/smith0') is True
|
||||
|
||||
from kazoo.exceptions import RolledBackError, NoNodeError
|
||||
for i, zk in enumerate([genuine_zk, fake_zk]):
|
||||
print("Processing ZK", i)
|
||||
t = zk.transaction()
|
||||
t.create('/test_multitransactions/q')
|
||||
t.delete('/test_multitransactions/a')
|
||||
t.create('/test_multitransactions/x')
|
||||
results = t.commit()
|
||||
print("Results", results)
|
||||
assert results[0].__class__ == RolledBackError
|
||||
assert results[1].__class__ == NoNodeError
|
||||
assert zk.exists('/test_multitransactions/q') is None
|
||||
assert zk.exists('/test_multitransactions/a') is None
|
||||
assert zk.exists('/test_multitransactions/x') is None
|
||||
finally:
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
stop_zk(zk)
|
||||
|
||||
def exists(zk, path):
|
||||
result = zk.exists(path)
|
||||
@ -278,13 +291,13 @@ class Request(object):
|
||||
arg_str = ', '.join([str(k) + "=" + str(v) for k, v in self.arguments.items()])
|
||||
return "ZKRequest name {} with arguments {}".format(self.name, arg_str)
|
||||
|
||||
def generate_requests(iters=1):
|
||||
def generate_requests(prefix="/", iters=1):
|
||||
requests = []
|
||||
existing_paths = []
|
||||
for i in range(iters):
|
||||
for _ in range(100):
|
||||
rand_length = random.randint(0, 10)
|
||||
path = "/"
|
||||
path = prefix
|
||||
for j in range(1, rand_length):
|
||||
path = create_random_path(path, 1)
|
||||
existing_paths.append(path)
|
||||
@ -322,31 +335,43 @@ def generate_requests(iters=1):
|
||||
|
||||
|
||||
def test_random_requests(started_cluster):
|
||||
requests = generate_requests(10)
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
for i, request in enumerate(requests):
|
||||
genuine_throw = False
|
||||
fake_throw = False
|
||||
fake_result = None
|
||||
genuine_result = None
|
||||
try:
|
||||
genuine_result = request.callback(genuine_zk)
|
||||
except Exception as ex:
|
||||
genuine_throw = True
|
||||
try:
|
||||
requests = generate_requests("/test_random_requests", 10)
|
||||
print("Generated", len(requests), "requests")
|
||||
genuine_zk = get_genuine_zk()
|
||||
fake_zk = get_fake_zk()
|
||||
genuine_zk.create("/test_random_requests")
|
||||
fake_zk.create("/test_random_requests")
|
||||
for i, request in enumerate(requests):
|
||||
genuine_throw = False
|
||||
fake_throw = False
|
||||
fake_result = None
|
||||
genuine_result = None
|
||||
try:
|
||||
genuine_result = request.callback(genuine_zk)
|
||||
except Exception as ex:
|
||||
print("i", i, "request", request)
|
||||
print("Genuine exception", str(ex))
|
||||
genuine_throw = True
|
||||
|
||||
try:
|
||||
fake_result = request.callback(fake_zk)
|
||||
except Exception as ex:
|
||||
fake_throw = True
|
||||
try:
|
||||
fake_result = request.callback(fake_zk)
|
||||
except Exception as ex:
|
||||
print("i", i, "request", request)
|
||||
print("Fake exception", str(ex))
|
||||
fake_throw = True
|
||||
|
||||
assert fake_throw == genuine_throw, "Fake throw genuine not or vise versa"
|
||||
assert fake_result == genuine_result, "Zookeeper results differ"
|
||||
root_children_genuine = [elem for elem in list(sorted(genuine_zk.get_children("/"))) if elem not in ('clickhouse', 'zookeeper')]
|
||||
root_children_fake = [elem for elem in list(sorted(fake_zk.get_children("/"))) if elem not in ('clickhouse', 'zookeeper')]
|
||||
assert root_children_fake == root_children_genuine
|
||||
assert fake_throw == genuine_throw, "Fake throw genuine not or vise versa request {}"
|
||||
assert fake_result == genuine_result, "Zookeeper results differ"
|
||||
root_children_genuine = [elem for elem in list(sorted(genuine_zk.get_children("/test_random_requests"))) if elem not in ('clickhouse', 'zookeeper')]
|
||||
root_children_fake = [elem for elem in list(sorted(fake_zk.get_children("/test_random_requests"))) if elem not in ('clickhouse', 'zookeeper')]
|
||||
assert root_children_fake == root_children_genuine
|
||||
finally:
|
||||
for zk in [genuine_zk, fake_zk]:
|
||||
stop_zk(zk)
|
||||
|
||||
def test_end_of_session(started_cluster):
|
||||
|
||||
fake_zk1 = None
|
||||
fake_zk2 = None
|
||||
genuine_zk1 = None
|
||||
@ -401,13 +426,8 @@ def test_end_of_session(started_cluster):
|
||||
assert fake_ephemeral_event == genuine_ephemeral_event
|
||||
|
||||
finally:
|
||||
try:
|
||||
for zk in [fake_zk1, fake_zk2, genuine_zk1, genuine_zk2]:
|
||||
if zk:
|
||||
zk.stop()
|
||||
zk.close()
|
||||
except:
|
||||
pass
|
||||
for zk in [fake_zk1, fake_zk2, genuine_zk1, genuine_zk2]:
|
||||
stop_zk(zk)
|
||||
|
||||
def test_end_of_watches_session(started_cluster):
|
||||
fake_zk1 = None
|
||||
@ -442,91 +462,89 @@ def test_end_of_watches_session(started_cluster):
|
||||
|
||||
assert dummy_set == 2
|
||||
finally:
|
||||
try:
|
||||
for zk in [fake_zk1, fake_zk2]:
|
||||
if zk:
|
||||
zk.stop()
|
||||
zk.close()
|
||||
except:
|
||||
pass
|
||||
for zk in [fake_zk1, fake_zk2]:
|
||||
stop_zk(zk)
|
||||
|
||||
def test_concurrent_watches(started_cluster):
|
||||
fake_zk = get_fake_zk()
|
||||
fake_zk.restart()
|
||||
global_path = "/test_concurrent_watches_0"
|
||||
fake_zk.create(global_path)
|
||||
try:
|
||||
fake_zk = get_fake_zk()
|
||||
fake_zk.restart()
|
||||
global_path = "/test_concurrent_watches_0"
|
||||
fake_zk.create(global_path)
|
||||
|
||||
dumb_watch_triggered_counter = 0
|
||||
all_paths_triggered = []
|
||||
dumb_watch_triggered_counter = 0
|
||||
all_paths_triggered = []
|
||||
|
||||
existing_path = []
|
||||
all_paths_created = []
|
||||
watches_created = 0
|
||||
def create_path_and_watch(i):
|
||||
nonlocal watches_created
|
||||
nonlocal all_paths_created
|
||||
fake_zk.ensure_path(global_path + "/" + str(i))
|
||||
# new function each time
|
||||
def dumb_watch(event):
|
||||
nonlocal dumb_watch_triggered_counter
|
||||
dumb_watch_triggered_counter += 1
|
||||
nonlocal all_paths_triggered
|
||||
all_paths_triggered.append(event.path)
|
||||
existing_path = []
|
||||
all_paths_created = []
|
||||
watches_created = 0
|
||||
def create_path_and_watch(i):
|
||||
nonlocal watches_created
|
||||
nonlocal all_paths_created
|
||||
fake_zk.ensure_path(global_path + "/" + str(i))
|
||||
# new function each time
|
||||
def dumb_watch(event):
|
||||
nonlocal dumb_watch_triggered_counter
|
||||
dumb_watch_triggered_counter += 1
|
||||
nonlocal all_paths_triggered
|
||||
all_paths_triggered.append(event.path)
|
||||
|
||||
fake_zk.get(global_path + "/" + str(i), watch=dumb_watch)
|
||||
all_paths_created.append(global_path + "/" + str(i))
|
||||
watches_created += 1
|
||||
existing_path.append(i)
|
||||
fake_zk.get(global_path + "/" + str(i), watch=dumb_watch)
|
||||
all_paths_created.append(global_path + "/" + str(i))
|
||||
watches_created += 1
|
||||
existing_path.append(i)
|
||||
|
||||
trigger_called = 0
|
||||
def trigger_watch(i):
|
||||
nonlocal trigger_called
|
||||
trigger_called += 1
|
||||
fake_zk.set(global_path + "/" + str(i), b"somevalue")
|
||||
try:
|
||||
existing_path.remove(i)
|
||||
except:
|
||||
pass
|
||||
|
||||
def call(total):
|
||||
for i in range(total):
|
||||
create_path_and_watch(random.randint(0, 1000))
|
||||
time.sleep(random.random() % 0.5)
|
||||
trigger_called = 0
|
||||
def trigger_watch(i):
|
||||
nonlocal trigger_called
|
||||
trigger_called += 1
|
||||
fake_zk.set(global_path + "/" + str(i), b"somevalue")
|
||||
try:
|
||||
rand_num = random.choice(existing_path)
|
||||
trigger_watch(rand_num)
|
||||
except:
|
||||
pass
|
||||
while existing_path:
|
||||
try:
|
||||
rand_num = random.choice(existing_path)
|
||||
trigger_watch(rand_num)
|
||||
existing_path.remove(i)
|
||||
except:
|
||||
pass
|
||||
|
||||
p = Pool(10)
|
||||
arguments = [100] * 10
|
||||
watches_must_be_created = sum(arguments)
|
||||
watches_trigger_must_be_called = sum(arguments)
|
||||
watches_must_be_triggered = sum(arguments)
|
||||
p.map(call, arguments)
|
||||
p.close()
|
||||
def call(total):
|
||||
for i in range(total):
|
||||
create_path_and_watch(random.randint(0, 1000))
|
||||
time.sleep(random.random() % 0.5)
|
||||
try:
|
||||
rand_num = random.choice(existing_path)
|
||||
trigger_watch(rand_num)
|
||||
except:
|
||||
pass
|
||||
while existing_path:
|
||||
try:
|
||||
rand_num = random.choice(existing_path)
|
||||
trigger_watch(rand_num)
|
||||
except:
|
||||
pass
|
||||
|
||||
# waiting for late watches
|
||||
for i in range(50):
|
||||
if dumb_watch_triggered_counter == watches_must_be_triggered:
|
||||
break
|
||||
p = Pool(10)
|
||||
arguments = [100] * 10
|
||||
watches_must_be_created = sum(arguments)
|
||||
watches_trigger_must_be_called = sum(arguments)
|
||||
watches_must_be_triggered = sum(arguments)
|
||||
p.map(call, arguments)
|
||||
p.close()
|
||||
|
||||
time.sleep(0.1)
|
||||
# waiting for late watches
|
||||
for i in range(50):
|
||||
if dumb_watch_triggered_counter == watches_must_be_triggered:
|
||||
break
|
||||
|
||||
assert watches_created == watches_must_be_created
|
||||
assert trigger_called >= watches_trigger_must_be_called
|
||||
assert len(existing_path) == 0
|
||||
if dumb_watch_triggered_counter != watches_must_be_triggered:
|
||||
print("All created paths", all_paths_created)
|
||||
print("All triggerred paths", all_paths_triggered)
|
||||
print("All paths len", len(all_paths_created))
|
||||
print("All triggered len", len(all_paths_triggered))
|
||||
print("Diff", list(set(all_paths_created) - set(all_paths_triggered)))
|
||||
time.sleep(0.1)
|
||||
|
||||
assert dumb_watch_triggered_counter == watches_must_be_triggered
|
||||
assert watches_created == watches_must_be_created
|
||||
assert trigger_called >= watches_trigger_must_be_called
|
||||
assert len(existing_path) == 0
|
||||
if dumb_watch_triggered_counter != watches_must_be_triggered:
|
||||
print("All created paths", all_paths_created)
|
||||
print("All triggerred paths", all_paths_triggered)
|
||||
print("All paths len", len(all_paths_created))
|
||||
print("All triggered len", len(all_paths_triggered))
|
||||
print("Diff", list(set(all_paths_created) - set(all_paths_triggered)))
|
||||
|
||||
assert dumb_watch_triggered_counter == watches_must_be_triggered
|
||||
finally:
|
||||
stop_zk(fake_zk)
|
||||
|
@ -0,0 +1 @@
|
||||
#!/usr/bin/env python3
|
@ -0,0 +1,21 @@
|
||||
<yandex>
|
||||
<test_keeper_server>
|
||||
<tcp_port>9181</tcp_port>
|
||||
<server_id>1</server_id>
|
||||
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||
|
||||
<coordination_settings>
|
||||
<operation_timeout_ms>5000</operation_timeout_ms>
|
||||
<session_timeout_ms>10000</session_timeout_ms>
|
||||
<raft_logs_level>trace</raft_logs_level>
|
||||
</coordination_settings>
|
||||
|
||||
<raft_configuration>
|
||||
<server>
|
||||
<id>1</id>
|
||||
<hostname>localhost</hostname>
|
||||
<port>44444</port>
|
||||
</server>
|
||||
</raft_configuration>
|
||||
</test_keeper_server>
|
||||
</yandex>
|
@ -0,0 +1,12 @@
|
||||
<yandex>
|
||||
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<log>/var/log/clickhouse-server/log.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
|
||||
<size>1000M</size>
|
||||
<count>10</count>
|
||||
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
|
||||
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
|
||||
</logger>
|
||||
</yandex>
|
@ -0,0 +1,8 @@
|
||||
<yandex>
|
||||
<zookeeper>
|
||||
<node index="1">
|
||||
<host>node1</host>
|
||||
<port>9181</port>
|
||||
</node>
|
||||
</zookeeper>
|
||||
</yandex>
|
124
tests/integration/test_testkeeper_persistent_log/test.py
Normal file
124
tests/integration/test_testkeeper_persistent_log/test.py
Normal file
@ -0,0 +1,124 @@
|
||||
#!/usr/bin/env python3
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
import random
|
||||
import string
|
||||
import os
|
||||
import time
|
||||
from kazoo.client import KazooClient, KazooState
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
|
||||
|
||||
|
||||
def random_string(length):
|
||||
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
|
||||
|
||||
def create_random_path(prefix="", depth=1):
|
||||
if depth == 0:
|
||||
return prefix
|
||||
return create_random_path(os.path.join(prefix, random_string(3)), depth - 1)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
def get_connection_zk(nodename, timeout=30.0):
|
||||
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
|
||||
def reset_listener(state):
|
||||
nonlocal _fake_zk_instance
|
||||
print("Fake zk callback called for state", state)
|
||||
if state != KazooState.CONNECTED:
|
||||
_fake_zk_instance._reset()
|
||||
|
||||
_fake_zk_instance.add_listener(reset_listener)
|
||||
_fake_zk_instance.start()
|
||||
return _fake_zk_instance
|
||||
|
||||
def test_state_after_restart(started_cluster):
|
||||
try:
|
||||
node_zk = None
|
||||
node_zk2 = None
|
||||
node_zk = get_connection_zk("node")
|
||||
|
||||
node_zk.create("/test_state_after_restart", b"somevalue")
|
||||
strs = []
|
||||
for i in range(100):
|
||||
strs.append(random_string(123).encode())
|
||||
node_zk.create("/test_state_after_restart/node" + str(i), strs[i])
|
||||
|
||||
for i in range(100):
|
||||
if i % 7 == 0:
|
||||
node_zk.delete("/test_state_after_restart/node" + str(i))
|
||||
|
||||
node.restart_clickhouse(kill=True)
|
||||
|
||||
node_zk2 = get_connection_zk("node")
|
||||
|
||||
assert node_zk2.get("/test_state_after_restart")[0] == b"somevalue"
|
||||
for i in range(100):
|
||||
if i % 7 == 0:
|
||||
assert node_zk2.exists("/test_state_after_restart/node" + str(i)) is None
|
||||
else:
|
||||
assert len(node_zk2.get("/test_state_after_restart/node" + str(i))[0]) == 123
|
||||
assert node_zk2.get("/test_state_after_restart/node" + str(i))[0] == strs[i]
|
||||
finally:
|
||||
try:
|
||||
if node_zk is not None:
|
||||
node_zk.stop()
|
||||
node_zk.close()
|
||||
|
||||
if node_zk2 is not None:
|
||||
node_zk2.stop()
|
||||
node_zk2.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
# http://zookeeper-user.578899.n2.nabble.com/Why-are-ephemeral-nodes-written-to-disk-tp7583403p7583418.html
|
||||
def test_ephemeral_after_restart(started_cluster):
|
||||
try:
|
||||
node_zk = None
|
||||
node_zk2 = None
|
||||
node_zk = get_connection_zk("node")
|
||||
|
||||
node_zk.create("/test_ephemeral_after_restart", b"somevalue")
|
||||
strs = []
|
||||
for i in range(100):
|
||||
strs.append(random_string(123).encode())
|
||||
node_zk.create("/test_ephemeral_after_restart/node" + str(i), strs[i], ephemeral=True)
|
||||
|
||||
for i in range(100):
|
||||
if i % 7 == 0:
|
||||
node_zk.delete("/test_ephemeral_after_restart/node" + str(i))
|
||||
|
||||
node.restart_clickhouse(kill=True)
|
||||
|
||||
node_zk2 = get_connection_zk("node")
|
||||
|
||||
assert node_zk2.get("/test_ephemeral_after_restart")[0] == b"somevalue"
|
||||
for i in range(100):
|
||||
if i % 7 == 0:
|
||||
assert node_zk2.exists("/test_ephemeral_after_restart/node" + str(i)) is None
|
||||
else:
|
||||
assert len(node_zk2.get("/test_ephemeral_after_restart/node" + str(i))[0]) == 123
|
||||
assert node_zk2.get("/test_ephemeral_after_restart/node" + str(i))[0] == strs[i]
|
||||
finally:
|
||||
try:
|
||||
if node_zk is not None:
|
||||
node_zk.stop()
|
||||
node_zk.close()
|
||||
|
||||
if node_zk2 is not None:
|
||||
node_zk2.stop()
|
||||
node_zk2.close()
|
||||
except:
|
||||
pass
|
Loading…
Reference in New Issue
Block a user