Update test: use ReplicatedMergeTree

Author: Igor Nikonov  2023-08-07 21:06:12 +00:00
parent 9b631e2cef
commit 50b0db598e


@@ -4,7 +4,7 @@ from helpers.cluster import ClickHouseCluster
 cluster = ClickHouseCluster(__file__)
 nodes = [
-    cluster.add_instance(f"n{i}", main_configs=["configs/remote_servers.xml"])
+    cluster.add_instance(f"n{i}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True)
     for i in (1, 2, 3, 4)
 ]
@@ -18,50 +18,92 @@ def start_cluster():
         cluster.shutdown()
-def create_tables(cluster):
+def create_tables(cluster, table_name):
+    # create replicated tables
     for node in nodes:
-        node.query("DROP TABLE IF EXISTS test_table")
-        node.query("DROP TABLE IF EXISTS dist_table")
-        node.query(
-            "CREATE TABLE IF NOT EXISTS test_table (key Int64, value String) Engine=MergeTree ORDER BY (key)"
-        )
+        node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
+    if cluster == "test_single_shard_multiple_replicas":
+        nodes[0].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
+        )
+        nodes[1].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2') ORDER BY (key)"
+        )
+        nodes[2].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3') ORDER BY (key)"
+        )
+        nodes[3].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r4') ORDER BY (key)"
+        )
+    elif cluster == "test_multiple_shards_multiple_replicas":
+        # shard 1
+        nodes[0].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1') ORDER BY (key)"
+        )
+        nodes[1].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2') ORDER BY (key)"
+        )
+        # shard 2
+        nodes[2].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r1') ORDER BY (key)"
+        )
+        nodes[3].query(
+            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard2/{table_name}', 'r2') ORDER BY (key)"
+        )
+    else:
+        raise Exception(f"Unexpected cluster: {cluster}")
     # create distributed table
+    nodes[0].query(f"DROP TABLE IF EXISTS {table_name}_d SYNC")
     nodes[0].query(
         f"""
-            CREATE TABLE dist_table AS test_table
+            CREATE TABLE {table_name}_d AS {table_name}
             Engine=Distributed(
                 {cluster},
                 currentDatabase(),
-                test_table,
-                rand()
+                {table_name},
+                key
             )
             """
     )
-    nodes[0].query(f"INSERT INTO test_table SELECT number, number FROM numbers(1000)")
-    nodes[1].query(f"INSERT INTO test_table SELECT number, number FROM numbers(2000)")
-    nodes[2].query(f"INSERT INTO test_table SELECT -number, -number FROM numbers(1000)")
-    nodes[3].query(f"INSERT INTO test_table SELECT -number, -number FROM numbers(2000)")
-    nodes[0].query(f"INSERT INTO test_table SELECT number, number FROM numbers(1)")
+    # populate data
+    nodes[0].query(f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(1000)", settings={"insert_distributed_sync": 1})
+    nodes[0].query(f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(2000)", settings={"insert_distributed_sync": 1})
+    nodes[0].query(f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(1000)", settings={"insert_distributed_sync": 1})
+    nodes[0].query(f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(2000)", settings={"insert_distributed_sync": 1})
+    nodes[0].query(f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(1)", settings={"insert_distributed_sync": 1})
 @pytest.mark.parametrize(
     "cluster",
-    ["test_multiple_shards_multiple_replicas", "test_single_shard_multiple_replicas"],
+    ["test_single_shard_multiple_replicas", "test_multiple_shards_multiple_replicas"],
 )
 def test_parallel_replicas_custom_key(start_cluster, cluster):
-    create_tables(cluster)
+    table_name = "test_table"
+    create_tables(cluster, table_name)
-    expected_result = f"6001\t-1999\t1999\t0\n"
     node = nodes[0]
+    expected_result = f"6001\t-1999\t1999\t0\n"
     # w/o parallel replicas
     assert (
         node.query(
-            "SELECT count(), min(key), max(key), sum(key) FROM dist_table",
+            f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d"
         )
         == expected_result
     )
     # parallel replicas
     assert (
         node.query(
+            f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
             settings={
-                "allow_experimental_parallel_reading_from_replicas": 1,
+                "allow_experimental_parallel_reading_from_replicas": 2,
                 "prefer_localhost_replica": 0,
                 "max_parallel_replicas": 4,
-                "use_hedged_requests": 1,
+                "use_hedged_requests": 0,
                 "cluster_for_parallel_replicas": cluster,
             },
         )
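
Note (not part of the commit): the four near-identical CREATE statements above follow the usual ReplicatedMergeTree convention the test now relies on: every replica of a shard registers under the same ZooKeeper path and differs only in its replica name, which is also why with_zookeeper=True is added to cluster.add_instance. The following is a minimal sketch of the same per-shard setup written as a loop; the helper name is hypothetical and it assumes only the nodes list and the node.query() helper already used in the diff.

    def create_replicated_table(replicas, shard, table_name):
        # All replicas of one shard share a single ZooKeeper path;
        # only the replica name ('r1', 'r2', ...) differs per node.
        for i, node in enumerate(replicas, start=1):
            node.query(
                f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) "
                f"Engine=ReplicatedMergeTree('/test_parallel_replicas/{shard}/{table_name}', 'r{i}') "
                "ORDER BY (key)"
            )

    # single shard, four replicas (the "test_single_shard_multiple_replicas" case):
    #   create_replicated_table(nodes, "shard1", "test_table")
    # two shards, two replicas each (the "test_multiple_shards_multiple_replicas" case):
    #   create_replicated_table(nodes[:2], "shard1", "test_table")
    #   create_replicated_table(nodes[2:], "shard2", "test_table")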