mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
127 lines
3.9 KiB
Python
127 lines
3.9 KiB
Python
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
import threading
|
|
import time
|
|
from helpers.client import QueryRuntimeException
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
node1 = cluster.add_instance(
|
|
"node1",
|
|
main_configs=["configs/config_default.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
)
|
|
node2 = cluster.add_instance(
|
|
"node2",
|
|
main_configs=["configs/config_defined_50.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
)
|
|
node3 = cluster.add_instance(
|
|
"node3",
|
|
main_configs=["configs/config_defined_1.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
)
|
|
node4 = cluster.add_instance(
|
|
"node4",
|
|
main_configs=["configs/config_limit_reached.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_concurrent_threads_soft_limit_default(started_cluster):
|
|
node1.query(
|
|
"SELECT count(*) FROM numbers_mt(10000000)",
|
|
query_id="test_concurrent_threads_soft_limit_1",
|
|
)
|
|
node1.query("SYSTEM FLUSH LOGS")
|
|
assert (
|
|
node1.query(
|
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
|
|
)
|
|
== "102\n"
|
|
)
|
|
|
|
|
|
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
|
|
node2.query(
|
|
"SELECT count(*) FROM numbers_mt(10000000)",
|
|
query_id="test_concurrent_threads_soft_limit_2",
|
|
)
|
|
node2.query("SYSTEM FLUSH LOGS")
|
|
assert (
|
|
node2.query(
|
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
|
|
)
|
|
== "52\n"
|
|
)
|
|
|
|
|
|
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
|
|
node3.query(
|
|
"SELECT count(*) FROM numbers_mt(10000000)",
|
|
query_id="test_concurrent_threads_soft_limit_3",
|
|
)
|
|
node3.query("SYSTEM FLUSH LOGS")
|
|
assert (
|
|
node3.query(
|
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
|
|
)
|
|
== "3\n"
|
|
)
|
|
|
|
|
|
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
|
|
# Background query starts in a separate thread to reach this limit.
|
|
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
|
|
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
|
|
def background_query():
|
|
try:
|
|
node4.query(
|
|
"SELECT count(*) FROM numbers_mt(1e11) settings max_threads=100",
|
|
query_id="background_query",
|
|
)
|
|
except QueryRuntimeException:
|
|
pass
|
|
|
|
background_thread = threading.Thread(target=background_query)
|
|
background_thread.start()
|
|
|
|
def limit_reached():
|
|
s_count = node4.query(
|
|
"SELECT sum(length(thread_ids)) FROM system.processes"
|
|
).strip()
|
|
if s_count:
|
|
count = int(s_count)
|
|
else:
|
|
count = 0
|
|
return count >= 10
|
|
|
|
while not limit_reached():
|
|
time.sleep(0.1)
|
|
|
|
node4.query(
|
|
"SELECT count(*) FROM numbers_mt(10000000) settings max_threads=5",
|
|
query_id="test_concurrent_threads_soft_limit_4",
|
|
)
|
|
|
|
node4.query("SYSTEM FLUSH LOGS")
|
|
s_count = node4.query(
|
|
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
|
|
).strip()
|
|
if s_count:
|
|
count = int(s_count)
|
|
else:
|
|
count = 0
|
|
assert count < 5
|
|
node4.query("KILL QUERY WHERE query_id = 'background_query' SYNC")
|
|
background_thread.join()
|