ClickHouse/tests/integration/test_quorum_inserts_parallel/test.py
2023-07-04 19:32:43 +00:00

157 lines
4.8 KiB
Python

#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.client import QueryRuntimeException
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True)
node3 = cluster.add_instance("node3", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_parallel_quorum_actually_parallel(started_cluster):
settings = {
"insert_quorum": "3",
"insert_quorum_parallel": "1",
"function_sleep_max_microseconds_per_block": "0",
}
for i, node in enumerate([node1, node2, node3]):
node.query(
"CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '{num}') ORDER BY tuple()".format(
num=i
)
)
p = Pool(10)
def long_insert(node):
node.query(
"INSERT INTO r SELECT number, toString(number) FROM numbers(5) where sleepEachRow(1) == 0",
settings=settings,
)
job = p.apply_async(long_insert, (node1,))
node2.query("INSERT INTO r VALUES (6, '6')", settings=settings)
assert node1.query("SELECT COUNT() FROM r") == "1\n"
assert node2.query("SELECT COUNT() FROM r") == "1\n"
assert node3.query("SELECT COUNT() FROM r") == "1\n"
node1.query("INSERT INTO r VALUES (7, '7')", settings=settings)
assert node1.query("SELECT COUNT() FROM r") == "2\n"
assert node2.query("SELECT COUNT() FROM r") == "2\n"
assert node3.query("SELECT COUNT() FROM r") == "2\n"
job.get()
assert node1.query("SELECT COUNT() FROM r") == "7\n"
assert node2.query("SELECT COUNT() FROM r") == "7\n"
assert node3.query("SELECT COUNT() FROM r") == "7\n"
p.close()
p.join()
def test_parallel_quorum_actually_quorum(started_cluster):
for i, node in enumerate([node1, node2, node3]):
node.query(
"CREATE TABLE q (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/q', '{num}') ORDER BY tuple()".format(
num=i
)
)
with PartitionManager() as pm:
pm.partition_instances(node2, node1, port=9009)
pm.partition_instances(node2, node3, port=9009)
with pytest.raises(QueryRuntimeException):
node1.query(
"INSERT INTO q VALUES(1, 'Hello')",
settings={
"insert_quorum": "3",
"insert_quorum_parallel": "1",
"insert_quorum_timeout": "3000",
},
)
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "1")
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "1")
node1.query(
"INSERT INTO q VALUES(2, 'wlrd')",
settings={
"insert_quorum": "2",
"insert_quorum_parallel": "1",
"insert_quorum_timeout": "3000",
},
)
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "2")
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "2")
def insert_value_to_node(node, settings):
node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)
p = Pool(2)
res = p.apply_async(
insert_value_to_node,
(
node1,
{
"insert_quorum": "3",
"insert_quorum_parallel": "1",
"insert_quorum_timeout": "60000",
},
),
)
assert_eq_with_retry(
node1,
"SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1",
"3",
)
assert_eq_with_retry(
node3,
"SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1",
"3",
)
assert_eq_with_retry(
node2,
"SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1",
"0",
)
# Insert to the second to satisfy quorum
insert_value_to_node(
node2, {"insert_quorum": "3", "insert_quorum_parallel": "1"}
)
res.get()
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "3")
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "1")
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "3")
p.close()
p.join()
node2.query("SYSTEM SYNC REPLICA q", timeout=10)
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "3")