mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 02:41:59 +00:00
266 lines
9.8 KiB
Python
266 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
import random
|
|
import statistics
|
|
import string
|
|
import subprocess
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.network import NetThroughput
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
node1 = cluster.add_instance("node1", with_zookeeper=True)
|
|
node2 = cluster.add_instance("node2", with_zookeeper=True)
|
|
node3 = cluster.add_instance(
|
|
"node3", main_configs=["configs/limit_replication_config.xml"], with_zookeeper=True
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def start_cluster():
|
|
try:
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def get_random_string(length):
|
|
return "".join(
|
|
random.choice(string.ascii_uppercase + string.digits) for _ in range(length)
|
|
)
|
|
|
|
|
|
def test_limited_fetch_single_table(start_cluster):
|
|
print("Limited fetches single table")
|
|
try:
|
|
for i, node in enumerate([node1, node2]):
|
|
node.query(
|
|
f"CREATE TABLE limited_fetch_table(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_fetch_table', '{i}') ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_fetches_network_bandwidth=10485760"
|
|
)
|
|
|
|
node2.query("SYSTEM STOP FETCHES limited_fetch_table")
|
|
|
|
for i in range(5):
|
|
node1.query(
|
|
"INSERT INTO limited_fetch_table SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(300)".format(
|
|
i
|
|
)
|
|
)
|
|
|
|
n1_net = NetThroughput(node1)
|
|
n2_net = NetThroughput(node2)
|
|
|
|
node2.query("SYSTEM START FETCHES limited_fetch_table")
|
|
n2_fetch_speed = []
|
|
for i in range(10):
|
|
n1_in, n1_out = n1_net.measure_speed("megabytes")
|
|
n2_in, n2_out = n2_net.measure_speed("megabytes")
|
|
print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
|
|
print("[N2] input:", n2_in, "MB/s", "output:", n2_out, "MB/s")
|
|
n2_fetch_speed.append(n2_in)
|
|
time.sleep(0.5)
|
|
|
|
median_speed = statistics.median(n2_fetch_speed)
|
|
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
|
|
assert median_speed <= 15, (
|
|
"We exceeded max fetch speed for more than 10MB/s. Must be around 10 (+- 5), got "
|
|
+ str(median_speed)
|
|
)
|
|
|
|
finally:
|
|
for node in [node1, node2]:
|
|
node.query("DROP TABLE IF EXISTS limited_fetch_table SYNC")
|
|
|
|
|
|
def test_limited_send_single_table(start_cluster):
|
|
print("Limited sends single table")
|
|
try:
|
|
for i, node in enumerate([node1, node2]):
|
|
node.query(
|
|
f"CREATE TABLE limited_send_table(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_fetch_table', '{i}') ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_sends_network_bandwidth=5242880"
|
|
)
|
|
|
|
node2.query("SYSTEM STOP FETCHES limited_send_table")
|
|
|
|
for i in range(5):
|
|
node1.query(
|
|
"INSERT INTO limited_send_table SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(150)".format(
|
|
i
|
|
)
|
|
)
|
|
|
|
n1_net = NetThroughput(node1)
|
|
n2_net = NetThroughput(node2)
|
|
|
|
node2.query("SYSTEM START FETCHES limited_send_table")
|
|
n1_sends_speed = []
|
|
for i in range(10):
|
|
n1_in, n1_out = n1_net.measure_speed("megabytes")
|
|
n2_in, n2_out = n2_net.measure_speed("megabytes")
|
|
print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
|
|
print("[N2] input:", n2_in, "MB/s", "output:", n2_out, "MB/s")
|
|
n1_sends_speed.append(n1_out)
|
|
time.sleep(0.5)
|
|
|
|
median_speed = statistics.median(n1_sends_speed)
|
|
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
|
|
assert median_speed <= 10, (
|
|
"We exceeded max send speed for more than 5MB/s. Must be around 5 (+- 5), got "
|
|
+ str(median_speed)
|
|
)
|
|
|
|
finally:
|
|
for node in [node1, node2]:
|
|
node.query("DROP TABLE IF EXISTS limited_send_table SYNC")
|
|
|
|
|
|
def test_limited_fetches_for_server(start_cluster):
|
|
print("Limited fetches for server")
|
|
try:
|
|
for i, node in enumerate([node1, node3]):
|
|
for j in range(5):
|
|
node.query(
|
|
f"CREATE TABLE limited_fetches{j}(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_fetches{j}', '{i}') ORDER BY tuple() PARTITION BY key"
|
|
)
|
|
|
|
for j in range(5):
|
|
node3.query(f"SYSTEM STOP FETCHES limited_fetches{j}")
|
|
for i in range(5):
|
|
node1.query(
|
|
"INSERT INTO limited_fetches{} SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(150)".format(
|
|
j, i
|
|
)
|
|
)
|
|
|
|
n1_net = NetThroughput(node1)
|
|
n3_net = NetThroughput(node3)
|
|
|
|
for j in range(5):
|
|
node3.query(f"SYSTEM START FETCHES limited_fetches{j}")
|
|
|
|
n3_fetches_speed = []
|
|
for i in range(5):
|
|
n1_in, n1_out = n1_net.measure_speed("megabytes")
|
|
n3_in, n3_out = n3_net.measure_speed("megabytes")
|
|
print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
|
|
print("[N3] input:", n3_in, "MB/s", "output:", n3_out, "MB/s")
|
|
n3_fetches_speed.append(n3_in)
|
|
time.sleep(0.5)
|
|
|
|
median_speed = statistics.median(n3_fetches_speed)
|
|
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
|
|
assert median_speed <= 15, (
|
|
"We exceeded max fetch speed for more than 15MB/s. Must be around 5 (+- 10), got "
|
|
+ str(median_speed)
|
|
)
|
|
|
|
finally:
|
|
for node in [node1, node3]:
|
|
for j in range(5):
|
|
node.query(f"DROP TABLE IF EXISTS limited_fetches{j} SYNC")
|
|
|
|
|
|
def test_limited_sends_for_server(start_cluster):
|
|
print("Limited sends for server")
|
|
try:
|
|
for i, node in enumerate([node1, node3]):
|
|
for j in range(5):
|
|
node.query(
|
|
f"CREATE TABLE limited_sends{j}(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/limited_sends{j}', '{i}') ORDER BY tuple() PARTITION BY key"
|
|
)
|
|
|
|
for j in range(5):
|
|
node1.query(f"SYSTEM STOP FETCHES limited_sends{j}")
|
|
for i in range(5):
|
|
node3.query(
|
|
"INSERT INTO limited_sends{} SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(150)".format(
|
|
j, i
|
|
)
|
|
)
|
|
|
|
n1_net = NetThroughput(node1)
|
|
n3_net = NetThroughput(node3)
|
|
|
|
for j in range(5):
|
|
node1.query(f"SYSTEM START FETCHES limited_sends{j}")
|
|
|
|
n3_sends_speed = []
|
|
for i in range(5):
|
|
n1_in, n1_out = n1_net.measure_speed("megabytes")
|
|
n3_in, n3_out = n3_net.measure_speed("megabytes")
|
|
print("[N1] input:", n1_in, "MB/s", "output:", n1_out, "MB/s")
|
|
print("[N3] input:", n3_in, "MB/s", "output:", n3_out, "MB/s")
|
|
n3_sends_speed.append(n3_out)
|
|
time.sleep(0.5)
|
|
|
|
median_speed = statistics.median(n3_sends_speed)
|
|
# approximate border. Without limit we will have more than 100 MB/s for very slow builds.
|
|
assert median_speed <= 20, (
|
|
"We exceeded max send speed for more than 20MB/s. Must be around 5 (+- 10), got "
|
|
+ str(median_speed)
|
|
)
|
|
|
|
finally:
|
|
for node in [node1, node3]:
|
|
for j in range(5):
|
|
node.query(f"DROP TABLE IF EXISTS limited_sends{j} SYNC")
|
|
|
|
|
|
def test_should_execute_fetch(start_cluster):
|
|
print("Should execute fetch")
|
|
try:
|
|
for i, node in enumerate([node1, node2]):
|
|
node.query(
|
|
f"CREATE TABLE should_execute_table(key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/should_execute_table', '{i}') ORDER BY tuple() PARTITION BY key SETTINGS max_replicated_fetches_network_bandwidth=3505253"
|
|
)
|
|
|
|
node2.query("SYSTEM STOP FETCHES should_execute_table")
|
|
|
|
for i in range(3):
|
|
node1.query(
|
|
"INSERT INTO should_execute_table SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(200)".format(
|
|
i
|
|
)
|
|
)
|
|
|
|
n1_net = NetThroughput(node1)
|
|
n2_net = NetThroughput(node2)
|
|
|
|
node2.query("SYSTEM START FETCHES should_execute_table")
|
|
|
|
for i in range(10):
|
|
node1.query(
|
|
"INSERT INTO should_execute_table SELECT {}, (select randomPrintableASCII(104857)) FROM numbers(3)".format(
|
|
i
|
|
)
|
|
)
|
|
|
|
n2_fetch_speed = []
|
|
replication_queue_data = []
|
|
for i in range(10):
|
|
n1_in, n1_out = n1_net.measure_speed("megabytes")
|
|
n2_in, n2_out = n2_net.measure_speed("megabytes")
|
|
fetches_count = node2.query("SELECT count() FROM system.replicated_fetches")
|
|
if fetches_count == "0\n":
|
|
break
|
|
|
|
print("Fetches count", fetches_count)
|
|
replication_queue_data.append(
|
|
node2.query(
|
|
"SELECT count() FROM system.replication_queue WHERE postpone_reason like '%fetches have already throttled%'"
|
|
)
|
|
)
|
|
n2_fetch_speed.append(n2_in)
|
|
time.sleep(0.5)
|
|
|
|
node2.query("SYSTEM SYNC REPLICA should_execute_table")
|
|
assert any(int(f.strip()) != 0 for f in replication_queue_data)
|
|
assert node2.query("SELECT COUNT() FROM should_execute_table") == "630\n"
|
|
finally:
|
|
for node in [node1, node2]:
|
|
node.query("DROP TABLE IF EXISTS should_execute_table SYNC")
|