ClickHouse/tests/integration/test_replicated_fetches_bandwidth/test.py

266 lines
9.8 KiB
Python
Raw Normal View History

2021-05-27 12:54:47 +00:00
#!/usr/bin/env python3
import random
2024-09-27 10:19:39 +00:00
import statistics
2021-05-27 12:54:47 +00:00
import string
import subprocess
import time
2024-09-27 10:19:39 +00:00
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import NetThroughput
2021-05-27 12:54:47 +00:00
# A single cluster shared by every test in this module.
# node1 and node2 are started without extra server configuration; node3
# additionally loads configs/limit_replication_config.xml and is the instance
# used by the *_for_server tests.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True)
node3 = cluster.add_instance(
    "node3",
    with_zookeeper=True,
    main_configs=["configs/limit_replication_config.xml"],
)
2021-05-27 12:54:47 +00:00
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture that starts the cluster and yields it to tests.

    ``cluster.shutdown()`` sits in the ``finally`` clause, so it runs after
    the last test of the module and also when ``cluster.start()`` raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
2021-05-27 12:54:47 +00:00
def get_random_string(length):
    """Return a random string of uppercase ASCII letters and digits.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A ``str`` of ``length`` characters drawn with ``random.choice``.
        It relies on the ``random`` module and is therefore not suitable
        for anything security-sensitive.
    """
    # Build the alphabet once rather than re-concatenating it per character.
    alphabet = string.ascii_uppercase + string.digits
    return "".join(random.choice(alphabet) for _ in range(length))
2021-05-27 12:54:47 +00:00
def test_limited_fetch_single_table(start_cluster):
    """Fetches into a single table must honour that table's bandwidth limit.

    ``limited_fetch_table`` is created on node1 and node2 with
    ``max_replicated_fetches_network_bandwidth=10485760``. Five inserts are
    made on node1 while fetches are stopped on node2. After fetches are
    re-enabled, node2's inbound throughput is sampled ten times and the
    median must not exceed 15 MB/s.
    """
    print("Limited fetches single table")
    replicas = [node1, node2]
    try:
        for i, node in enumerate(replicas):
            node.query(
                f"CREATE TABLE limited_fetch_table(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_fetch_table', '{i}') ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_fetches_network_bandwidth=10485760"
            )

        # Keep node2 from fetching so that a backlog accumulates on node1.
        node2.query("SYSTEM STOP FETCHES limited_fetch_table")
        insert_template = "INSERT INTO limited_fetch_table SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(300)"
        for key in range(5):
            node1.query(insert_template.format(key))

        # Start measuring before the backlog is released.
        n1_net = NetThroughput(node1)
        n2_net = NetThroughput(node2)
        node2.query("SYSTEM START FETCHES limited_fetch_table")

        inbound_samples = []
        for _ in range(10):
            n1_in, n1_out = n1_net.measure_speed("megabytes")
            n2_in, n2_out = n2_net.measure_speed("megabytes")
            print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
            print("[N2] input:", n2_in, "MB/s", "output:", n2_out, "MB/s")
            inbound_samples.append(n2_in)
            time.sleep(0.5)

        median_speed = statistics.median(inbound_samples)
        # approximate border. Without limit we will have more than 100 MB/s for very slow builds.
        assert median_speed <= 15, (
            "We exceeded max fetch speed for more than 10MB/s. Must be around 10 (+- 5), got "
            + str(median_speed)
        )
    finally:
        for node in replicas:
            node.query("DROP TABLE IF EXISTS limited_fetch_table SYNC")
2021-05-27 12:54:47 +00:00
2021-05-27 12:54:47 +00:00
def test_limited_send_single_table(start_cluster):
    """Sends from a single table must honour that table's bandwidth limit.

    ``limited_send_table`` is created on node1 and node2 with
    ``max_replicated_sends_network_bandwidth=5242880``. Five inserts are made
    on node1 while fetches are stopped on node2. After fetches are
    re-enabled, node1's outbound throughput is sampled ten times and the
    median must not exceed 10 MB/s.
    """
    print("Limited sends single table")
    replicas = [node1, node2]
    try:
        for i, node in enumerate(replicas):
            # The ZooKeeper path is specific to this table. It used to reuse
            # '/clickhouse/tables/limited_fetch_table', which belongs to
            # test_limited_fetch_single_table and coupled the two tests.
            node.query(
                f"CREATE TABLE limited_send_table(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_send_table', '{i}') ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_sends_network_bandwidth=5242880"
            )

        # Keep node2 from fetching so that a backlog accumulates on node1.
        node2.query("SYSTEM STOP FETCHES limited_send_table")
        for key in range(5):
            node1.query(
                f"INSERT INTO limited_send_table SELECT {key}, (select randomPrintableASCII(104857)) FROM numbers(150)"
            )

        # Start measuring before the backlog is released.
        n1_net = NetThroughput(node1)
        n2_net = NetThroughput(node2)
        node2.query("SYSTEM START FETCHES limited_send_table")

        n1_sends_speed = []
        for _ in range(10):
            n1_in, n1_out = n1_net.measure_speed("megabytes")
            n2_in, n2_out = n2_net.measure_speed("megabytes")
            print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
            print("[N2] input:", n2_in, "MB/s", "output:", n2_out, "MB/s")
            n1_sends_speed.append(n1_out)
            time.sleep(0.5)

        median_speed = statistics.median(n1_sends_speed)
        # approximate border. Without limit we will have more than 100 MB/s for very slow builds.
        assert median_speed <= 10, (
            "We exceeded max send speed for more than 5MB/s. Must be around 5 (+- 5), got "
            + str(median_speed)
        )
    finally:
        for node in replicas:
            node.query("DROP TABLE IF EXISTS limited_send_table SYNC")
2021-05-27 12:54:47 +00:00
def test_limited_fetches_for_server(start_cluster):
    """Fetches across several tables on node3 must stay under a common bound.

    Five tables ``limited_fetches0`` .. ``limited_fetches4`` are created on
    node1 and node3 without per-table bandwidth settings. For each table,
    fetches are stopped on node3 and five inserts are made on node1. After
    fetches are re-enabled for all tables, node3's inbound throughput is
    sampled five times and the median must not exceed 15 MB/s.
    """
    print("Limited fetches for server")
    replicas = [node1, node3]
    try:
        for i, node in enumerate(replicas):
            for j in range(5):
                node.query(
                    f"CREATE TABLE limited_fetches{j}(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_fetches{j}', '{i}') ORDER BY tuple() PARTITION BY key"
                )

        insert_template = "INSERT INTO limited_fetches{} SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(150)"
        for j in range(5):
            # Block fetches for this table before filling it on node1.
            node3.query(f"SYSTEM STOP FETCHES limited_fetches{j}")
            for key in range(5):
                node1.query(insert_template.format(j, key))

        # Start measuring before the backlog is released.
        n1_net = NetThroughput(node1)
        n3_net = NetThroughput(node3)
        for j in range(5):
            node3.query(f"SYSTEM START FETCHES limited_fetches{j}")

        inbound_samples = []
        for _ in range(5):
            n1_in, n1_out = n1_net.measure_speed("megabytes")
            n3_in, n3_out = n3_net.measure_speed("megabytes")
            print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
            print("[N3] input:", n3_in, "MB/s", "output:", n3_out, "MB/s")
            inbound_samples.append(n3_in)
            time.sleep(0.5)

        median_speed = statistics.median(inbound_samples)
        # approximate border. Without limit we will have more than 100 MB/s for very slow builds.
        assert median_speed <= 15, (
            "We exceeded max fetch speed for more than 15MB/s. Must be around 5 (+- 10), got "
            + str(median_speed)
        )
    finally:
        for node in replicas:
            for j in range(5):
                node.query(f"DROP TABLE IF EXISTS limited_fetches{j} SYNC")
2021-05-27 12:54:47 +00:00
def test_limited_sends_for_server(start_cluster):
    """Sends across several tables from node3 must stay under a common bound.

    Five tables ``limited_sends0`` .. ``limited_sends4`` are created on node1
    and node3 without per-table bandwidth settings. For each table, fetches
    are stopped on node1 and five inserts are made on node3. After fetches
    are re-enabled for all tables, node3's outbound throughput is sampled
    five times and the median must not exceed 20 MB/s.
    """
    print("Limited sends for server")
    replicas = [node1, node3]
    try:
        for i, node in enumerate(replicas):
            for j in range(5):
                node.query(
                    f"CREATE TABLE limited_sends{j}(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_sends{j}', '{i}') ORDER BY tuple() PARTITION BY key"
                )

        for j in range(5):
            # Block fetches on the receiving side before filling the table on node3.
            node1.query(f"SYSTEM STOP FETCHES limited_sends{j}")
            for key in range(5):
                node3.query(
                    f"INSERT INTO limited_sends{j} SELECT {key}, (select randomPrintableASCII(104857)) FROM numbers(150)"
                )

        # Start measuring before the backlog is released.
        n1_net = NetThroughput(node1)
        n3_net = NetThroughput(node3)
        for j in range(5):
            node1.query(f"SYSTEM START FETCHES limited_sends{j}")

        n3_sends_speed = []
        for _ in range(5):
            n1_in, n1_out = n1_net.measure_speed("megabytes")
            n3_in, n3_out = n3_net.measure_speed("megabytes")
            print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
            print("[N3] input:", n3_in, "MB/s", "output:", n3_out, "MB/s")
            n3_sends_speed.append(n3_out)
            time.sleep(0.5)

        median_speed = statistics.median(n3_sends_speed)
        # approximate border. Without limit we will have more than 100 MB/s for very slow builds.
        # The message states the tolerance that the 20 MB/s bound implies
        # (5 + 15); it previously claimed "+- 10".
        assert median_speed <= 20, (
            "We exceeded max send speed for more than 20MB/s. Must be around 5 (+- 15), got "
            + str(median_speed)
        )
    finally:
        for node in replicas:
            for j in range(5):
                node.query(f"DROP TABLE IF EXISTS limited_sends{j} SYNC")
2021-05-27 12:54:47 +00:00
def test_should_execute_fetch(start_cluster):
    """Throttled fetches must be postponed in the queue and still complete.

    ``should_execute_table`` is created on node1 and node2 with
    ``max_replicated_fetches_network_bandwidth=3505253``. Three inserts of
    200 rows are made on node1 while fetches are stopped on node2. Fetches
    are then re-enabled and ten more inserts of 3 rows follow. While node2
    still reports active fetches, the test samples how many replication
    queue entries carry a "fetches have already throttled" postpone reason.
    At least one sample must be non-zero, and after ``SYSTEM SYNC REPLICA``
    node2 must hold all 630 rows.
    """
    print("Should execute fetch")
    replicas = [node1, node2]
    try:
        for i, node in enumerate(replicas):
            node.query(
                f"CREATE TABLE should_execute_table(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/should_execute_table', '{i}') ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_fetches_network_bandwidth=3505253"
            )

        # Build a backlog of large parts while node2 is not fetching.
        node2.query("SYSTEM STOP FETCHES should_execute_table")
        for key in range(3):
            node1.query(
                f"INSERT INTO should_execute_table SELECT {key}, (select randomPrintableASCII(104857)) FROM numbers(200)"
            )

        n1_net = NetThroughput(node1)
        n2_net = NetThroughput(node2)
        node2.query("SYSTEM START FETCHES should_execute_table")

        # Small inserts that arrive while the large fetches are in progress.
        for key in range(10):
            node1.query(
                f"INSERT INTO should_execute_table SELECT {key}, (select randomPrintableASCII(104857)) FROM numbers(3)"
            )

        replication_queue_data = []
        for _ in range(10):
            # NOTE(review): the throughput readings are not asserted on in
            # this test; the calls are kept so the polling loop behaves as
            # before.
            n1_net.measure_speed("megabytes")
            n2_net.measure_speed("megabytes")

            fetches_count = node2.query("SELECT count() FROM system.replicated_fetches")
            if fetches_count == "0\n":
                # No fetch is in flight any more, so stop sampling.
                break

            print("Fetches count", fetches_count)
            replication_queue_data.append(
                node2.query(
                    "SELECT count() FROM system.replication_queue WHERE postpone_reason like '%fetches have already throttled%'"
                )
            )
            time.sleep(0.5)

        node2.query("SYSTEM SYNC REPLICA should_execute_table")

        assert any(int(f.strip()) != 0 for f in replication_queue_data), (
            "No replication queue entry was observed as postponed due to fetch throttling, samples: "
            + str(replication_queue_data)
        )

        # 3 inserts * 200 rows + 10 inserts * 3 rows = 630 rows.
        rows_on_replica = node2.query("SELECT COUNT() FROM should_execute_table")
        assert rows_on_replica == "630\n", (
            "Expected 630 rows on node2 after SYNC REPLICA, got " + rows_on_replica
        )
    finally:
        for node in replicas:
            node.query("DROP TABLE IF EXISTS should_execute_table SYNC")