From 77d2e6d3d82ae38a661755e052055bffc344b8f7 Mon Sep 17 00:00:00 2001 From: serxa Date: Wed, 22 May 2024 14:38:21 +0000 Subject: [PATCH] add tests for merge_workload and mutation_workload settings --- .../System/StorageSystemServerSettings.cpp | 5 +- .../test_scheduler/configs/resources.xml | 3 + .../configs/resources.xml.default | 76 +++++++ .../test_scheduler/configs/scheduler.xml | 62 ----- .../configs/storage_configuration.xml | 26 +++ .../test_scheduler/configs/workloads.xml | 3 + .../configs/workloads.xml.default | 4 + tests/integration/test_scheduler/test.py | 215 +++++++++++++++++- 8 files changed, 330 insertions(+), 64 deletions(-) create mode 100644 tests/integration/test_scheduler/configs/resources.xml create mode 100644 tests/integration/test_scheduler/configs/resources.xml.default delete mode 100644 tests/integration/test_scheduler/configs/scheduler.xml create mode 100644 tests/integration/test_scheduler/configs/storage_configuration.xml create mode 100644 tests/integration/test_scheduler/configs/workloads.xml create mode 100644 tests/integration/test_scheduler/configs/workloads.xml.default diff --git a/src/Storages/System/StorageSystemServerSettings.cpp b/src/Storages/System/StorageSystemServerSettings.cpp index 2e848f68850..ef10b2f45da 100644 --- a/src/Storages/System/StorageSystemServerSettings.cpp +++ b/src/Storages/System/StorageSystemServerSettings.cpp @@ -81,7 +81,10 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context {"uncompressed_cache_size", {std::to_string(context->getUncompressedCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}}, {"index_mark_cache_size", {std::to_string(context->getIndexMarkCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}}, {"index_uncompressed_cache_size", {std::to_string(context->getIndexUncompressedCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}}, - {"mmap_cache_size", {std::to_string(context->getMMappedFileCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}} + {"mmap_cache_size", {std::to_string(context->getMMappedFileCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}}, + + {"merge_workload", {context->getMergeWorkload(), ChangeableWithoutRestart::Yes}}, + {"mutation_workload", {context->getMutationWorkload(), ChangeableWithoutRestart::Yes}} }; if (context->areBackgroundExecutorsInitialized()) diff --git a/tests/integration/test_scheduler/configs/resources.xml b/tests/integration/test_scheduler/configs/resources.xml new file mode 100644 index 00000000000..197bf660500 --- /dev/null +++ b/tests/integration/test_scheduler/configs/resources.xml @@ -0,0 +1,3 @@ + + + diff --git a/tests/integration/test_scheduler/configs/resources.xml.default b/tests/integration/test_scheduler/configs/resources.xml.default new file mode 100644 index 00000000000..3b003a17557 --- /dev/null +++ b/tests/integration/test_scheduler/configs/resources.xml.default @@ -0,0 +1,76 @@ + + + + inflight_limit1000000 + priority + fifo0 + fair1 + fifo9 + fifo1 + fair90 + fifo + fifo + fifo9 + fifo9 + fifo9 + fifo9 + + + inflight_limit1000000 + priority + fifo0 + fair1 + fifo9 + fifo1 + fair90 + fifo + fifo + fifo9 + fifo9 + fifo9 + fifo9 + + + + + /prio/admin + /prio/admin + + + /prio/fair/prod + /prio/fair/prod + + + /prio/fair/dev + /prio/fair/dev + + + /prio/fair/dev + /prio/fair/dev + + + /prio/fair/sys/merges + /prio/fair/sys/merges + + + /prio/fair/sys/mutations + /prio/fair/sys/mutations + + + /prio/fair/prod_merges + /prio/fair/prod_merges + + + /prio/fair/prod_mutations + /prio/fair/prod_mutations + + + /prio/fair/dev_merges + /prio/fair/dev_merges + + + /prio/fair/dev_mutations + /prio/fair/dev_mutations + + + diff --git a/tests/integration/test_scheduler/configs/scheduler.xml b/tests/integration/test_scheduler/configs/scheduler.xml deleted file mode 100644 index 523ba1a5a98..00000000000 --- a/tests/integration/test_scheduler/configs/scheduler.xml +++ /dev/null @@ -1,62 +0,0 @@ - - - - - s3 - http://minio1:9001/root/data/ - minio - minio123 - 33554432 - 10 - 10 - network_read - network_write - - - - - -
- s3 -
-
-
-
-
- - - inflight_limit1000000 - priority - fifo0 - fair1 - fifo9 - fifo1 - - - inflight_limit1000000 - priority - fifo0 - fair1 - fifo9 - fifo1 - - - - - /prio/admin - /prio/admin - - - /prio/fair/prod - /prio/fair/prod - - - /prio/fair/dev - /prio/fair/dev - - - /prio/fair/dev - /prio/fair/dev - - -
diff --git a/tests/integration/test_scheduler/configs/storage_configuration.xml b/tests/integration/test_scheduler/configs/storage_configuration.xml new file mode 100644 index 00000000000..823a00a05de --- /dev/null +++ b/tests/integration/test_scheduler/configs/storage_configuration.xml @@ -0,0 +1,26 @@ + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + 33554432 + 10 + 10 + network_read + network_write + + + + + +
+ s3 +
+
+
+
+
+
diff --git a/tests/integration/test_scheduler/configs/workloads.xml b/tests/integration/test_scheduler/configs/workloads.xml new file mode 100644 index 00000000000..197bf660500 --- /dev/null +++ b/tests/integration/test_scheduler/configs/workloads.xml @@ -0,0 +1,3 @@ + + + diff --git a/tests/integration/test_scheduler/configs/workloads.xml.default b/tests/integration/test_scheduler/configs/workloads.xml.default new file mode 100644 index 00000000000..f010993335d --- /dev/null +++ b/tests/integration/test_scheduler/configs/workloads.xml.default @@ -0,0 +1,4 @@ + + sys_merges + sys_mutations + diff --git a/tests/integration/test_scheduler/test.py b/tests/integration/test_scheduler/test.py index e6def99c076..e0660c03681 100644 --- a/tests/integration/test_scheduler/test.py +++ b/tests/integration/test_scheduler/test.py @@ -13,7 +13,13 @@ cluster = ClickHouseCluster(__file__) node = cluster.add_instance( "node", stay_alive=True, - main_configs=["configs/scheduler.xml"], + main_configs=[ + "configs/storage_configuration.xml", + "configs/resources.xml", + "configs/resources.xml.default", + "configs/workloads.xml", + "configs/workloads.xml.default", + ], with_minio=True, ) @@ -27,6 +33,41 @@ def start_cluster(): cluster.shutdown() +@pytest.fixture(scope="function", autouse=True) +def set_default_configs(): + node.exec_in_container( + [ + "bash", + "-c", + "cp /etc/clickhouse-server/config.d/resources.xml.default /etc/clickhouse-server/config.d/resources.xml", + ] + ) + node.exec_in_container( + [ + "bash", + "-c", + "cp /etc/clickhouse-server/config.d/workloads.xml.default /etc/clickhouse-server/config.d/workloads.xml", + ] + ) + node.query("system reload config") + yield + + +def update_workloads_config(**settings): + xml='' + for name in settings: + xml += f"<{name}>{settings[name]}" + print(xml) + node.exec_in_container( + [ + "bash", + "-c", + f"echo '{xml}' > /etc/clickhouse-server/config.d/workloads.xml", + ] + ) + node.query("system reload config") + + def test_s3_disk(): node.query( f""" @@ -110,3 +151,175 @@ def test_s3_disk(): ) == "1\n" ) + + +def test_merge_workload(): + node.query( + f""" + drop table if exists data; + create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3'; + """ + ) + + reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/merges'").strip()) + writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/merges'").strip()) + + node.query(f"insert into data select * from numbers(1e4)") + node.query(f"insert into data select * from numbers(2e4)") + node.query(f"insert into data select * from numbers(3e4)") + node.query(f"optimize table data final") + + reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/merges'").strip()) + writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/merges'").strip()) + + assert (reads_before < reads_after) + assert (writes_before < writes_after) + + +def test_merge_workload_override(): + node.query( + f""" + drop table if exists prod_data; + drop table if exists dev_data; + create table prod_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', merge_workload='prod_merges'; + create table dev_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', merge_workload='dev_merges'; + """ + ) + + prod_reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_merges'").strip()) + prod_writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_merges'").strip()) + dev_reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_merges'").strip()) + dev_writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_merges'").strip()) + + node.query(f"insert into prod_data select * from numbers(1e4)") + node.query(f"insert into prod_data select * from numbers(2e4)") + node.query(f"insert into prod_data select * from numbers(3e4)") + node.query(f"insert into dev_data select * from numbers(1e4)") + node.query(f"insert into dev_data select * from numbers(2e4)") + node.query(f"insert into dev_data select * from numbers(3e4)") + node.query(f"optimize table prod_data final") + node.query(f"optimize table dev_data final") + + prod_reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_merges'").strip()) + prod_writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_merges'").strip()) + dev_reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_merges'").strip()) + dev_writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_merges'").strip()) + + assert (prod_reads_before < prod_reads_after) + assert (prod_writes_before < prod_writes_after) + assert (dev_reads_before < dev_reads_after) + assert (dev_writes_before < dev_writes_after) + + +def test_mutate_workload(): + node.query( + f""" + drop table if exists data; + create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3'; + """ + ) + + node.query(f"insert into data select * from numbers(1e4)") + node.query(f"optimize table data final") + + reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/mutations'").strip()) + writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/mutations'").strip()) + + node.query(f"alter table data update key = 1 where key = 42") + node.query(f"optimize table data final") + + reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/mutations'").strip()) + writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/mutations'").strip()) + + assert (reads_before < reads_after) + assert (writes_before < writes_after) + + +def test_mutation_workload_override(): + node.query( + f""" + drop table if exists prod_data; + drop table if exists dev_data; + create table prod_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', mutation_workload='prod_mutations'; + create table dev_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', mutation_workload='dev_mutations'; + """ + ) + + node.query(f"insert into prod_data select * from numbers(1e4)") + node.query(f"optimize table prod_data final") + node.query(f"insert into dev_data select * from numbers(1e4)") + node.query(f"optimize table dev_data final") + + prod_reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_mutations'").strip()) + prod_writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_mutations'").strip()) + dev_reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_mutations'").strip()) + dev_writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_mutations'").strip()) + + node.query(f"alter table prod_data update key = 1 where key = 42") + node.query(f"optimize table prod_data final") + node.query(f"alter table dev_data update key = 1 where key = 42") + node.query(f"optimize table dev_data final") + + prod_reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_mutations'").strip()) + prod_writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_mutations'").strip()) + dev_reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_mutations'").strip()) + dev_writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_mutations'").strip()) + + assert (prod_reads_before < prod_reads_after) + assert (prod_writes_before < prod_writes_after) + assert (dev_reads_before < dev_reads_after) + assert (dev_writes_before < dev_writes_after) + + +def test_merge_workload_change(): + node.query( + f""" + drop table if exists data; + create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3'; + """ + ) + + for env in ['prod', 'dev']: + update_workloads_config(merge_workload=f"{env}_merges") + + reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_merges'").strip()) + writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_merges'").strip()) + + node.query(f"insert into data select * from numbers(1e4)") + node.query(f"insert into data select * from numbers(2e4)") + node.query(f"insert into data select * from numbers(3e4)") + node.query(f"optimize table data final") + + reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_merges'").strip()) + writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_merges'").strip()) + + assert (reads_before < reads_after) + assert (writes_before < writes_after) + +def test_mutation_workload_change(): + node.query( + f""" + drop table if exists data; + create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3'; + """ + ) + + for env in ['prod', 'dev']: + update_workloads_config(mutation_workload=f"{env}_mutations") + + node.query(f"insert into data select * from numbers(1e4)") + node.query(f"optimize table data final") + + reads_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_mutations'").strip()) + writes_before = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_mutations'").strip()) + + node.query(f"alter table data update key = 1 where key = 42") + node.query(f"optimize table data final") + + reads_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_mutations'").strip()) + writes_after = int(node.query(f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_mutations'").strip()) + + breakpoint() + + assert (reads_before < reads_after) + assert (writes_before < writes_after)