# pylint: disable=unused-argument # pylint: disable=redefined-outer-name # pylint: disable=line-too-long import time import threading import pytest from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) node = cluster.add_instance( "node", stay_alive=True, main_configs=["configs/scheduler.xml"], with_minio=True, ) @pytest.fixture(scope="module", autouse=True) def start_cluster(): try: cluster.start() yield finally: cluster.shutdown() def test_s3_disk(): node.query( f""" drop table if exists data; create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3'; """ ) def write_query(workload): try: node.query( f"insert into data select * from numbers(1e5) settings workload='{workload}'" ) except QueryRuntimeException: pass thread1 = threading.Thread(target=write_query, args=["development"]) thread2 = threading.Thread(target=write_query, args=["production"]) thread3 = threading.Thread(target=write_query, args=["admin"]) thread1.start() thread2.start() thread3.start() thread3.join() thread2.join() thread1.join() assert ( node.query( f"select dequeued_requests>0 from system.scheduler where resource='network_write' and path='/prio/admin'" ) == "1\n" ) assert ( node.query( f"select dequeued_requests>0 from system.scheduler where resource='network_write' and path='/prio/fair/dev'" ) == "1\n" ) assert ( node.query( f"select dequeued_requests>0 from system.scheduler where resource='network_write' and path='/prio/fair/prod'" ) == "1\n" ) def read_query(workload): try: node.query(f"select sum(key*key) from data settings workload='{workload}'") except QueryRuntimeException: pass thread1 = threading.Thread(target=read_query, args=["development"]) thread2 = threading.Thread(target=read_query, args=["production"]) thread3 = threading.Thread(target=read_query, args=["admin"]) thread1.start() thread2.start() thread3.start() thread3.join() thread2.join() thread1.join() assert ( node.query( f"select dequeued_requests>0 from system.scheduler where resource='network_read' and path='/prio/admin'" ) == "1\n" ) assert ( node.query( f"select dequeued_requests>0 from system.scheduler where resource='network_read' and path='/prio/fair/dev'" ) == "1\n" ) assert ( node.query( f"select dequeued_requests>0 from system.scheduler where resource='network_read' and path='/prio/fair/prod'" ) == "1\n" )