ClickHouse/tests/integration/test_concurrent_threads_soft_limit/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

297 lines
8.6 KiB
Python
Raw Normal View History

import pytest
from helpers.cluster import ClickHouseCluster
import threading
import time
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/config_default.xml"],
user_configs=["configs/users.xml"],
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/config_defined_50.xml"],
user_configs=["configs/users.xml"],
)
node3 = cluster.add_instance(
"node3",
main_configs=["configs/config_defined_1.xml"],
user_configs=["configs/users.xml"],
)
node4 = cluster.add_instance(
"node4",
main_configs=["configs/config_limit_reached.xml"],
user_configs=["configs/users.xml"],
)
def assert_profile_event(node, query_id, profile_event, check):
2024-07-12 12:23:30 +00:00
assert check(
int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
)
2024-07-12 12:23:30 +00:00
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_concurrent_threads_soft_limit_default(started_cluster):
node1.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_1",
)
node1.query("SYSTEM FLUSH LOGS")
2024-07-12 12:23:30 +00:00
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 100,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
assert (
node1.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
)
== "102\n"
)
def test_use_concurrency_control_default(started_cluster):
node1.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control",
)
node1.query("SYSTEM FLUSH LOGS")
2024-07-12 12:23:30 +00:00
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantedHard",
lambda x: x == 100,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 100,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_2",
)
node2.query("SYSTEM FLUSH LOGS")
2024-07-12 12:23:30 +00:00
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node2.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
)
== "52\n"
)
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control_2",
)
node2.query("SYSTEM FLUSH LOGS")
2024-07-12 12:23:30 +00:00
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantedHard",
lambda x: x == 100,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlGrantDelayed",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 100,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
node3.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_3",
)
node3.query("SYSTEM FLUSH LOGS")
2024-07-12 12:23:30 +00:00
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlGrantDelayed",
lambda x: x == 99,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAcquiredTotal",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
assert (
node3.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
)
== "3\n"
)
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
# Background query starts in a separate thread to reach this limit.
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
def background_query():
try:
node4.query(
"SELECT count(*) FROM numbers_mt(1e11) settings max_threads=100",
query_id="background_query",
)
except QueryRuntimeException:
pass
background_thread = threading.Thread(target=background_query)
background_thread.start()
def limit_reached():
s_count = node4.query(
"SELECT sum(length(thread_ids)) FROM system.processes"
).strip()
if s_count:
count = int(s_count)
else:
count = 0
return count >= 10
while not limit_reached():
time.sleep(0.1)
node4.query(
"SELECT count(*) FROM numbers_mt(10000000) settings max_threads=5",
query_id="test_concurrent_threads_soft_limit_4",
)
node4.query("SYSTEM FLUSH LOGS")
2024-07-12 12:23:30 +00:00
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantedHard",
lambda x: x == 1,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlGrantDelayed",
lambda x: x > 0,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAcquiredTotal",
lambda x: x < 5,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlAllocationDelayed",
lambda x: x == 1,
)
s_count = node4.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
).strip()
if s_count:
count = int(s_count)
else:
count = 0
assert count < 5
node4.query("KILL QUERY WHERE query_id = 'background_query' SYNC")
background_thread.join()