ClickHouse/tests/integration/test_concurrent_threads_soft_limit/test.py

124 lines
3.9 KiB
Python

import pytest
from helpers.cluster import ClickHouseCluster
import threading
import time
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)

# One instance per server configuration under test. Every instance uses the
# same user configuration file; only the main config differs.
node1, node2, node3, node4 = (
    cluster.add_instance(
        instance_name,
        main_configs=[main_config],
        user_configs=["configs/users.xml"],
    )
    for instance_name, main_config in (
        ("node1", "configs/config_default.xml"),
        ("node2", "configs/config_defined_50.xml"),
        ("node3", "configs/config_defined_1.xml"),
        ("node4", "configs/config_limit_reached.xml"),
    )
)
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that yields the started cluster.

    The cluster is shut down when the module's tests are done, and also when
    starting it raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_concurrent_threads_soft_limit_default(started_cluster):
    """Run a parallel count on node1 and check the thread count logged for it.

    The query is tagged with a fixed query_id so that its 'QueryFinish' row
    can be looked up in system.query_log after the logs are flushed.
    """
    node1.query(
        "SELECT count(*) FROM numbers_mt(10000000)",
        query_id="test_concurrent_threads_soft_limit_1",
    )
    node1.query("SYSTEM FLUSH LOGS")

    logged_thread_count = node1.query(
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
    )
    # The raw client output is compared, hence the trailing newline.
    assert logged_thread_count == "102\n"
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
    """Run a parallel count on node2 and check the thread count logged for it.

    node2 is started with configs/config_defined_50.xml; the 'QueryFinish'
    row for the tagged query is expected to list 52 thread ids.
    """
    node2.query(
        "SELECT count(*) FROM numbers_mt(10000000)",
        query_id="test_concurrent_threads_soft_limit_2",
    )
    node2.query("SYSTEM FLUSH LOGS")

    logged_thread_count = node2.query(
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
    )
    # The raw client output is compared, hence the trailing newline.
    assert logged_thread_count == "52\n"
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
    """Run a parallel count on node3 and check the thread count logged for it.

    node3 is started with configs/config_defined_1.xml; the 'QueryFinish'
    row for the tagged query is expected to list 3 thread ids.
    """
    node3.query(
        "SELECT count(*) FROM numbers_mt(10000000)",
        query_id="test_concurrent_threads_soft_limit_3",
    )
    node3.query("SYSTEM FLUSH LOGS")

    logged_thread_count = node3.query(
        "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
    )
    # The raw client output is compared, hence the trailing newline.
    assert logged_thread_count == "3\n"
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
# Background query starts in a separate thread to reach this limit.
# When this limit is reached, the foreground query gets fewer than 5 threads even though it is run with max_threads=5.
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
    """Check that a query gets fewer threads while other queries occupy them.

    A long-running background query (max_threads=100) is started on node4 from
    a separate Python thread. Once system.processes reports at least 10 threads
    in use, a foreground query is run with max_threads=5, and the number of
    thread ids logged for it in system.query_log must be below 5.

    The background query is always killed and its thread joined, even when the
    polling or the final assertion fails.
    """
    # Upper bound on waiting for the background query to occupy its threads;
    # without it the test would spin forever if that query failed to start.
    poll_timeout_seconds = 120

    def to_count(raw_result):
        # An empty result (no rows) is treated as zero.
        text = raw_result.strip()
        return int(text) if text else 0

    def background_query():
        try:
            node4.query(
                "SELECT count(*) FROM numbers_mt(1e11) settings max_threads=100",
                query_id="background_query",
            )
        except QueryRuntimeException:
            # Expected: the query is terminated by KILL QUERY during cleanup.
            pass

    def limit_reached():
        used_threads = to_count(
            node4.query("SELECT sum(length(thread_ids)) FROM system.processes")
        )
        return used_threads >= 10

    background_thread = threading.Thread(target=background_query)
    background_thread.start()
    try:
        deadline = time.monotonic() + poll_timeout_seconds
        while not limit_reached():
            assert (
                time.monotonic() < deadline
            ), "background query did not reach 10 threads in time"
            time.sleep(0.1)

        node4.query(
            "SELECT count(*) FROM numbers_mt(10000000) settings max_threads=5",
            query_id="test_concurrent_threads_soft_limit_4",
        )
        node4.query("SYSTEM FLUSH LOGS")
        count = to_count(
            node4.query(
                "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
            )
        )
        assert count < 5
    finally:
        # Stop the background query regardless of the test outcome so it does
        # not keep running on the node and the helper thread can finish.
        node4.query("KILL QUERY WHERE query_id = 'background_query' SYNC")
        background_thread.join()