ClickHouse/tests/integration/test_concurrent_threads_soft_limit/test.py
2024-10-18 18:11:33 +00:00

301 lines
8.9 KiB
Python

import threading
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)


def _add_node(name, main_config):
    """Register instance *name* on ``cluster`` with *main_config* and the shared users config.

    Returns whatever ``cluster.add_instance`` returns for that instance.
    """
    return cluster.add_instance(
        name,
        main_configs=[main_config],
        user_configs=["configs/users.xml"],
    )


# Every node shares configs/users.xml; only the server config differs.
node1 = _add_node("node1", "configs/config_default.xml")
node2 = _add_node("node2", "configs/config_defined_50.xml")
node3 = _add_node("node3", "configs/config_defined_1.xml")
node4 = _add_node("node4", "configs/config_limit_reached.xml")
def assert_profile_event(node, query_id, profile_event, check):
    """Assert that a ProfileEvents counter of a finished query satisfies *check*.

    Reads ``ProfileEvents[profile_event]`` from the most recent ``QueryFinish``
    row in ``system.query_log`` for *query_id* (restricted to the current
    database) and passes the value, as an ``int``, to the predicate *check*.
    The tests in this file run ``SYSTEM FLUSH LOGS`` before calling this.

    :param node: cluster instance exposing ``query(sql)`` that returns the
        result as text.
    :param query_id: id of the query whose counters are inspected.
    :param profile_event: name of the ProfileEvents counter to read.
    :param check: predicate called with the counter value; must return truthy.
    :raises AssertionError: if no ``QueryFinish`` row is found for *query_id*
        or *check* rejects the value.
    """
    raw = node.query(
        f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
    ).strip()
    # An empty result means no matching row (e.g. logs not flushed or the query
    # never finished); report that clearly instead of failing inside int("").
    assert raw, f"no QueryFinish row in system.query_log for query_id={query_id!r}"
    value = int(raw)
    assert check(value), (
        f"ProfileEvents[{profile_event!r}] = {value} for query_id={query_id!r} "
        "did not satisfy the expected condition"
    )
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level ``cluster`` once for all tests in this module.

    Yields the started ``cluster``. ``cluster.start()`` is inside the ``try``
    so that ``cluster.shutdown()`` is attempted even when startup itself
    raises, as well as after the last test in the module has run.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_concurrent_threads_soft_limit_default(started_cluster):
    """node1 (configs/config_default.xml): the query is never delayed.

    Expects 1 granted slot, 100 acquired slots, no delayed slots or queries,
    and 102 entries in ``thread_ids`` for the finished query.
    """
    qid = "test_concurrent_threads_soft_limit_1"
    node1.query("SELECT count(*) FROM numbers_mt(10000000)", query_id=qid)
    node1.query("SYSTEM FLUSH LOGS")

    expected_counters = (
        ("ConcurrencyControlSlotsGranted", 1),
        ("ConcurrencyControlSlotsDelayed", 0),
        ("ConcurrencyControlSlotsAcquired", 100),
        ("ConcurrencyControlQueriesDelayed", 0),
    )
    for counter, want in expected_counters:
        # Bind `want` as a default so each predicate keeps its own value.
        assert_profile_event(node1, qid, counter, lambda got, want=want: got == want)

    thread_count = node1.query(
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1"
    )
    assert thread_count == "102\n"
def test_use_concurrency_control_default(started_cluster):
    """node1: with ``use_concurrency_control = 0`` every counter stays at zero."""
    qid = "test_use_concurrency_control"
    node1.query(
        "SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
        query_id=qid,
    )
    # Concurrency control is bypassed, so none of its counters may move.
    node1.query("SYSTEM FLUSH LOGS")

    zero_counters = (
        "ConcurrencyControlSlotsGranted",
        "ConcurrencyControlSlotsDelayed",
        "ConcurrencyControlSlotsAcquired",
        "ConcurrencyControlQueriesDelayed",
    )
    for counter in zero_counters:
        assert_profile_event(node1, qid, counter, lambda got: got == 0)
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
    """node2 (configs/config_defined_50.xml): half of the slots are delayed.

    Expects 1 granted slot, 50 delayed and 50 acquired slots, one delayed
    query, and 52 entries in ``thread_ids`` for the finished query.
    """
    qid = "test_concurrent_threads_soft_limit_2"
    node2.query("SELECT count(*) FROM numbers_mt(10000000)", query_id=qid)
    node2.query("SYSTEM FLUSH LOGS")

    expected_counters = (
        ("ConcurrencyControlSlotsGranted", 1),
        ("ConcurrencyControlSlotsDelayed", 50),
        ("ConcurrencyControlSlotsAcquired", 50),
        ("ConcurrencyControlQueriesDelayed", 1),
    )
    for counter, want in expected_counters:
        # Bind `want` as a default so each predicate keeps its own value.
        assert_profile_event(node2, qid, counter, lambda got, want=want: got == want)

    thread_count = node2.query(
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1"
    )
    assert thread_count == "52\n"
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
    """node2: with ``use_concurrency_control = 0`` every counter stays at zero."""
    qid = "test_use_concurrency_control_2"
    node2.query(
        "SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
        query_id=qid,
    )
    # Concurrency control is bypassed, so none of its counters may move.
    node2.query("SYSTEM FLUSH LOGS")

    zero_counters = (
        "ConcurrencyControlSlotsGranted",
        "ConcurrencyControlSlotsDelayed",
        "ConcurrencyControlSlotsAcquired",
        "ConcurrencyControlQueriesDelayed",
    )
    for counter in zero_counters:
        assert_profile_event(node2, qid, counter, lambda got: got == 0)
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
    """node3 (configs/config_defined_1.xml): only one slot is acquired.

    Expects 1 granted slot, 99 delayed slots, 1 acquired slot, one delayed
    query, and 3 entries in ``thread_ids`` for the finished query.
    """
    qid = "test_concurrent_threads_soft_limit_3"
    node3.query("SELECT count(*) FROM numbers_mt(10000000)", query_id=qid)
    node3.query("SYSTEM FLUSH LOGS")

    expected_counters = (
        ("ConcurrencyControlSlotsGranted", 1),
        ("ConcurrencyControlSlotsDelayed", 99),
        ("ConcurrencyControlSlotsAcquired", 1),
        ("ConcurrencyControlQueriesDelayed", 1),
    )
    for counter, want in expected_counters:
        # Bind `want` as a default so each predicate keeps its own value.
        assert_profile_event(node3, qid, counter, lambda got, want=want: got == want)

    thread_count = node3.query(
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1"
    )
    assert thread_count == "3\n"
# In config_limit_reached.xml, concurrent_threads_soft_limit is set to 10.
# A background query is started in a separate thread to reach this limit.
# Once the limit is reached, the foreground query gets fewer than 5 threads even though it has the setting max_threads=5.
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
    """A query started while the soft limit is saturated gets fewer than max_threads slots.

    A background query on node4 runs until ``system.processes`` reports at
    least 10 threads in total. A foreground query with ``max_threads=5`` is
    then run and must be delayed and acquire fewer than 5 slots and threads.
    The background query is always killed and its thread joined, even when an
    assertion fails.
    """
    # Upper bound on waiting for the background query to saturate the limit;
    # without it a background query that fails or ends early hangs the test.
    wait_timeout_sec = 60

    def background_query():
        # KILL QUERY in the cleanup below interrupts this query, which surfaces
        # as QueryRuntimeException; that is its expected way to end.
        try:
            node4.query(
                "SELECT count(*) FROM numbers_mt(1e11) settings max_threads=100",
                query_id="background_query",
            )
        except QueryRuntimeException:
            pass

    def running_threads():
        # Total number of threads used by all queries currently running on node4
        # (defensively treating an empty result as 0).
        s_count = node4.query(
            "SELECT sum(length(thread_ids)) FROM system.processes"
        ).strip()
        return int(s_count) if s_count else 0

    background_thread = threading.Thread(target=background_query)
    background_thread.start()
    try:
        deadline = time.monotonic() + wait_timeout_sec
        while running_threads() < 10:
            # If the background query is gone, the limit can never be reached.
            assert background_thread.is_alive(), (
                "background query ended before the soft limit was reached"
            )
            assert time.monotonic() < deadline, (
                f"soft limit was not reached within {wait_timeout_sec} seconds"
            )
            time.sleep(0.1)

        node4.query(
            "SELECT count(*) FROM numbers_mt(10000000) settings max_threads=5",
            query_id="test_concurrent_threads_soft_limit_4",
        )
        node4.query("SYSTEM FLUSH LOGS")
        assert_profile_event(
            node4,
            "test_concurrent_threads_soft_limit_4",
            "ConcurrencyControlSlotsGranted",
            lambda x: x == 1,
        )
        assert_profile_event(
            node4,
            "test_concurrent_threads_soft_limit_4",
            "ConcurrencyControlSlotsDelayed",
            lambda x: x > 0,
        )
        assert_profile_event(
            node4,
            "test_concurrent_threads_soft_limit_4",
            "ConcurrencyControlSlotsAcquired",
            lambda x: x < 5,
        )
        assert_profile_event(
            node4,
            "test_concurrent_threads_soft_limit_4",
            "ConcurrencyControlQueriesDelayed",
            lambda x: x == 1,
        )
        s_count = node4.query(
            "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1"
        ).strip()
        # A missing row must fail rather than silently counting as 0 threads.
        assert s_count, "no QueryFinish row for test_concurrent_threads_soft_limit_4"
        assert int(s_count) < 5
    finally:
        # Always stop the long-running background query so it does not outlive
        # this test (and keep running into later tests or cluster shutdown).
        node4.query("KILL QUERY WHERE query_id = 'background_query' SYNC")
        background_thread.join()