Add context manager for partition manager

This commit is contained in:
divanik 2024-11-06 15:55:41 +00:00
parent 3538886a2b
commit 8bb656ddec

View File

@ -379,50 +379,55 @@ def test_insert_quorum_with_keeper_loss_connection(started_cluster):
)
)
pm = PartitionManager()
pm.drop_instance_zk_connections(zero)
with PartitionManager() as pm:
pm.drop_instance_zk_connections(zero)
retries = 0
zk = cluster.get_kazoo_client("zoo1")
while True:
if (
zk.exists(f"/clickhouse/tables/{table_name}/replicas/zero/is_active")
is None
):
break
print("replica is still active")
time.sleep(1)
retries += 1
if retries == 120:
raise Exception("Can not wait cluster replica inactive")
retries = 0
zk = cluster.get_kazoo_client("zoo1")
while True:
if (
zk.exists(
f"/clickhouse/tables/{table_name}/replicas/zero/is_active"
)
is None
):
break
print("replica is still active")
time.sleep(1)
retries += 1
if retries == 120:
raise Exception("Can not wait cluster replica inactive")
first.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
quorum_fail_future = executor.submit(
lambda: first.query(
"SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
first.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
quorum_fail_future = executor.submit(
lambda: first.query(
"SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
)
)
)
first.query(f"SYSTEM START FETCHES {table_name}")
first.query(f"SYSTEM START FETCHES {table_name}")
concurrent.futures.wait([quorum_fail_future])
concurrent.futures.wait([quorum_fail_future])
assert quorum_fail_future.exception() is None
assert quorum_fail_future.exception() is None
zero.query("SYSTEM ENABLE FAILPOINT finish_clean_quorum_failed_parts")
clean_quorum_fail_parts_future = executor.submit(
lambda: first.query(
"SYSTEM WAIT FAILPOINT finish_clean_quorum_failed_parts", timeout=300
zero.query("SYSTEM ENABLE FAILPOINT finish_clean_quorum_failed_parts")
clean_quorum_fail_parts_future = executor.submit(
lambda: first.query(
"SYSTEM WAIT FAILPOINT finish_clean_quorum_failed_parts",
timeout=300,
)
)
)
pm.restore_instance_zk_connections(zero)
concurrent.futures.wait([clean_quorum_fail_parts_future])
pm.restore_instance_zk_connections(zero)
concurrent.futures.wait([clean_quorum_fail_parts_future])
assert clean_quorum_fail_parts_future.exception() is None
assert clean_quorum_fail_parts_future.exception() is None
zero.query("SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
concurrent.futures.wait([insert_future])
assert insert_future.exception() is not None
assert not zero.contains_in_log("LOGICAL_ERROR")
assert zero.contains_in_log(
"fails to commit and will not retry or clean garbage"
)
zero.query(
"SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause"
)
concurrent.futures.wait([insert_future])
assert insert_future.exception() is not None
assert not zero.contains_in_log("LOGICAL_ERROR")
assert zero.contains_in_log(
"fails to commit and will not retry or clean garbage"
)