ClickHouse/tests/integration/test_peak_memory_usage/test.py
2024-11-27 18:05:07 +01:00

121 lines
3.9 KiB
Python

import re
import tempfile
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.uclient import client, prompt
cluster = ClickHouseCluster(__file__)
shard_1 = cluster.add_instance(
"shard_1",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
macros={
"shard": "shard_1",
},
)
shard_2 = cluster.add_instance(
"shard_2",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
macros={
"shard": "shard_2",
},
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
shard_1.query(
"CREATE TABLE fixed_numbers ON CLUSTER 'cluster' ("
"number UInt64"
") ENGINE=MergeTree()"
"ORDER BY number"
)
shard_1.query(
"CREATE TABLE fixed_numbers_2 ON CLUSTER 'cluster' ("
"number UInt64"
") ENGINE=Memory ()"
)
shard_1.query(
"CREATE TABLE distributed_fixed_numbers (number UInt64) ENGINE=Distributed('cluster', 'default', 'fixed_numbers')"
)
shard_1.query("INSERT INTO fixed_numbers SELECT number FROM numbers(0, 100)")
shard_2.query("INSERT INTO fixed_numbers SELECT number FROM numbers(100, 200)")
shard_1.query("INSERT INTO fixed_numbers_2 SELECT number FROM numbers(0, 10)")
shard_2.query(
"INSERT INTO fixed_numbers_2 SELECT number FROM numbers(0, 120000)"
)
yield cluster
finally:
cluster.shutdown()
def get_memory_usage_from_client_output_and_close(client_output):
client_output.seek(0)
peek_memory_usage_str_found = False
for line in client_output:
print(f"'{line}'\n")
if not peek_memory_usage_str_found:
# Can be both Peak/peak
peek_memory_usage_str_found = "eak memory usage" in line
if peek_memory_usage_str_found:
search_obj = re.search(r"[+-]?[0-9]+\.[0-9]+", line)
if search_obj:
client_output.close()
print(f"peak_memory_usage {search_obj.group()}")
return search_obj.group()
print(f"peak_memory_usage not found")
client_output.close()
return ""
def test_clickhouse_client_max_peak_memory_usage_distributed(started_cluster):
client_output = tempfile.TemporaryFile(mode="w+t")
command_text = (
f"{started_cluster.get_client_cmd()} --host {shard_1.ip_address} --port 9000"
)
with client(name="client1>", log=client_output, command=command_text) as client1:
client1.expect(prompt)
client1.send(
"SELECT COUNT(*) FROM distributed_fixed_numbers JOIN fixed_numbers_2 ON distributed_fixed_numbers.number=fixed_numbers_2.number SETTINGS query_plan_join_swap_table = 'false', join_algorithm='hash'",
)
client1.expect("Peak memory usage", timeout=60)
client1.expect(prompt)
peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output)
assert peak_memory_usage
assert shard_2.contains_in_log(f"Query peak memory usage: {peak_memory_usage}")
def test_clickhouse_client_max_peak_memory_single_node(started_cluster):
client_output = tempfile.TemporaryFile(mode="w+t")
command_text = (
f"{started_cluster.get_client_cmd()} --host {shard_1.ip_address} --port 9000"
)
with client(name="client1>", log=client_output, command=command_text) as client1:
client1.expect(prompt)
client1.send(
"SELECT COUNT(*) FROM (SELECT number FROM numbers(1,300000) INTERSECT SELECT number FROM numbers(10000,1200000))"
)
client1.expect("Peak memory usage", timeout=60)
client1.expect(prompt)
peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output)
assert peak_memory_usage
assert shard_1.contains_in_log(f"Query peak memory usage: {peak_memory_usage}")