2020-10-08 13:14:44 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
from multiprocessing.dummy import Pool
|
|
|
|
from helpers.network import PartitionManager
|
|
|
|
from helpers.client import QueryRuntimeException
|
|
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
|
|
|
|
|
|
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
|
|
|
|
node1 = cluster.add_instance("node1", with_zookeeper=True)
|
|
|
|
node2 = cluster.add_instance("node2", with_zookeeper=True)
|
|
|
|
node3 = cluster.add_instance("node3", with_zookeeper=True)
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def started_cluster():
|
|
|
|
global cluster
|
|
|
|
try:
|
|
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
|
|
|
|
|
|
finally:
|
|
|
|
cluster.shutdown()
|
|
|
|
|
|
|
|
|
|
|
|
def test_parallel_quorum_actually_parallel(started_cluster):
|
|
|
|
settings = {"insert_quorum": "3", "insert_quorum_parallel": "1"}
|
|
|
|
for i, node in enumerate([node1, node2, node3]):
|
|
|
|
node.query("CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '{num}') ORDER BY tuple()".format(num=i))
|
|
|
|
|
|
|
|
p = Pool(10)
|
|
|
|
|
|
|
|
def long_insert(node):
|
|
|
|
node.query("INSERT INTO r SELECT number, toString(number) FROM numbers(5) where sleepEachRow(1) == 0", settings=settings)
|
|
|
|
|
|
|
|
job = p.apply_async(long_insert, (node1,))
|
|
|
|
|
|
|
|
node2.query("INSERT INTO r VALUES (6, '6')", settings=settings)
|
|
|
|
assert node1.query("SELECT COUNT() FROM r") == "1\n"
|
|
|
|
assert node2.query("SELECT COUNT() FROM r") == "1\n"
|
|
|
|
assert node3.query("SELECT COUNT() FROM r") == "1\n"
|
|
|
|
|
|
|
|
node1.query("INSERT INTO r VALUES (7, '7')", settings=settings)
|
|
|
|
assert node1.query("SELECT COUNT() FROM r") == "2\n"
|
|
|
|
assert node2.query("SELECT COUNT() FROM r") == "2\n"
|
|
|
|
assert node3.query("SELECT COUNT() FROM r") == "2\n"
|
|
|
|
|
|
|
|
job.get()
|
|
|
|
|
|
|
|
assert node1.query("SELECT COUNT() FROM r") == "7\n"
|
|
|
|
assert node2.query("SELECT COUNT() FROM r") == "7\n"
|
|
|
|
assert node3.query("SELECT COUNT() FROM r") == "7\n"
|
|
|
|
p.close()
|
|
|
|
p.join()
|
|
|
|
|
|
|
|
|
|
|
|
def test_parallel_quorum_actually_quorum(started_cluster):
|
|
|
|
for i, node in enumerate([node1, node2, node3]):
|
|
|
|
node.query("CREATE TABLE q (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/q', '{num}') ORDER BY tuple()".format(num=i))
|
|
|
|
|
|
|
|
with PartitionManager() as pm:
|
|
|
|
pm.partition_instances(node2, node1, port=9009)
|
|
|
|
pm.partition_instances(node2, node3, port=9009)
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
2020-10-09 07:19:17 +00:00
|
|
|
node1.query("INSERT INTO q VALUES(1, 'Hello')", settings={"insert_quorum": "3", "insert_quorum_parallel": "1", "insert_quorum_timeout": "3000"})
|
2020-10-08 13:14:44 +00:00
|
|
|
|
|
|
|
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "1")
|
|
|
|
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
|
|
|
|
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "1")
|
|
|
|
|
|
|
|
node1.query("INSERT INTO q VALUES(2, 'wlrd')", settings={"insert_quorum": "2", "insert_quorum_parallel": "1", "insert_quorum_timeout": "3000"})
|
|
|
|
|
|
|
|
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "2")
|
|
|
|
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
|
|
|
|
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "2")
|
|
|
|
|
|
|
|
def insert_value_to_node(node, settings):
|
|
|
|
node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)
|
|
|
|
|
|
|
|
p = Pool(2)
|
2020-10-09 07:19:17 +00:00
|
|
|
res = p.apply_async(insert_value_to_node, (node1, {"insert_quorum": "3", "insert_quorum_parallel": "1", "insert_quorum_timeout": "60000"}))
|
|
|
|
|
|
|
|
assert_eq_with_retry(node1, "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1", "3")
|
|
|
|
assert_eq_with_retry(node3, "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1", "3")
|
|
|
|
assert_eq_with_retry(node2, "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1", "0")
|
|
|
|
|
|
|
|
# Insert to the second to satisfy quorum
|
|
|
|
insert_value_to_node(node2, {"insert_quorum": "3", "insert_quorum_parallel": "1"})
|
|
|
|
|
|
|
|
res.get()
|
2020-10-08 13:14:44 +00:00
|
|
|
|
|
|
|
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "3")
|
|
|
|
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "1")
|
|
|
|
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "3")
|
|
|
|
|
|
|
|
p.close()
|
|
|
|
p.join()
|
|
|
|
|
|
|
|
node2.query("SYSTEM SYNC REPLICA q", timeout=10)
|
|
|
|
assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "3")
|