Merge pull request #21944 from azat/dist-stress-test

Add stress test for distributed queries
This commit is contained in:
tavplubix 2021-03-31 14:54:43 +03:00 committed by GitHub
commit 5fa2244aa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 145 additions and 0 deletions

View File

@ -0,0 +1,42 @@
<yandex>
<!-- Raised well above the default: the stress test drives clickhouse-benchmark
     with 100 concurrent connections per parametrized case. -->
<max_concurrent_queries>1000</max_concurrent_queries>
<remote_servers>
<!-- One shard, two replicas (node1_r1 / node1_r2). -->
<one_shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1_r1</host>
<port>9000</port>
</replica>
<replica>
<host>node1_r2</host>
<port>9000</port>
</replica>
</shard>
</one_shard>
<!-- Two shards, two replicas each: shard 1 on node1_*, shard 2 on node2_*. -->
<two_shards>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1_r1</host>
<port>9000</port>
</replica>
<replica>
<host>node1_r2</host>
<port>9000</port>
</replica>
</shard>
<!-- NOTE(review): this shard omits <internal_replication> unlike the others —
     confirm the asymmetry is intentional for the test. -->
<shard>
<replica>
<host>node2_r1</host>
<port>9000</port>
</replica>
<replica>
<host>node2_r2</host>
<port>9000</port>
</replica>
</shard>
</two_shards>
</remote_servers>
</yandex>

View File

@ -0,0 +1,103 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=line-too-long
import shlex
import itertools
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
# Four instances matching configs/remote_servers.xml:
# shard 1 -> node1_r1/node1_r2, shard 2 -> node2_r1/node2_r2.
# All queries are issued from node1_r1 (see run_benchmark below).
node1_r1 = cluster.add_instance('node1_r1', main_configs=['configs/remote_servers.xml'])
node2_r1 = cluster.add_instance('node2_r1', main_configs=['configs/remote_servers.xml'])
node1_r2 = cluster.add_instance('node1_r2', main_configs=['configs/remote_servers.xml'])
node2_r2 = cluster.add_instance('node2_r2', main_configs=['configs/remote_servers.xml'])
def run_benchmark(payload, settings):
    """Pipe *payload* (newline-separated SQL statements) into clickhouse-benchmark
    running inside the node1_r1 container.

    *settings* is an iterable of extra command-line flags
    (e.g. '--use_hedged_requests=0') appended to the benchmark invocation.
    """
    benchmark_args = [
        'clickhouse', 'benchmark',
        '--concurrency=100',
        '--cumulative',
        '--delay=0',
        # NOTE: with the current test matrix even 3 seconds is huge...
        '--timelimit=3',
        # Tune some basic timeouts so failover happens quickly.
        '--hedged_connection_timeout_ms=200',
        '--connect_timeout_with_failover_ms=200',
        '--connections_with_failover_max_tries=5',
    ]
    benchmark_args.extend(settings)
    shell_command = 'echo {} | '.format(shlex.quote(payload.strip())) + ' '.join(benchmark_args)
    node1_r1.exec_in_container(['bash', '-c', shell_command])
@pytest.fixture(scope='module')
def started_cluster():
    """Start the cluster and create the test schema on every instance.

    Each node gets a local MergeTree table ``data`` (with padding columns v1..v12
    just to increase the block size) plus Distributed tables over the one-shard
    and two-shard topologies, including distributed-over-distributed wrappers.
    The cluster is always shut down on exit, even if startup or DDL fails.
    """
    try:
        cluster.start()
        # Only the instance objects are needed — iterate the values view
        # directly instead of unpacking and discarding the keys.
        for instance in cluster.instances.values():
            instance.query("""
            create table if not exists data (
                key Int,
                /* just to increase block size */
                v1 UInt64,
                v2 UInt64,
                v3 UInt64,
                v4 UInt64,
                v5 UInt64,
                v6 UInt64,
                v7 UInt64,
                v8 UInt64,
                v9 UInt64,
                v10 UInt64,
                v11 UInt64,
                v12 UInt64
            ) Engine=MergeTree() order by key partition by key%5;
            insert into data (key) select * from numbers(10);
            create table if not exists dist_one as data engine=Distributed(one_shard, currentDatabase(), data, key);
            create table if not exists dist_one_over_dist as data engine=Distributed(one_shard, currentDatabase(), dist_one, yandexConsistentHash(key, 2));
            create table if not exists dist_two as data engine=Distributed(two_shards, currentDatabase(), data, key);
            create table if not exists dist_two_over_dist as data engine=Distributed(two_shards, currentDatabase(), dist_two, yandexConsistentHash(key, 2));
            """)
        yield cluster
    finally:
        cluster.shutdown()
# timeout is generous since the first case also pays for the started_cluster
# fixture's one-time cluster startup
@pytest.mark.timeout(60)
@pytest.mark.parametrize('table,settings', itertools.product(
    [  # tables
        'dist_one',
        'dist_one_over_dist',
        'dist_two',
        'dist_two_over_dist',
    ],
    # settings: every unordered pair of the tunables below
    # ('' stands for server defaults); combinations() is passed directly —
    # wrapping it in [*list(...)] was redundant.
    itertools.combinations([
        '',  # defaults
        '--prefer_localhost_replica=0',
        '--async_socket_for_remote=0',
        '--use_hedged_requests=0',
        '--optimize_skip_unused_shards=1',
        '--distributed_group_by_no_merge=2',
        '--optimize_distributed_group_by_sharding_key=1',
        # TODO: enlarge test matrix (but first those values to accept ms):
        #
        # - sleep_in_send_tables_status
        # - sleep_in_send_data
    ], 2),
    # TODO: more combinations than just 2
))
def test_stress_distributed(table, settings, started_cluster):
    """Hammer one Distributed table with concurrent point lookups and a full
    scan under the given pair of query-level settings."""
    payload = f'''
    select * from {table} where key = 0;
    select * from {table} where key = 1;
    select * from {table} where key = 2;
    select * from {table} where key = 3;
    select * from {table};
    '''
    run_benchmark(payload, settings)