mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 03:42:48 +00:00
113 lines
2.9 KiB
Python
113 lines
2.9 KiB
Python
# pylint: disable=unused-argument
|
|
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=line-too-long
|
|
|
|
import time
|
|
import threading
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
# Cluster handle shared by the whole module; constructed from this file's path.
cluster = ClickHouseCluster(__file__)

# The single instance every query in this module runs against. It is
# registered with configs/scheduler.xml as its main config and with MinIO
# enabled, which the 's3' storage policy used by the test presumably relies on
# (the policy definition itself is not visible here).
node = cluster.add_instance(
    "node",
    with_minio=True,
    main_configs=["configs/scheduler.xml"],
    stay_alive=True,
)
|
|
|
|
|
|
@pytest.fixture(autouse=True, scope="module")
def start_cluster():
    """Start the module's cluster before the tests and shut it down afterwards.

    The fixture is module-scoped and autouse, so tests get a running cluster
    without requesting the fixture by name.
    """
    try:
        cluster.start()
        # Hand control to the tests; nothing is yielded because tests use the
        # module-level ``node`` directly.
        yield
    finally:
        # Runs even when start() itself raised, so a partially started
        # cluster is still shut down.
        cluster.shutdown()
|
|
|
|
|
|
def _run_for_each_workload(query_fn, workloads=("development", "production", "admin")):
    """Call ``query_fn(workload)`` once per workload, each in its own thread.

    Every thread is started before any of them is joined, so the calls can run
    concurrently. Returns once all threads have finished.
    """
    threads = [
        threading.Thread(target=query_fn, args=[workload]) for workload in workloads
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


def _assert_requests_dequeued(resource):
    """Assert that each checked scheduler path of ``resource`` dequeued requests.

    Queries ``system.scheduler`` on ``node`` for the paths ``/prio/admin``,
    ``/prio/fair/dev`` and ``/prio/fair/prod`` and requires
    ``dequeued_requests > 0`` for each of them.
    """
    for path in ("/prio/admin", "/prio/fair/dev", "/prio/fair/prod"):
        result = node.query(
            f"select dequeued_requests>0 from system.scheduler where resource='{resource}' and path='{path}'"
        )
        assert (
            result == "1\n"
        ), f"expected dequeued requests for resource={resource!r}, path={path!r}; got {result!r}"


def test_s3_disk():
    """Check that S3 disk writes and reads go through the resource scheduler.

    Creates a MergeTree table on the 's3' storage policy, then runs inserts and
    afterwards selects concurrently under the workloads 'development',
    'production' and 'admin'. After each phase it asserts that
    ``system.scheduler`` reports dequeued requests for the 'network_write'
    (respectively 'network_read') resource on the three checked paths.
    How workload names map onto those paths is presumably defined in
    configs/scheduler.xml, which is not visible here.
    """
    # QueryRuntimeException is not imported at module level in this file, so
    # the ``except`` clauses below would raise NameError as soon as a query
    # failed. Import it locally to keep this block self-contained.
    # pylint: disable=import-outside-toplevel
    from helpers.client import QueryRuntimeException

    node.query(
        """
        drop table if exists data;
        create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3';
        """
    )

    def write_query(workload):
        try:
            node.query(
                f"insert into data select * from numbers(1e5) settings workload='{workload}'"
            )
        except QueryRuntimeException:
            # Deliberately best-effort: a failed query is tolerated here and
            # the scheduler counters asserted afterwards are the actual check.
            pass

    _run_for_each_workload(write_query)
    _assert_requests_dequeued("network_write")

    def read_query(workload):
        try:
            node.query(f"select sum(key*key) from data settings workload='{workload}'")
        except QueryRuntimeException:
            # Same best-effort policy as for the writes above.
            pass

    _run_for_each_workload(read_query)
    _assert_requests_dequeued("network_read")
|