Add test for load balancing

This commit is contained in:
Igor Nikonov 2024-01-17 18:57:08 +00:00
parent 9e17877da5
commit 3b8ce04392
4 changed files with 143 additions and 1 deletion

View File

@ -108,7 +108,7 @@ def test_parallel_replicas_custom_key_failover(
assert (
node1.query(
f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h SETTINGS skip_unavailable_shards=1"
f"SELECT h, count() FROM clusterAllReplicas({cluster}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"
)
== "n1\t3\nn3\t2\n"
)

View File

@ -0,0 +1,26 @@
<clickhouse>
    <remote_servers>
        <!-- Test cluster: a single shard with four replicas (n1..n4), all on the
             native TCP port 9000. internal_replication is disabled for this shard. -->
        <test_single_shard_multiple_replicas>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>n1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>n2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>n3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>n4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_single_shard_multiple_replicas>
    </remote_servers>
</clickhouse>

View File

@ -0,0 +1,116 @@
import pytest
import uuid
from helpers.cluster import ClickHouseCluster
# Shared test cluster: four instances (n1..n4) acting as replicas of a single
# shard; the topology is declared in configs/remote_servers.xml.
cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    "n1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
    "n2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
    "n3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
    "n4", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)

# Convenience list for iterating over every replica.
nodes = [node1, node2, node3, node4]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the ClickHouse cluster once per module and shut it down afterwards."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Always shut down, even if startup or a test raised.
        cluster.shutdown()
def create_tables(table_name):
    """(Re)create the replicated test table on all four replicas and load data.

    Inserts 4000 rows in four batches; keys are number % 4, so each of the
    keys 0..3 ends up with exactly 1000 rows. Non-origin replicas are synced
    afterwards so every node sees identical data.
    """
    for replica, node in enumerate(nodes, start=1):
        node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
        node.query(
            f"CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r{replica}') ORDER BY (key)"
        )

    # populate data
    node1.query(
        f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000)"
    )
    node1.query(
        f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(1000, 1000)"
    )
    node1.query(
        f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(2000, 1000)"
    )
    node1.query(
        f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers(3000, 1000)"
    )

    # make sure every replica caught up with the inserts done on node1
    for replica_node in nodes[1:]:
        replica_node.query(f"SYSTEM SYNC REPLICA {table_name}")
@pytest.mark.parametrize("use_hedged_requests", [1, 0])
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
def test_parallel_replicas_custom_key_load_balancing(
    start_cluster,
    use_hedged_requests,
    custom_key,
    filter_type,
):
    """Verify that parallel replicas with a custom key balance work across replicas.

    Runs an aggregation over the 4-replica cluster with
    parallel_replicas_custom_key enabled, then inspects system.query_log on
    every node to confirm that exactly four subqueries were executed and that
    they were spread over n1..n4.
    """
    # Named `cluster_name` (not `cluster`) so the module-level
    # ClickHouseCluster object / `start_cluster` fixture is not shadowed.
    cluster_name = "test_single_shard_multiple_replicas"
    table = "test_table"

    create_tables(table)

    # 4000 rows with key = number % 4 => 1000 rows per key 0..3.
    expected_result = ""
    for i in range(4):
        expected_result += f"{i}\t1000\n"

    # Tag the query so it can be located in system.query_log afterwards;
    # stringify explicitly instead of passing a uuid.UUID into settings.
    log_comment = str(uuid.uuid4())
    assert (
        node1.query(
            f"SELECT key, count() FROM cluster('{cluster_name}', default.test_table) GROUP BY key ORDER BY key",
            settings={
                "log_comment": log_comment,
                "prefer_localhost_replica": 0,
                "max_parallel_replicas": 4,
                "parallel_replicas_custom_key": custom_key,
                "parallel_replicas_custom_key_filter_type": filter_type,
                "use_hedged_requests": use_hedged_requests,
            },
        )
        == expected_result
    )

    # query_log is written asynchronously — flush it on every node first.
    for node in nodes:
        node.query("system flush logs")

    # the subqueries should be spread over available nodes
    query_id = node1.query(
        f"SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '{log_comment}' AND type = 'QueryFinish' AND initial_query_id = query_id"
    ).strip()  # strip() is safer than [:-1] against trailing whitespace variations
    assert query_id != ""

    assert (
        node1.query(
            f"SELECT 'subqueries', count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' AND query_id != initial_query_id SETTINGS skip_unavailable_shards=1"
        )
        == "subqueries\t4\n"
    )

    # check queries per node: the initiator (n1) runs the initial query plus
    # one subquery, the other three replicas run one subquery each
    assert (
        node1.query(
            f"SELECT h, count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE initial_query_id = '{query_id}' AND type ='QueryFinish' GROUP BY hostname() as h ORDER BY h SETTINGS skip_unavailable_shards=1"
        )
        == "n1\t2\nn2\t1\nn3\t1\nn4\t1\n"
    )