mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 19:42:00 +00:00
157 lines
4.8 KiB
Python
157 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
from multiprocessing.dummy import Pool
|
|
from helpers.network import PartitionManager
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
|
|
|
|
# Single cluster shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# Three instances, each started with ZooKeeper enabled; the tests below create
# ReplicatedMergeTree tables on all of them.
node1, node2, node3 = (
    cluster.add_instance(instance_name, with_zookeeper=True)
    for instance_name in ("node1", "node2", "node3")
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level cluster and yield it to the tests.

    The fixture is module-scoped, so the cluster is started once for all tests
    in this file. ``cluster.shutdown()`` runs in a ``finally`` clause, so it is
    called both after the tests finish and when ``cluster.start()`` raises.
    """
    # `cluster` is only read here, so no `global` declaration is required.
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
|
|
|
|
|
|
def test_parallel_quorum_actually_parallel(started_cluster):
    """Check that quorum inserts run in parallel with a slow quorum insert.

    A five-row insert that sleeps one second per row is submitted to node1 on
    a background thread. Before its result is collected, two single-row
    inserts (with the same quorum settings) are executed and each must be
    visible on all three replicas. After the slow insert returns, all seven
    rows must be visible everywhere.
    """
    settings = {
        "insert_quorum": "3",
        "insert_quorum_parallel": "1",
        "function_sleep_max_microseconds_per_block": "0",
    }
    replicas = [node1, node2, node3]
    create_query = "CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '{num}') ORDER BY tuple()"

    # Every replica registers under the same ZooKeeper path with its own index
    # as the replica name.
    for i, node in enumerate(replicas):
        node.query(create_query.format(num=i))

    def long_insert(node):
        # sleepEachRow(1) over 5 rows keeps this insert running for several
        # seconds, giving the short inserts below time to complete first.
        node.query(
            "INSERT INTO r SELECT number, toString(number) FROM numbers(5) where sleepEachRow(1) == 0",
            settings=settings,
        )

    def assert_row_count(expected):
        # The same count must be reported by each of the three replicas.
        for node in replicas:
            assert node.query("SELECT COUNT() FROM r") == expected

    p = Pool(10)
    try:
        job = p.apply_async(long_insert, (node1,))

        node2.query("INSERT INTO r VALUES (6, '6')", settings=settings)
        assert_row_count("1\n")

        node1.query("INSERT INTO r VALUES (7, '7')", settings=settings)
        assert_row_count("2\n")

        # Wait for the slow insert; re-raises any exception from the worker.
        job.get()

        assert_row_count("7\n")
    finally:
        # Always release the worker threads, even when an assertion or query
        # above fails, so the background insert cannot outlive this test.
        p.close()
        p.join()
|
|
|
|
|
|
def test_parallel_quorum_actually_quorum(started_cluster):
    """Check that parallel quorum inserts still enforce the requested quorum.

    While node2 is partitioned from node1 and node3 on port 9009:

    * an insert on node1 with ``insert_quorum=3`` must raise, although the row
      is still expected on node1 and node3;
    * an insert on node1 with ``insert_quorum=2`` must succeed;
    * a third insert with ``insert_quorum=3`` is started in the background and
      only has its result collected after the same row is inserted directly
      on node2.

    After the partition is lifted, node2 is synced and must hold all 3 rows.
    """
    for i, node in enumerate([node1, node2, node3]):
        node.query(
            "CREATE TABLE q (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/q', '{num}') ORDER BY tuple()".format(
                num=i
            )
        )

    active_parts_query = (
        "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1"
    )

    def insert_value_to_node(node, settings):
        node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)

    with PartitionManager() as pm:
        # NOTE(review): 9009 is presumably the inter-server replication port,
        # so node2 stays queryable but cannot exchange parts - confirm.
        pm.partition_instances(node2, node1, port=9009)
        pm.partition_instances(node2, node3, port=9009)

        # A quorum of 3 cannot be reached without node2.
        with pytest.raises(QueryRuntimeException):
            node1.query(
                "INSERT INTO q VALUES(1, 'Hello')",
                settings={
                    "insert_quorum": "3",
                    "insert_quorum_parallel": "1",
                    "insert_quorum_timeout": "3000",
                },
            )

        # The failed-quorum row is nevertheless present on the two connected
        # replicas and absent on node2.
        assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "1")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
        assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "1")

        # A quorum of 2 is satisfiable by node1 and node3 alone.
        node1.query(
            "INSERT INTO q VALUES(2, 'wlrd')",
            settings={
                "insert_quorum": "2",
                "insert_quorum_parallel": "1",
                "insert_quorum_timeout": "3000",
            },
        )

        assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "2")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
        assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "2")

        p = Pool(2)
        try:
            # Long quorum timeout: this insert is expected to still be waiting
            # when node2 receives the same row below.
            res = p.apply_async(
                insert_value_to_node,
                (
                    node1,
                    {
                        "insert_quorum": "3",
                        "insert_quorum_parallel": "1",
                        "insert_quorum_timeout": "60000",
                    },
                ),
            )

            # The third part must appear on node1 and node3, but not on node2.
            assert_eq_with_retry(node1, active_parts_query, "3")
            assert_eq_with_retry(node3, active_parts_query, "3")
            assert_eq_with_retry(node2, active_parts_query, "0")

            # Insert to the second to satisfy quorum
            insert_value_to_node(
                node2, {"insert_quorum": "3", "insert_quorum_parallel": "1"}
            )

            # Re-raises if the background insert failed.
            res.get()

            assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "3")
            assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "1")
            assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "3")
        finally:
            # Always release the worker threads, even when a check above
            # fails, so the pending insert cannot outlive this test.
            p.close()
            p.join()

    # The partition is lifted here, so node2 can catch up on the other rows.
    node2.query("SYSTEM SYNC REPLICA q", timeout=10)
    assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "3")
|