ClickHouse/tests/integration/test_parallel_replicas_over_distributed/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

127 lines
4.5 KiB
Python
Raw Normal View History

2023-08-07 19:12:53 +00:00
import pytest
from helpers.cluster import ClickHouseCluster
# Single cluster object shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# Four instances named n1..n4, all registered with the same remote_servers
# config and with ZooKeeper enabled (the tables created below use
# ReplicatedMergeTree).
nodes = [
    cluster.add_instance(
        f"n{index}",
        main_configs=["configs/remote_servers.xml"],
        with_zookeeper=True,
    )
    for index in range(1, 5)
]


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the module-level cluster once and shut it down afterwards.

    Yields the module-level ``cluster`` object. The fixture is module-scoped
    and autouse, so it wraps every test in this file.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even when start() itself raised, so a partially started
        # cluster is still shut down.
        cluster.shutdown()


2023-08-07 21:06:12 +00:00
def create_tables(cluster, table_name):
    """Recreate the replicated tables and the Distributed table, then load data.

    Steps, in order:
      1. Drop ``table_name`` on every node.
      2. Create a ReplicatedMergeTree table ``table_name`` on each of the four
         nodes, with the shard/replica placement selected by ``cluster``.
      3. Drop and recreate ``{table_name}_d`` on the first node as a
         Distributed table over ``cluster``, sharded by ``key``.
      4. Insert 6001 rows through the Distributed table in five batches.

    Args:
        cluster: Cluster name; must be "test_single_shard_multiple_replicas"
            or "test_multiple_shards_multiple_replicas".
        table_name: Name of the local replicated table; the Distributed table
            is named ``{table_name}_d``.

    Raises:
        Exception: If ``cluster`` is not one of the two supported names. The
            per-node tables have already been dropped at that point.
    """
    # Existing tables are dropped before the cluster name is validated.
    for instance in nodes:
        instance.query(f"DROP TABLE IF EXISTS {table_name} SYNC")

    # One (shard number, replica number) pair per node, in nodes[0..3] order.
    if cluster == "test_single_shard_multiple_replicas":
        placement = ((1, 1), (1, 2), (1, 3), (1, 4))
    elif cluster == "test_multiple_shards_multiple_replicas":
        placement = ((1, 1), (1, 2), (2, 1), (2, 2))
    else:
        raise Exception(f"Unexpected cluster: {cluster}")

    # Create the replicated tables.
    for instance, (shard, replica) in zip(nodes, placement):
        instance.query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) "
            f"Engine=ReplicatedMergeTree('/test_parallel_replicas/shard{shard}/{table_name}', 'r{replica}') "
            f"ORDER BY (key)"
        )

    # Create the distributed table on the first node only.
    initiator = nodes[0]
    initiator.query(f"DROP TABLE IF EXISTS {table_name}_d SYNC")
    initiator.query(
        f"""
            CREATE TABLE {table_name}_d AS {table_name}
            Engine=Distributed(
                {cluster},
                currentDatabase(),
                {table_name},
                key
            )
            """
    )

    # Populate data: (expression used for both columns, number of rows).
    # Row counts add up to 1000 + 2000 + 1000 + 2000 + 1 = 6001.
    batches = (
        ("number", 1000),
        ("number", 2000),
        ("-number", 1000),
        ("-number", 2000),
        ("number", 1),
    )
    for expression, row_count in batches:
        initiator.query(
            f"INSERT INTO {table_name}_d SELECT {expression}, {expression} FROM numbers({row_count})",
            settings={"insert_distributed_sync": 1},
        )


2023-08-07 19:12:53 +00:00
@pytest.mark.parametrize(
    "cluster",
    ["test_single_shard_multiple_replicas", "test_multiple_shards_multiple_replicas"],
)
def test_parallel_replicas_over_distributed(start_cluster, cluster):
    """Check the same aggregate result with and without parallel replicas.

    Runs one aggregate query over the Distributed table twice: first with
    default settings, then with the parallel-replicas settings below. Both
    runs must return the same count/min/max/sum for the data loaded by
    ``create_tables``.
    """
    table_name = "test_table"
    create_tables(cluster, table_name)

    initiator = nodes[0]
    aggregate_query = (
        f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d"
    )
    # 6001 rows, keys spanning -1999..1999, positive and negative keys cancel.
    expected_result = "6001\t-1999\t1999\t0\n"

    # w/o parallel replicas
    plain_result = initiator.query(aggregate_query)
    assert plain_result == expected_result

    # parallel replicas
    parallel_settings = {
        "allow_experimental_parallel_reading_from_replicas": 2,
        "prefer_localhost_replica": 0,
        "max_parallel_replicas": 4,
        "use_hedged_requests": 0,
        # NOTE(review): this setting was already disabled in the original;
        # confirm whether it should be passed.
        # "cluster_for_parallel_replicas": cluster,
    }
    parallel_result = initiator.query(aggregate_query, settings=parallel_settings)
    assert parallel_result == expected_result