ClickHouse/tests/integration/test_scheduler/test.py
Azat Khuzhin 69d23f5e67 Fix all problems in tests that had been found by flake8
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2024-06-05 14:46:38 +02:00

114 lines
2.9 KiB
Python

# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import time
import threading
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
# Cluster object shared by every test in this module; it is built from this
# file's path.
cluster = ClickHouseCluster(__file__)

# The single instance used by the tests. It is registered under the name
# "node", loads configs/scheduler.xml as a main config, and is added with
# with_minio=True (the test below writes to a table whose storage policy is
# named 's3').
node = cluster.add_instance(
    "node",
    with_minio=True,
    main_configs=["configs/scheduler.xml"],
    stay_alive=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the cluster for this module and shut it down afterwards.

    The fixture is module-scoped and ``autouse``, so tests in this module do
    not need to request it explicitly: they all run between
    ``cluster.start()`` and ``cluster.shutdown()``.
    """
    try:
        cluster.start()
        # Hand control over to the tests; execution resumes here on teardown.
        yield
    finally:
        # ``start()`` is inside the ``try``, so shutdown also runs when
        # startup itself raised.
        cluster.shutdown()
def test_s3_disk():
    """Check that reads and writes on the 's3' storage policy hit the scheduler.

    Steps:
      1. (Re)create the MergeTree table ``data`` with ``storage_policy='s3'``.
      2. Run three concurrent inserts, one per ``workload`` setting value
         (``development``, ``production``, ``admin``), then assert that
         ``system.scheduler`` reports ``dequeued_requests > 0`` for resource
         ``network_write`` at each of the expected scheduler paths.
      3. Run three concurrent selects the same way and assert the same for
         resource ``network_read``.
    """
    # Values passed as the ``workload`` query setting, in thread start order.
    workloads = ("development", "production", "admin")
    # Scheduler node paths whose counters are checked, in assertion order.
    # NOTE(review): the workload -> path mapping presumably lives in
    # configs/scheduler.xml, which is not visible here -- confirm.
    scheduler_paths = ("/prio/admin", "/prio/fair/dev", "/prio/fair/prod")

    def run_for_all_workloads(query_fn):
        """Run ``query_fn(workload)`` in one thread per workload and wait."""
        threads = [
            threading.Thread(target=query_fn, args=[workload])
            for workload in workloads
        ]
        for thread in threads:
            thread.start()
        # Join in reverse start order, as the test has always done.
        for thread in reversed(threads):
            thread.join()

    def assert_requests_dequeued(resource):
        """Assert every checked scheduler path dequeued requests for ``resource``."""
        for path in scheduler_paths:
            result = node.query(
                f"select dequeued_requests>0 from system.scheduler where resource='{resource}' and path='{path}'"
            )
            assert (
                result == "1\n"
            ), f"no dequeued requests for resource={resource!r} path={path!r}: {result!r}"

    def write_query(workload):
        # NOTE(review): query failures are swallowed on purpose-looking
        # best-effort basis; the scheduler counters asserted afterwards are
        # the actual check -- confirm this is intended.
        try:
            node.query(
                f"insert into data select * from numbers(1e5) settings workload='{workload}'"
            )
        except QueryRuntimeException:
            pass

    def read_query(workload):
        # Same best-effort handling as in write_query.
        try:
            node.query(f"select sum(key*key) from data settings workload='{workload}'")
        except QueryRuntimeException:
            pass

    # Plain (non-f) string literals: there is nothing to interpolate here.
    # The concatenation yields exactly the multi-statement text sent before.
    node.query(
        "\n"
        "drop table if exists data;\n"
        "create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3';\n"
    )

    run_for_all_workloads(write_query)
    assert_requests_dequeued("network_write")

    run_for_all_workloads(read_query)
    assert_requests_dequeued("network_read")