ClickHouse/tests/integration/test_quorum_inserts_parallel/test.py
2020-10-09 10:19:17 +03:00

103 lines
4.1 KiB
Python

#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.client import QueryRuntimeException
from helpers.test_tools import assert_eq_with_retry
# Module-wide cluster shared by every test in this file: three instances,
# each one added with ZooKeeper enabled.
cluster = ClickHouseCluster(__file__)
node1, node2, node3 = [
    cluster.add_instance(instance_name, with_zookeeper=True)
    for instance_name in ("node1", "node2", "node3")
]
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level cluster once and shut it down after the tests.

    Yields:
        The started module-level ``cluster`` object.

    ``cluster.shutdown()`` sits in ``finally`` so it runs even when
    ``cluster.start()`` itself raises.
    """
    # No ``global`` statement is needed: ``cluster`` is only read here,
    # never rebound.
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_parallel_quorum_actually_parallel(started_cluster):
    """Check that parallel quorum inserts do not wait for one another.

    A slow quorum insert (five rows, ``sleepEachRow(1)`` on each) is started
    on node1 in a background thread. While it is expected to still be in
    flight, two short quorum inserts are issued and each must be visible on
    all three nodes right away. After the slow insert finishes, every node
    must hold all seven rows.
    """
    settings = {"insert_quorum": "3", "insert_quorum_parallel": "1"}
    nodes = [node1, node2, node3]

    create_query = "CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '{num}') ORDER BY tuple()"
    for i, node in enumerate(nodes):
        node.query(create_query.format(num=i))

    def long_insert(node):
        # sleepEachRow(1) is applied to each of the 5 rows, which keeps this
        # insert pending while the short inserts below are executed.
        node.query("INSERT INTO r SELECT number, toString(number) FROM numbers(5) where sleepEachRow(1) == 0", settings=settings)

    def assert_count_on_all_nodes(expected):
        # The raw query result is compared, so `expected` carries the
        # trailing newline.
        for node in nodes:
            assert node.query("SELECT COUNT() FROM r") == expected

    p = Pool(10)
    try:
        job = p.apply_async(long_insert, (node1,))

        node2.query("INSERT INTO r VALUES (6, '6')", settings=settings)
        assert_count_on_all_nodes("1\n")

        node1.query("INSERT INTO r VALUES (7, '7')", settings=settings)
        assert_count_on_all_nodes("2\n")

        # Re-raises here if the background insert failed.
        job.get()
        assert_count_on_all_nodes("7\n")
    finally:
        # Always release the worker threads, including when an assertion or
        # a query above fails.
        p.close()
        p.join()
def test_parallel_quorum_actually_quorum(started_cluster):
    """Check that the quorum is still enforced when parallel quorum is on.

    node2 is cut off from node1 and node3 on port 9009, then:

    1. An insert with quorum 3 and a 3 s timeout on node1 must raise, yet the
       row is still expected on node1 and node3 (and not on node2).
    2. An insert with quorum 2 on node1 must succeed.
    3. An insert with quorum 3 and a 60 s timeout is started in the background
       on node1; it is expected to complete only after the same row is
       inserted directly into node2.

    Once the partition is lifted, node2 is synced and must hold all three
    rows.
    """
    nodes = [node1, node2, node3]

    create_query = "CREATE TABLE q (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/q', '{num}') ORDER BY tuple()"
    for i, node in enumerate(nodes):
        node.query(create_query.format(num=i))

    def quorum_settings(quorum, timeout=None):
        # Build the per-query settings; the timeout key is left out entirely
        # when no timeout is given.
        settings = {"insert_quorum": quorum, "insert_quorum_parallel": "1"}
        if timeout is not None:
            settings["insert_quorum_timeout"] = timeout
        return settings

    def assert_on_each_node(query, expected):
        # `expected` lists the awaited result for node1, node2, node3.
        for node, value in zip(nodes, expected):
            assert_eq_with_retry(node, query, value)

    def insert_value_to_node(node, settings):
        node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)

    count_rows = "SELECT COUNT() FROM q"
    count_active_parts = "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1"

    with PartitionManager() as pm:
        # NOTE(review): 9009 is presumably the port replicas use to talk to
        # each other - confirm.
        pm.partition_instances(node2, node1, port=9009)
        pm.partition_instances(node2, node3, port=9009)

        # Quorum of 3 cannot be reached while node2 is unreachable.
        with pytest.raises(QueryRuntimeException):
            node1.query("INSERT INTO q VALUES(1, 'Hello')", settings=quorum_settings("3", "3000"))

        # The failed quorum insert still leaves its row on the reachable
        # replicas.
        assert_on_each_node(count_rows, ["1", "0", "1"])

        node1.query("INSERT INTO q VALUES(2, 'wlrd')", settings=quorum_settings("2", "3000"))
        assert_on_each_node(count_rows, ["2", "0", "2"])

        p = Pool(2)
        try:
            res = p.apply_async(insert_value_to_node, (node1, quorum_settings("3", "60000")))

            assert_on_each_node(count_active_parts, ["3", "0", "3"])

            # Insert to the second to satisfy quorum
            insert_value_to_node(node2, quorum_settings("3"))

            # Re-raises here if the background insert failed.
            res.get()

            assert_on_each_node(count_rows, ["3", "1", "3"])
        finally:
            # Always release the worker threads, including when an assertion
            # or a query above fails.
            p.close()
            p.join()

    # The partition is lifted at this point, so node2 can catch up.
    node2.query("SYSTEM SYNC REPLICA q", timeout=10)
    assert_eq_with_retry(node2, count_rows, "3")