mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
101 lines
2.7 KiB
Python
101 lines
2.7 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import pytest
|
||
|
import time
|
||
|
import threading
|
||
|
from helpers.cluster import ClickHouseCluster
|
||
|
from multiprocessing.dummy import Pool
|
||
|
from helpers.network import PartitionManager
|
||
|
from helpers.client import QueryRuntimeException
|
||
|
from helpers.test_tools import assert_eq_with_retry
|
||
|
|
||
|
cluster = ClickHouseCluster(__file__)
|
||
|
|
||
|
node1 = cluster.add_instance("node1", with_zookeeper=True)
|
||
|
|
||
|
|
||
|
@pytest.fixture(scope="module")
|
||
|
def started_cluster():
|
||
|
global cluster
|
||
|
try:
|
||
|
cluster.start()
|
||
|
yield cluster
|
||
|
|
||
|
finally:
|
||
|
cluster.shutdown()
|
||
|
|
||
|
|
||
|
def test_replica_inserts_with_keeper_restart(started_cluster):
|
||
|
try:
|
||
|
node1.query(
|
||
|
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()"
|
||
|
)
|
||
|
|
||
|
p = Pool(1)
|
||
|
zk_stopped_event = threading.Event()
|
||
|
|
||
|
def zoo_restart(zk_stopped_event):
|
||
|
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
|
||
|
zk_stopped_event.set()
|
||
|
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
|
||
|
|
||
|
job = p.apply_async(zoo_restart, (zk_stopped_event,))
|
||
|
|
||
|
zk_stopped_event.wait(90)
|
||
|
|
||
|
node1.query(
|
||
|
"INSERT INTO r SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20"
|
||
|
)
|
||
|
node1.query(
|
||
|
"INSERT INTO r SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20"
|
||
|
)
|
||
|
|
||
|
job.wait()
|
||
|
p.close()
|
||
|
p.join()
|
||
|
|
||
|
assert node1.query("SELECT COUNT() FROM r") == "20\n"
|
||
|
|
||
|
finally:
|
||
|
node1.query("DROP TABLE IF EXISTS r SYNC")
|
||
|
|
||
|
|
||
|
def test_replica_inserts_with_keeper_disconnect(started_cluster):
|
||
|
try:
|
||
|
node1.query(
|
||
|
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '0') ORDER BY tuple()"
|
||
|
)
|
||
|
|
||
|
p = Pool(1)
|
||
|
disconnect_event = threading.Event()
|
||
|
|
||
|
def keeper_disconnect(node, event):
|
||
|
with PartitionManager() as pm:
|
||
|
pm.drop_instance_zk_connections(node)
|
||
|
event.set()
|
||
|
|
||
|
job = p.apply_async(
|
||
|
keeper_disconnect,
|
||
|
(
|
||
|
node1,
|
||
|
disconnect_event,
|
||
|
),
|
||
|
)
|
||
|
disconnect_event.wait(90)
|
||
|
|
||
|
node1.query(
|
||
|
"INSERT INTO r SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=20"
|
||
|
)
|
||
|
node1.query(
|
||
|
"INSERT INTO r SELECT number, toString(number) FROM numbers(10, 10) SETTINGS insert_keeper_max_retries=20"
|
||
|
)
|
||
|
|
||
|
job.wait()
|
||
|
p.close()
|
||
|
p.join()
|
||
|
|
||
|
assert node1.query("SELECT COUNT() FROM r") == "20\n"
|
||
|
|
||
|
finally:
|
||
|
node1.query("DROP TABLE IF EXISTS r SYNC")
|