ClickHouse/tests/integration/test_keeper_map/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

189 lines
5.6 KiB
Python
Raw Normal View History

2022-08-30 13:41:36 +00:00
import multiprocessing
import pytest
from time import sleep
import random
from itertools import count
from sys import stdout
2022-11-25 11:12:01 +00:00
from multiprocessing.dummy import Pool
2022-08-30 13:41:36 +00:00
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry, assert_logs_contain
from helpers.network import PartitionManager
test_recover_staled_replica_run = 1
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
2022-09-02 07:49:09 +00:00
main_configs=["configs/enable_keeper_map.xml"],
2022-08-30 13:41:36 +00:00
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_genuine_zk():
return cluster.get_kazoo_client("zoo1")
def remove_children(client, path):
children = client.get_children(path)
for child in children:
child_path = f"{path}/{child}"
remove_children(client, child_path)
client.delete(child_path)
def test_create_keeper_map(started_cluster):
node.query(
"CREATE TABLE test_keeper_map (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
)
zk_client = get_genuine_zk()
2022-09-12 14:43:52 +00:00
def assert_children_size(path, expected_size):
2023-01-10 09:15:07 +00:00
children_size = 0
# 4 secs should be more than enough for replica to sync
for _ in range(10):
children_size = len(zk_client.get_children(path))
if children_size == expected_size:
return
sleep(0.4)
assert (
False
), f"Invalid number of children for '{path}': actual {children_size}, expected {expected_size}"
2022-08-30 13:41:36 +00:00
2022-09-12 14:43:52 +00:00
def assert_root_children_size(expected_size):
assert_children_size("/test_keeper_map/test1", expected_size)
def assert_data_children_size(expected_size):
assert_children_size("/test_keeper_map/test1/data", expected_size)
assert_root_children_size(2)
assert_data_children_size(0)
2022-08-30 13:41:36 +00:00
node.query("INSERT INTO test_keeper_map VALUES (1, 11)")
2022-09-12 14:43:52 +00:00
assert_data_children_size(1)
2022-08-30 13:41:36 +00:00
node.query(
"CREATE TABLE test_keeper_map_another (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
)
2022-09-12 14:43:52 +00:00
assert_root_children_size(2)
assert_data_children_size(1)
2022-08-30 13:41:36 +00:00
node.query("INSERT INTO test_keeper_map_another VALUES (1, 11)")
2022-09-12 14:43:52 +00:00
assert_root_children_size(2)
assert_data_children_size(1)
2022-08-30 13:41:36 +00:00
node.query("INSERT INTO test_keeper_map_another VALUES (2, 22)")
2022-09-12 14:43:52 +00:00
assert_root_children_size(2)
assert_data_children_size(2)
2022-08-30 13:41:36 +00:00
node.query("DROP TABLE test_keeper_map SYNC")
2022-09-12 14:43:52 +00:00
assert_root_children_size(2)
assert_data_children_size(2)
2022-08-30 13:41:36 +00:00
node.query("DROP TABLE test_keeper_map_another SYNC")
2022-09-12 14:43:52 +00:00
assert_root_children_size(0)
2022-08-30 13:41:36 +00:00
zk_client.stop()
def create_drop_loop(index, stop_event):
table_name = f"test_keeper_map_{index}"
for i in count(0, 1):
if stop_event.is_set():
return
2023-01-12 11:28:41 +00:00
node.query_with_retry(
2022-08-30 13:41:36 +00:00
f"CREATE TABLE {table_name} (key UInt64, value UInt64) ENGINE = KeeperMap('/test') PRIMARY KEY(key);"
)
2023-01-12 11:28:41 +00:00
node.query_with_retry(f"INSERT INTO {table_name} VALUES ({index}, {i})")
result = node.query_with_retry(f"SELECT value FROM {table_name} WHERE key = {index}")
2022-08-30 13:41:36 +00:00
assert result.strip() == str(i)
2023-01-12 11:28:41 +00:00
node.query_with_retry(f"DROP TABLE {table_name} SYNC")
2022-08-30 13:41:36 +00:00
def test_create_drop_keeper_map_concurrent(started_cluster):
pool = Pool()
manager = multiprocessing.Manager()
stop_event = manager.Event()
results = []
for i in range(multiprocessing.cpu_count()):
sleep(0.2)
results.append(
pool.apply_async(
create_drop_loop,
args=(
i,
stop_event,
),
)
)
sleep(60)
stop_event.set()
for result in results:
result.get()
pool.close()
client = get_genuine_zk()
2022-09-02 07:34:26 +00:00
assert len(client.get_children("/test_keeper_map/test")) == 0
client.stop()
2022-08-30 13:41:36 +00:00
def test_keeper_map_without_zk(started_cluster):
def assert_keeper_exception_after_partition(query):
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
error = node.query_and_get_error(query)
assert "Coordination::Exception" in error
assert_keeper_exception_after_partition(
2023-01-10 13:16:28 +00:00
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_without_zk') PRIMARY KEY(key);"
2022-08-30 13:41:36 +00:00
)
node.query(
2023-01-10 13:16:28 +00:00
"CREATE TABLE test_keeper_map_without_zk (key UInt64, value UInt64) ENGINE = KeeperMap('/test_without_zk') PRIMARY KEY(key);"
2022-08-30 13:41:36 +00:00
)
assert_keeper_exception_after_partition(
2023-01-10 13:16:28 +00:00
"INSERT INTO test_keeper_map_without_zk VALUES (1, 11)"
2022-08-30 13:41:36 +00:00
)
2023-01-10 13:16:28 +00:00
node.query("INSERT INTO test_keeper_map_without_zk VALUES (1, 11)")
2022-08-30 13:41:36 +00:00
2023-01-10 13:16:28 +00:00
assert_keeper_exception_after_partition("SELECT * FROM test_keeper_map_without_zk")
node.query("SELECT * FROM test_keeper_map_without_zk")
2022-08-30 13:41:36 +00:00
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
node.restart_clickhouse(60)
2023-01-10 13:16:28 +00:00
error = node.query_and_get_error("SELECT * FROM test_keeper_map_without_zk")
2022-08-30 13:41:36 +00:00
assert "Failed to activate table because of connection issues" in error
2023-01-10 13:16:28 +00:00
node.query("SELECT * FROM test_keeper_map_without_zk")
2022-08-30 13:41:36 +00:00
client = get_genuine_zk()
2023-01-10 13:16:28 +00:00
remove_children(client, "/test_keeper_map/test_without_zk")
2022-08-30 13:41:36 +00:00
node.restart_clickhouse(60)
2023-01-10 13:16:28 +00:00
error = node.query_and_get_error("SELECT * FROM test_keeper_map_without_zk")
2022-08-30 13:41:36 +00:00
assert "Failed to activate table because of invalid metadata in ZooKeeper" in error
2023-01-10 13:16:28 +00:00
node.query("DETACH TABLE test_keeper_map_without_zk")
2022-08-30 13:41:36 +00:00
client.stop()