Add tests

This commit is contained in:
avogar 2023-04-20 17:35:34 +00:00
parent 39ba4c27bc
commit fd8e510819
2 changed files with 126 additions and 0 deletions

View File

@ -17,6 +17,51 @@
</replica>
</shard>
</test_cluster>
<!-- Three-shard cluster, one replica per shard (node_1..node_3).
     Used to observe suspension of query sending to each shard. -->
<test_cluster_three_shards>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node_3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_three_shards>
<!-- Two-shard cluster where each shard lists a 129.0.0.x replica first and a
     real node second. NOTE(review): the 129.0.0.x addresses are presumably
     unreachable so the first connection attempt fails/times out and the query
     fails over to the real replica — confirm against the tests that use this
     cluster. -->
<test_cluster_connect>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>129.0.0.1</host>
<port>9000</port>
</replica>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>129.0.0.2</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_connect>
</remote_servers>
</clickhouse>

View File

@ -128,6 +128,22 @@ def check_changing_replica_events(expected_count):
assert int(result) >= expected_count
def check_if_query_sending_was_suspended(minimum_count):
    """Assert that query sending to a shard was suspended at least
    *minimum_count* times, according to the server's event counters."""
    suspended_events = NODES["node"].query(
        "SELECT value FROM system.events WHERE event='SuspendSendingQueryToShard'"
    )
    assert int(suspended_events) >= minimum_count
def check_if_query_sending_was_not_suspended():
    """Assert that no SuspendSendingQueryToShard event was ever recorded
    (the query returns an empty string when the event row is absent)."""
    suspended_events = NODES["node"].query(
        "SELECT value FROM system.events WHERE event='SuspendSendingQueryToShard'"
    )
    assert suspended_events == ""
def update_configs(
node_1_sleep_in_send_tables_status=0,
node_1_sleep_in_send_data=0,
@ -341,3 +357,68 @@ def test_initial_receive_timeout(started_cluster):
)
assert "SOCKET_TIMEOUT" in result
def test_async_connect(started_cluster):
    """Run the same hedged SELECT with async query sending disabled and then
    enabled; both must fail over to another replica, but only the async run
    may suspend sending the query to a shard."""
    update_configs()
    node = NODES["node"]
    node.restart_clickhouse()

    node.query("DROP TABLE IF EXISTS distributed_connect")
    node.query(
        """CREATE TABLE distributed_connect (id UInt32, date Date) ENGINE =
Distributed('test_cluster_connect', 'default', 'test_hedged')"""
    )

    # Identical query except for the async_query_sending_for_remote setting.
    select_template = (
        "SELECT hostName(), id FROM distributed_connect ORDER BY id LIMIT 1 "
        "SETTINGS prefer_localhost_replica = 0, connect_timeout_with_failover_ms=5000, "
        "async_query_sending_for_remote={}, max_threads=1"
    )

    node.query(select_template.format(0))
    check_changing_replica_events(2)
    check_if_query_sending_was_not_suspended()

    node.query(select_template.format(1))
    check_changing_replica_events(2)
    check_if_query_sending_was_suspended(2)

    node.query("DROP TABLE distributed_connect")
def test_async_query_sending(started_cluster):
    """With every remote node sleeping after receiving the query, verify that
    query sending is suspended on all three shards only when
    async_query_sending_for_remote is enabled."""
    update_configs(
        node_1_sleep_after_receiving_query=5000,
        node_2_sleep_after_receiving_query=5000,
        node_3_sleep_after_receiving_query=5000,
    )
    node = NODES["node"]
    node.restart_clickhouse()

    node.query("DROP TABLE IF EXISTS distributed_query_sending")
    node.query(
        """CREATE TABLE distributed_query_sending (id UInt32, date Date) ENGINE =
Distributed('test_cluster_three_shards', 'default', 'test_hedged')"""
    )

    # A temporary table large enough that shipping it to the remote shards
    # takes noticeable time, so suspension of query sending can be observed.
    node.query("DROP TABLE IF EXISTS tmp")
    node.query(
        "CREATE TEMPORARY TABLE tmp (number UInt64, s String) "
        "as select number, randomString(number % 1000) from numbers(1000000)"
    )

    # Identical query except for the async_query_sending_for_remote setting.
    select_template = (
        "SELECT hostName(), id FROM distributed_query_sending ORDER BY id LIMIT 1"
        " SETTINGS prefer_localhost_replica = 0, async_query_sending_for_remote={},"
        " max_threads = 1"
    )

    node.query(select_template.format(0))
    check_if_query_sending_was_not_suspended()

    node.query(select_template.format(1))
    check_if_query_sending_was_suspended(3)

    node.query("DROP TABLE distributed_query_sending")