ClickHouse/tests/integration/test_hedged_requests/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

443 lines
14 KiB
Python
Raw Normal View History

2021-01-19 19:21:06 +00:00
import os
import sys
import time
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
2021-02-26 15:53:40 +00:00
from helpers.test_tools import TSV
2021-01-19 19:21:06 +00:00
cluster = ClickHouseCluster(__file__)

# Registry of cluster instances keyed by instance name. Every entry starts as
# None and is replaced with a real instance by the started_cluster fixture.
# "node" is the query initiator; node_1..node_3 are the other instances.
NODES = dict.fromkeys(("node_1", "node_2", "node_3", "node"))

# Sleep time in milliseconds.
sleep_time = 30000
2021-01-19 19:21:06 +00:00
2021-01-19 19:21:06 +00:00
@pytest.fixture(scope="module")
def started_cluster():
    """Start the test cluster once per module and yield it.

    Registers the initiator instance ("node") and every other instance listed
    in NODES, starts the cluster, creates and fills the ``test_hedged`` table
    on every instance, and creates the ``distributed`` table on the initiator.
    The cluster is shut down when the module's tests are finished, even if
    start-up or table creation failed.
    """
    NODES["node"] = cluster.add_instance(
        "node",
        stay_alive=True,
        main_configs=["configs/remote_servers.xml", "configs/logger.xml"],
        user_configs=["configs/users.xml"],
    )

    # Every other registry entry uses the users1.xml profile, which
    # update_configs() rewrites at run time.
    replica_names = [name for name in NODES if name != "node"]
    for replica_name in replica_names:
        NODES[replica_name] = cluster.add_instance(
            replica_name, user_configs=["configs/users1.xml"]
        )

    try:
        cluster.start()

        create_local_table = (
            "CREATE TABLE test_hedged (id UInt32, date Date) ENGINE =\n"
            "MergeTree() ORDER BY id PARTITION BY toYYYYMM(date)"
        )
        fill_local_table = (
            "INSERT INTO test_hedged select number, toDate(number) from numbers(100);"
        )
        for instance in NODES.values():
            instance.query(create_local_table)
            instance.query(fill_local_table)

        NODES["node"].query(
            "CREATE TABLE distributed (id UInt32, date Date) ENGINE =\n"
            "Distributed('test_cluster', 'default', 'test_hedged')"
        )

        yield cluster
    finally:
        cluster.shutdown()
# Template for the per-replica user profile written to users1.xml by
# update_configs(). The three placeholders are filled with str.format() and
# hold the artificial delays (in milliseconds) applied by a replica.
# The literal must contain nothing but the XML itself: any stray text inside
# the triple quotes would be rendered into the replica's configuration file.
config = """<clickhouse>
<profiles>
<default>
<sleep_in_send_tables_status_ms>{sleep_in_send_tables_status_ms}</sleep_in_send_tables_status_ms>
<sleep_in_send_data_ms>{sleep_in_send_data_ms}</sleep_in_send_data_ms>
<sleep_after_receiving_query_ms>{sleep_after_receiving_query_ms}</sleep_after_receiving_query_ms>
</default>
</profiles>
</clickhouse>"""
2021-02-15 13:21:36 +00:00
2021-02-26 15:53:40 +00:00
def check_query(expected_replica, receive_timeout=300):
    """Run the reference SELECT on the initiator and verify who answered.

    The initiator ("node") is restarted first, then a single-row SELECT over
    the ``distributed`` table is timed.

    Args:
        expected_replica: Host name that hostName() must report in the result.
        receive_timeout: Value passed as the ``receive_timeout`` query setting.

    Raises:
        AssertionError: If another replica answered, or if the query took
            10 seconds or longer.
    """
    initiator = NODES["node"]
    initiator.restart_clickhouse()

    sql = f"SELECT hostName(), id FROM distributed ORDER BY id LIMIT 1 SETTINGS receive_timeout={receive_timeout}"

    # Without hedged requests the select query will last more than 30 seconds,
    # with hedged requests it will last just around 1-2 seconds.
    started_at = time.time()
    result = initiator.query(sql)
    query_time = time.time() - started_at

    assert TSV(result) == TSV(expected_replica + "\t0")

    print("Query time:", query_time)
    assert query_time < 10
2021-02-15 13:21:36 +00:00
2022-05-02 11:18:51 +00:00
def check_settings(
    node_name,
    sleep_in_send_tables_status_ms,
    sleep_in_send_data_ms,
    sleep_after_receiving_query_ms,
):
    """Wait until a node reports the expected sleep settings.

    Polls ``system.settings`` over HTTP up to 1000 times, sleeping 0.1 s
    between polls, and returns as soon as all three settings match.

    Args:
        node_name: Key of the instance in NODES to poll.
        sleep_in_send_tables_status_ms: Expected integer value of that setting.
        sleep_in_send_data_ms: Expected integer value of that setting.
        sleep_after_receiving_query_ms: Expected integer value of that setting.

    Raises:
        AssertionError: If the settings never match within the poll budget.
    """
    wanted_by_setting = {
        "sleep_in_send_tables_status_ms": sleep_in_send_tables_status_ms,
        "sleep_in_send_data_ms": sleep_in_send_data_ms,
        "sleep_after_receiving_query_ms": sleep_after_receiving_query_ms,
    }
    node = NODES[node_name]

    polls_left = 1000
    while polls_left:
        # All three values are fetched before any of them is compared.
        observed = [
            node.http_query(
                f"SELECT value FROM system.settings WHERE name='{setting}'"
            )
            for setting in wanted_by_setting
        ]
        if all(
            int(raw) == wanted
            for raw, wanted in zip(observed, wanted_by_setting.values())
        ):
            return
        time.sleep(0.1)
        polls_left -= 1

    # Only reached once the poll budget is exhausted, so this always fails.
    assert polls_left > 0
def check_changing_replica_events(expected_count):
    """Assert that the initiator changed replica at least ``expected_count`` times.

    Reads the ``HedgedRequestsChangeReplica`` counter from ``system.events``
    on the initiator ("node").

    Args:
        expected_count: Minimum number of replica-change events required.

    Raises:
        AssertionError: If fewer events than expected were recorded.
    """
    result = NODES["node"].query(
        "SELECT value FROM system.events WHERE event='HedgedRequestsChangeReplica'"
    )

    # The query returns no row when the event has not been recorded; the other
    # event checks in this file already treat an empty result that way. Count
    # it as zero so a missing event fails the assertion below instead of
    # raising ValueError from int("").
    observed_count = int(result) if result.strip() else 0

    # If server load is high we can see more than expected
    # replica change events, but never less than expected
    assert observed_count >= expected_count
def check_if_query_sending_was_suspended():
    """Report whether the initiator recorded a SuspendSendingQueryToShard event.

    Returns:
        True if ``system.events`` on "node" has a row for the event with a
        value of at least 1; False if the result is empty or the value is
        below 1.
    """
    counter = NODES["node"].query(
        "SELECT value FROM system.events WHERE event='SuspendSendingQueryToShard'"
    )
    # An empty result means the event has no row at all.
    if len(counter) == 0:
        return False
    return int(counter) >= 1
2023-04-20 17:35:34 +00:00
def check_if_query_sending_was_not_suspended():
    """Assert that the initiator has no SuspendSendingQueryToShard event row.

    Raises:
        AssertionError: If the ``system.events`` lookup on "node" returns
            anything other than an empty result.
    """
    initiator = NODES["node"]
    counter = initiator.query(
        "SELECT value FROM system.events WHERE event='SuspendSendingQueryToShard'"
    )
    assert counter == ""
2023-04-20 17:35:34 +00:00
2021-03-22 19:18:06 +00:00
def update_configs(
    node_1_sleep_in_send_tables_status=0,
    node_1_sleep_in_send_data=0,
    node_1_sleep_after_receiving_query=0,
    node_2_sleep_in_send_tables_status=0,
    node_2_sleep_in_send_data=0,
    node_2_sleep_after_receiving_query=0,
    node_3_sleep_in_send_tables_status=0,
    node_3_sleep_in_send_data=0,
    node_3_sleep_after_receiving_query=0,
):
    """Rewrite users1.xml on node_1..node_3 and wait for the new settings.

    Each ``node_N_*`` argument is the delay in milliseconds rendered into the
    matching setting of that node's profile; all default to 0 (no delay).
    The configs of all three nodes are replaced first, and only then is each
    node polled with check_settings() until it reports the new values.
    """
    delays_by_node = {
        "node_1": (
            node_1_sleep_in_send_tables_status,
            node_1_sleep_in_send_data,
            node_1_sleep_after_receiving_query,
        ),
        "node_2": (
            node_2_sleep_in_send_tables_status,
            node_2_sleep_in_send_data,
            node_2_sleep_after_receiving_query,
        ),
        "node_3": (
            node_3_sleep_in_send_tables_status,
            node_3_sleep_in_send_data,
            node_3_sleep_after_receiving_query,
        ),
    }

    # Phase 1: push the new profile to every replica.
    for node_name, delays in delays_by_node.items():
        tables_status_ms, send_data_ms, after_query_ms = delays
        NODES[node_name].replace_config(
            "/etc/clickhouse-server/users.d/users1.xml",
            config.format(
                sleep_in_send_tables_status_ms=tables_status_ms,
                sleep_in_send_data_ms=send_data_ms,
                sleep_after_receiving_query_ms=after_query_ms,
            ),
        )

    # Phase 2: wait until every replica reports the values just written.
    for node_name, delays in delays_by_node.items():
        check_settings(node_name, *delays)
2021-02-26 15:53:40 +00:00
def test_stuck_replica(started_cluster):
    """Pause node_1 and check that queries fall over to node_2.

    With the node_1 container paused, the reference query must be answered by
    node_2 with at least one replica-change event, and ``slowdowns_count``
    for node_1 in ``system.clusters`` must be 1. A second query must again be
    answered by node_2 without increasing ``slowdowns_count``.
    """
    update_configs()

    slowdowns_sql = "SELECT slowdowns_count FROM system.clusters WHERE cluster='test_cluster' and host_name='node_1'"

    cluster.pause_container("node_1")
    try:
        check_query(expected_replica="node_2")
        check_changing_replica_events(1)

        result = NODES["node"].query(slowdowns_sql)
        assert TSV(result) == TSV("1")

        result = NODES["node"].query(
            "SELECT hostName(), id FROM distributed ORDER BY id LIMIT 1"
        )
        assert TSV(result) == TSV("node_2\t0")

        # Check that we didn't choose node_1 first again and slowdowns_count didn't increase.
        result = NODES["node"].query(slowdowns_sql)
        assert TSV(result) == TSV("1")
    finally:
        # The cluster is shared by every test in the module, so node_1 must be
        # resumed even when an assertion above fails; otherwise it stays
        # paused for all later tests.
        cluster.unpause_container("node_1")
2021-03-02 22:24:41 +00:00
def test_long_query(started_cluster):
    """Run a slow aggregate with no artificial delays and expect node_1 to answer.

    The query sleeps 1.5 s per one-row block; the result must still be
    reported by node_1 with a maximum id of 99. Afterwards 100 rows are
    inserted through the ``distributed`` table.
    """
    update_configs()

    initiator = NODES["node"]
    # Restart to reset pool states.
    initiator.restart_clickhouse()

    slow_select = "select hostName(), max(id + sleep(1.5)) from distributed settings max_block_size = 1, max_threads = 1, max_distributed_connections = 1;"
    outcome = initiator.query(slow_select)
    assert TSV(outcome) == TSV("node_1\t99")

    initiator.query(
        "INSERT INTO distributed select number, toDate(number) from numbers(100);"
    )
2021-02-15 13:21:36 +00:00
def test_send_table_status_sleep(started_cluster):
    """Delay the tables-status reply on node_1; node_2 must answer.

    At least one replica-change event is expected.
    """
    delays = {"node_1_sleep_in_send_tables_status": sleep_time}
    update_configs(**delays)

    check_query(expected_replica="node_2")
    check_changing_replica_events(1)
2021-02-15 13:21:36 +00:00
2021-03-02 14:05:33 +00:00
2021-02-15 13:21:36 +00:00
def test_send_table_status_sleep2(started_cluster):
    """Delay the tables-status reply on node_1 and node_2; node_3 must answer.

    At least two replica-change events are expected.
    """
    slow_nodes = (
        "node_1_sleep_in_send_tables_status",
        "node_2_sleep_in_send_tables_status",
    )
    update_configs(**dict.fromkeys(slow_nodes, sleep_time))

    check_query(expected_replica="node_3")
    check_changing_replica_events(2)
2021-02-15 13:21:36 +00:00
2021-02-17 17:34:52 +00:00
2021-02-15 13:21:36 +00:00
def test_send_data(started_cluster):
    """Delay data sending on node_1; node_2 must answer.

    At least one replica-change event is expected.
    """
    delays = {"node_1_sleep_in_send_data": sleep_time}
    update_configs(**delays)

    check_query(expected_replica="node_2")
    check_changing_replica_events(1)
2021-02-15 13:21:36 +00:00
def test_send_data2(started_cluster):
    """Delay data sending on node_1 and node_2; node_3 must answer.

    At least two replica-change events are expected.
    """
    slow_nodes = ("node_1_sleep_in_send_data", "node_2_sleep_in_send_data")
    update_configs(**dict.fromkeys(slow_nodes, sleep_time))

    check_query(expected_replica="node_3")
    check_changing_replica_events(2)
2021-02-15 13:21:36 +00:00
def test_combination1(started_cluster):
    """Delay tables-status on node_1 and data sending on node_2; node_3 must answer.

    At least two replica-change events are expected.
    """
    delays = {
        "node_1_sleep_in_send_tables_status": sleep_time,
        "node_2_sleep_in_send_data": sleep_time,
    }
    update_configs(**delays)

    check_query(expected_replica="node_3")
    check_changing_replica_events(2)
2021-02-15 13:21:36 +00:00
def test_combination2(started_cluster):
    """Delay data sending on node_1 and tables-status on node_2; node_3 must answer.

    At least two replica-change events are expected.
    """
    delays = {
        "node_1_sleep_in_send_data": sleep_time,
        "node_2_sleep_in_send_tables_status": sleep_time,
    }
    update_configs(**delays)

    check_query(expected_replica="node_3")
    check_changing_replica_events(2)
2021-02-15 13:21:36 +00:00
def test_combination3(started_cluster):
    """Long data delay on node_1 and node_3, 1 s tables-status delay on node_2.

    node_2 must answer, with at least three replica-change events.
    """
    delays = {
        "node_1_sleep_in_send_data": sleep_time,
        "node_2_sleep_in_send_tables_status": 1000,
        "node_3_sleep_in_send_data": sleep_time,
    }
    update_configs(**delays)

    check_query(expected_replica="node_2")
    check_changing_replica_events(3)
2021-02-15 13:21:36 +00:00
def test_combination4(started_cluster):
    """1 s tables-status delay on every replica plus a long data delay on node_1.

    node_2 must answer, with at least four replica-change events.
    """
    delays = {
        "node_1_sleep_in_send_tables_status": 1000,
        "node_1_sleep_in_send_data": sleep_time,
        "node_2_sleep_in_send_tables_status": 1000,
        "node_3_sleep_in_send_tables_status": 1000,
    }
    update_configs(**delays)

    check_query(expected_replica="node_2")
    check_changing_replica_events(4)
2021-02-26 15:53:40 +00:00
def test_receive_timeout1(started_cluster):
    """First two replicas time out while connecting; node_3 must answer.

    node_1 and node_2 delay the tables-status reply by 3 s while the query
    runs with ``receive_timeout=2``; node_3 only delays data sending by 1 s.
    At least two replica-change events are expected.
    """
    # Check the situation when first two replicas get receive timeout
    # in establishing connection, but the third replica is ok.
    delays = {
        "node_1_sleep_in_send_tables_status": 3000,
        "node_2_sleep_in_send_tables_status": 3000,
        "node_3_sleep_in_send_data": 1000,
    }
    update_configs(**delays)

    check_query(expected_replica="node_3", receive_timeout=2)
    check_changing_replica_events(2)
2021-02-26 15:53:40 +00:00
def test_receive_timeout2(started_cluster):
    """First replica times out while sending data; node_2 must answer.

    node_1 delays data sending by 5 s while the query runs with
    ``receive_timeout=3``; node_2 and node_3 delay the tables-status reply
    by 2 s. At least three replica-change events are expected.
    """
    # Check the situation when first replica get receive timeout
    # in packet receiving but there are replicas in process of
    # connection establishing.
    delays = {
        "node_1_sleep_in_send_data": 5000,
        "node_2_sleep_in_send_tables_status": 2000,
        "node_3_sleep_in_send_tables_status": 2000,
    }
    update_configs(**delays)

    check_query(expected_replica="node_2", receive_timeout=3)
    check_changing_replica_events(3)
def test_initial_receive_timeout(started_cluster):
    """All replicas stall after receiving the query; expect SOCKET_TIMEOUT.

    Every replica sleeps 20 s after receiving the query while the query runs
    with ``receive_timeout=3``, so the initiator must report an error
    containing ``SOCKET_TIMEOUT``.
    """
    # Check the situation when replicas don't respond after
    # receiving query (so, no packets were sent to initiator)
    stall_ms = 20000
    update_configs(
        node_1_sleep_after_receiving_query=stall_ms,
        node_2_sleep_after_receiving_query=stall_ms,
        node_3_sleep_after_receiving_query=stall_ms,
    )

    initiator = NODES["node"]
    initiator.restart_clickhouse()

    error_text = initiator.query_and_get_error(
        "SELECT hostName(), id FROM distributed ORDER BY id LIMIT 1 SETTINGS receive_timeout=3"
    )
    assert "SOCKET_TIMEOUT" in error_text
2023-04-20 17:35:34 +00:00
def test_async_connect(started_cluster):
    """Compare synchronous and asynchronous query sending while connecting.

    A Distributed table over the ``test_cluster_connect`` cluster (presumably
    declared in configs/remote_servers.xml, which is not visible here) is
    queried first with ``async_query_sending_for_remote=0``; at least two
    replica changes must happen and sending must not be suspended. After a
    restart, the same query with ``async_query_sending_for_remote=1`` is
    retried up to 100 times until a suspension is recorded.
    """
    update_configs()

    initiator = NODES["node"]
    initiator.restart_clickhouse()

    initiator.query("DROP TABLE IF EXISTS distributed_connect")
    initiator.query(
        "CREATE TABLE distributed_connect (id UInt32, date Date) ENGINE =\n"
        "Distributed('test_cluster_connect', 'default', 'test_hedged')"
    )

    # The two queries differ only in the async_query_sending_for_remote value.
    select_template = "SELECT hostName(), id FROM distributed_connect ORDER BY id LIMIT 1 SETTINGS prefer_localhost_replica = 0, connect_timeout_with_failover_ms=5000, async_query_sending_for_remote={}, max_threads=1, max_distributed_connections=1"

    initiator.query(select_template.format(0))
    check_changing_replica_events(2)
    check_if_query_sending_was_not_suspended()

    # Restart server to reset connection pool state
    initiator.restart_clickhouse()

    suspended = False
    for _ in range(100):
        initiator.query(select_template.format(1))
        check_changing_replica_events(2)
        if check_if_query_sending_was_suspended():
            suspended = True
            break

    # Fails only when none of the 100 attempts recorded a suspension.
    assert suspended

    initiator.query("DROP TABLE distributed_connect")
def test_async_query_sending(started_cluster):
    """Compare synchronous and asynchronous query sending to stalled replicas.

    Every replica sleeps 5 s after receiving a query. A Distributed table
    over the ``test_cluster_three_shards`` cluster (presumably declared in
    configs/remote_servers.xml, which is not visible here) is queried first
    with ``async_query_sending_for_remote=0``, which must not suspend
    sending, and then with ``async_query_sending_for_remote=1`` up to 100
    times until a suspension is recorded.
    """
    stall_ms = 5000
    update_configs(
        node_1_sleep_after_receiving_query=stall_ms,
        node_2_sleep_after_receiving_query=stall_ms,
        node_3_sleep_after_receiving_query=stall_ms,
    )

    initiator = NODES["node"]
    initiator.restart_clickhouse()

    initiator.query("DROP TABLE IF EXISTS distributed_query_sending")
    initiator.query(
        "CREATE TABLE distributed_query_sending (id UInt32, date Date) ENGINE =\n"
        "Distributed('test_cluster_three_shards', 'default', 'test_hedged')"
    )

    # Create big enough temporary table
    initiator.query("DROP TABLE IF EXISTS tmp")
    initiator.query(
        "CREATE TEMPORARY TABLE tmp (number UInt64, s String) "
        "as select number, randomString(number % 1000) from numbers(10000000)"
    )

    # The two queries differ only in the async_query_sending_for_remote value.
    select_template = (
        "SELECT hostName(), id FROM distributed_query_sending ORDER BY id LIMIT 1 SETTINGS"
        " prefer_localhost_replica = 0, async_query_sending_for_remote={}, max_threads = 1, max_distributed_connections=1"
    )

    initiator.query(select_template.format(0))
    check_if_query_sending_was_not_suspended()

    suspended = False
    for _ in range(100):
        initiator.query(select_template.format(1))
        if check_if_query_sending_was_suspended():
            suspended = True
            break

    # Fails only when none of the 100 attempts recorded a suspension.
    assert suspended

    initiator.query("DROP TABLE distributed_query_sending")