From 5bc5649732ab3c94ae6dcfe4d2caf5f58bf51502 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimchuck Date: Fri, 11 Aug 2023 05:39:13 +0000 Subject: [PATCH 1/4] added an integration test for peak_memory_usage --- .../test_peak_memory_usage/test.py | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 tests/integration/test_peak_memory_usage/test.py diff --git a/tests/integration/test_peak_memory_usage/test.py b/tests/integration/test_peak_memory_usage/test.py new file mode 100644 index 00000000000..502415d9aae --- /dev/null +++ b/tests/integration/test_peak_memory_usage/test.py @@ -0,0 +1,133 @@ +import pytest +import tempfile +import re + +from helpers.cluster import ClickHouseCluster +from helpers.uclient import client, prompt + +cluster = ClickHouseCluster(__file__) + +shard_1 = cluster.add_instance( + "shard_1", + main_configs=["configs/remote_servers.xml"], + with_zookeeper=True, + macros={ + "shard": "shard_1", + }, +) +shard_2 = cluster.add_instance( + "shard_2", + main_configs=["configs/remote_servers.xml"], + with_zookeeper=True, + macros={ + "shard": "shard_2", + }, +) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + shard_1.query( + "CREATE TABLE fixed_numbers ON CLUSTER 'cluster' (" + "number UInt64" + ") ENGINE=MergeTree()" + "ORDER BY number" + ) + + shard_1.query( + "CREATE TABLE distributed_fixed_numbers (number UInt64) ENGINE=Distributed('cluster', 'default', 'fixed_numbers')" + ) + + # Shard 1 has singificantly less data then shard 2 + shard_1.query( + "INSERT INTO fixed_numbers SELECT number FROM numbers(1999900, 2000000)" + ) + + shard_2.query( + "INSERT INTO fixed_numbers SELECT number FROM numbers(0, 1999900)" + ) + + yield cluster + finally: + cluster.shutdown() + + +def get_memory_usage_from_client_output_and_close(client_output): + client_output.seek(0) + peek_memory_usage_str_found = False + for line in client_output: + print(f"'{line}'\n") + if not peek_memory_usage_str_found: + peek_memory_usage_str_found = "Peak memory usage" in line + + if peek_memory_usage_str_found: + search_obj = re.search(r"([-+]?(?:\d*\.*\d+))", line) + if search_obj: + client_output.close() + return search_obj.group() + + client_output.close() + return "" + + +def test_clickhouse_client_max_peak_memory_usage_distributed(started_cluster): + client_output = tempfile.TemporaryFile(mode="w+t") + command_text = ( + f"{started_cluster.get_client_cmd()} --host {shard_1.ip_address} --port 9000" + ) + with client(name="client1>", log=client_output, command=command_text) as client1: + client1.expect(prompt) + client1.send( + "SELECT COUNT(*) FROM distributed_fixed_numbers WHERE number IN (SELECT number from numbers(1999890, 1999910))" + ) + client1.expect("Peak memory usage") + client1.expect(prompt) + + peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output) + assert peak_memory_usage + assert shard_2.contains_in_log( + f"Peak memory usage (for query): {peak_memory_usage}" + ) + +def test_clickhouse_client_max_peak_memory_usage_cluster(started_cluster): + client_output = tempfile.TemporaryFile(mode="w+t") + command_text = ( + f"{started_cluster.get_client_cmd()} --host {shard_1.ip_address} --port 9000" + ) + with client(name="client1>", log=client_output, command=command_text) as client1: + client1.expect(prompt) + client1.send( + "SELECT COUNT(*) FROM (SELECT number FROM numbers(1,100000) INTERSECT SELECT * FROM clusterAllReplicas(cluster, default, fixed_numbers))" + ) + client1.expect("Peak memory usage") + client1.expect(prompt) + + peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output) + assert peak_memory_usage + assert shard_2.contains_in_log( + f"Peak memory usage (for query): {peak_memory_usage}" + ) + + +def test_clickhouse_client_max_peak_memory_single_node(started_cluster): + client_output = tempfile.TemporaryFile(mode="w+t") + + command_text = ( + f"{started_cluster.get_client_cmd()} --host {shard_1.ip_address} --port 9000" + ) + with client(name="client1>", log=client_output, command=command_text) as client1: + client1.expect(prompt) + client1.send( + "SELECT COUNT(*) FROM (SELECT number FROM numbers(1,300000) INTERSECT SELECT number FROM numbers(10000,1200000))" + ) + client1.expect("Peak memory usage") + client1.expect(prompt) + + peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output) + assert peak_memory_usage + assert shard_1.contains_in_log( + f"Peak memory usage (for query): {peak_memory_usage}" + ) From b7c47af8bfc4c1d488deec21bfeb95c145ecc293 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimchuck Date: Fri, 11 Aug 2023 06:18:20 +0000 Subject: [PATCH 2/4] added missed files --- .../test_peak_memory_usage/__init__.py | 0 .../configs/remote_servers.xml | 20 +++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 tests/integration/test_peak_memory_usage/__init__.py create mode 100644 tests/integration/test_peak_memory_usage/configs/remote_servers.xml diff --git a/tests/integration/test_peak_memory_usage/__init__.py b/tests/integration/test_peak_memory_usage/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_peak_memory_usage/configs/remote_servers.xml b/tests/integration/test_peak_memory_usage/configs/remote_servers.xml new file mode 100644 index 00000000000..b137758489e --- /dev/null +++ b/tests/integration/test_peak_memory_usage/configs/remote_servers.xml @@ -0,0 +1,20 @@ + + + + + 1 + + shard_1 + 9000 + + + + 3 + + shard_2 + 9000 + + + + + From 820d821f50c80992ff717c95dff6cf3460e3bd6f Mon Sep 17 00:00:00 2001 From: Alexey Gerasimchuck Date: Fri, 11 Aug 2023 06:30:48 +0000 Subject: [PATCH 3/4] black run --- tests/integration/test_peak_memory_usage/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_peak_memory_usage/test.py b/tests/integration/test_peak_memory_usage/test.py index 502415d9aae..3994ee66a4a 100644 --- a/tests/integration/test_peak_memory_usage/test.py +++ b/tests/integration/test_peak_memory_usage/test.py @@ -92,6 +92,7 @@ def test_clickhouse_client_max_peak_memory_usage_distributed(started_cluster): f"Peak memory usage (for query): {peak_memory_usage}" ) + def test_clickhouse_client_max_peak_memory_usage_cluster(started_cluster): client_output = tempfile.TemporaryFile(mode="w+t") command_text = ( From 930b3d3bffc304d3fb9ff031f078fb6ffb862840 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimchuck Date: Fri, 11 Aug 2023 23:48:17 +0000 Subject: [PATCH 4/4] Improved test --- .../test_peak_memory_usage/test.py | 46 +++++++------------ 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/tests/integration/test_peak_memory_usage/test.py b/tests/integration/test_peak_memory_usage/test.py index 3994ee66a4a..a1313461482 100644 --- a/tests/integration/test_peak_memory_usage/test.py +++ b/tests/integration/test_peak_memory_usage/test.py @@ -37,17 +37,23 @@ def started_cluster(): "ORDER BY number" ) + shard_1.query( + "CREATE TABLE fixed_numbers_2 ON CLUSTER 'cluster' (" + "number UInt64" + ") ENGINE=Memory ()" + ) + shard_1.query( "CREATE TABLE distributed_fixed_numbers (number UInt64) ENGINE=Distributed('cluster', 'default', 'fixed_numbers')" ) + shard_1.query("INSERT INTO fixed_numbers SELECT number FROM numbers(0, 100)") - # Shard 1 has singificantly less data then shard 2 - shard_1.query( - "INSERT INTO fixed_numbers SELECT number FROM numbers(1999900, 2000000)" - ) + shard_2.query("INSERT INTO fixed_numbers SELECT number FROM numbers(100, 200)") + + shard_1.query("INSERT INTO fixed_numbers_2 SELECT number FROM numbers(0, 10)") shard_2.query( - "INSERT INTO fixed_numbers SELECT number FROM numbers(0, 1999900)" + "INSERT INTO fixed_numbers_2 SELECT number FROM numbers(0, 120000)" ) yield cluster @@ -64,11 +70,13 @@ def get_memory_usage_from_client_output_and_close(client_output): peek_memory_usage_str_found = "Peak memory usage" in line if peek_memory_usage_str_found: - search_obj = re.search(r"([-+]?(?:\d*\.*\d+))", line) + search_obj = re.search(r"[+-]?[0-9]+\.[0-9]+", line) if search_obj: client_output.close() + print(f"peak_memory_usage {search_obj.group()}") return search_obj.group() + print(f"peak_memory_usage not found") client_output.close() return "" @@ -81,29 +89,9 @@ def test_clickhouse_client_max_peak_memory_usage_distributed(started_cluster): with client(name="client1>", log=client_output, command=command_text) as client1: client1.expect(prompt) client1.send( - "SELECT COUNT(*) FROM distributed_fixed_numbers WHERE number IN (SELECT number from numbers(1999890, 1999910))" + "SELECT COUNT(*) FROM distributed_fixed_numbers JOIN fixed_numbers_2 ON distributed_fixed_numbers.number=fixed_numbers_2.number", ) - client1.expect("Peak memory usage") - client1.expect(prompt) - - peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output) - assert peak_memory_usage - assert shard_2.contains_in_log( - f"Peak memory usage (for query): {peak_memory_usage}" - ) - - -def test_clickhouse_client_max_peak_memory_usage_cluster(started_cluster): - client_output = tempfile.TemporaryFile(mode="w+t") - command_text = ( - f"{started_cluster.get_client_cmd()} --host {shard_1.ip_address} --port 9000" - ) - with client(name="client1>", log=client_output, command=command_text) as client1: - client1.expect(prompt) - client1.send( - "SELECT COUNT(*) FROM (SELECT number FROM numbers(1,100000) INTERSECT SELECT * FROM clusterAllReplicas(cluster, default, fixed_numbers))" - ) - client1.expect("Peak memory usage") + client1.expect("Peak memory usage", timeout=60) client1.expect(prompt) peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output) @@ -124,7 +112,7 @@ def test_clickhouse_client_max_peak_memory_single_node(started_cluster): client1.send( "SELECT COUNT(*) FROM (SELECT number FROM numbers(1,300000) INTERSECT SELECT number FROM numbers(10000,1200000))" ) - client1.expect("Peak memory usage") + client1.expect("Peak memory usage", timeout=60) client1.expect(prompt) peak_memory_usage = get_memory_usage_from_client_output_and_close(client_output)