Merge pull request #64061 from ClickHouse/workload-for-merges

Workload classification for merges and mutations
alesapin 2024-06-18 10:17:27 +00:00 committed by GitHub
commit 85771099f4
22 changed files with 570 additions and 73 deletions

View File

@ -3084,3 +3084,21 @@ This setting is only necessary for the migration period and will become obsolete
Type: Bool
Default: 1
## merge_workload {#merge_workload}
Used to regulate how resources are utilized and shared between merges and other workloads. The specified value is used as the `workload` setting value for all background merges. It can be overridden by a merge tree setting.
Default value: "default"
**See Also**
- [Workload Scheduling](/docs/en/operations/workload-scheduling.md)
## mutation_workload {#mutation_workload}
Used to regulate how resources are utilized and shared between mutations and other workloads. The specified value is used as the `workload` setting value for all background mutations. It can be overridden by a merge tree setting.
Default value: "default"
**See Also**
- [Workload Scheduling](/docs/en/operations/workload-scheduling.md)
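
For a quick check of the server-wide values at runtime, they can be read back from `system.server_settings` (a sketch; it assumes the `changeable_without_restart` column available in current releases):

```sql
-- Inspect the workload names used for background merges and mutations
SELECT name, value, changeable_without_restart
FROM system.server_settings
WHERE name IN ('merge_workload', 'mutation_workload');
```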

View File

@ -974,6 +974,24 @@ Default value: false
- [exclude_deleted_rows_for_part_size_in_merge](#exclude_deleted_rows_for_part_size_in_merge) setting
## merge_workload
Used to regulate how resources are utilized and shared between merges and other workloads. The specified value is used as the `workload` setting value for background merges of this table. If not specified (empty string), the server setting `merge_workload` is used instead.
Default value: an empty string
**See Also**
- [Workload Scheduling](/docs/en/operations/workload-scheduling.md)
## mutation_workload
Used to regulate how resources are utilized and shared between mutations and other workloads. The specified value is used as the `workload` setting value for background mutations of this table. If not specified (empty string), the server setting `mutation_workload` is used instead.
Default value: an empty string
**See Also**
- [Workload Scheduling](/docs/en/operations/workload-scheduling.md)
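
For example, a table can pin its own workload names at creation time (a sketch mirroring the integration tests below; `prod_data`, `prod_merges` and `prod_mutations` are placeholder names, and the workloads must exist in your `workload_classifiers` configuration):

```sql
CREATE TABLE prod_data (key UInt64) ENGINE = MergeTree ORDER BY tuple()
SETTINGS merge_workload = 'prod_merges', mutation_workload = 'prod_mutations';
```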
### optimize_row_order
Controls whether the row order should be optimized during inserts to improve the compressibility of the newly inserted table part.

View File

@ -47,6 +47,8 @@ Example:
Queries can be marked with the setting `workload` to distinguish different workloads. If `workload` is not set, the value "default" is used. Note that you can specify another value using settings profiles. Setting constraints can be used to make `workload` constant if you want all queries from a user to be marked with a fixed value of the `workload` setting.
It is possible to assign a `workload` setting for background activities. Merges and mutations use the `merge_workload` and `mutation_workload` server settings, respectively. These values can also be overridden for specific tables using the `merge_workload` and `mutation_workload` merge tree settings.
Let's consider an example of a system with two different workloads: "production" and "development".
```sql
@ -151,6 +153,9 @@ Example:
</clickhouse>
```
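
For instance, individual queries can be tagged with the `workload` setting described above (a sketch; `my_table` is a placeholder, and the workload names must match your classifier configuration):

```sql
SELECT count() FROM my_table SETTINGS workload = 'production';
SELECT count() FROM my_table SETTINGS workload = 'development';
```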
## See also
- [system.scheduler](/docs/en/operations/system-tables/scheduler.md)
- [merge_workload](/docs/en/operations/settings/merge-tree-settings.md#merge_workload) merge tree setting
- [merge_workload](/docs/en/operations/server-configuration-parameters/settings.md#merge_workload) global server setting
- [mutation_workload](/docs/en/operations/settings/merge-tree-settings.md#mutation_workload) merge tree setting
- [mutation_workload](/docs/en/operations/server-configuration-parameters/settings.md#mutation_workload) global server setting

View File

@ -1609,6 +1609,10 @@ try
0, // We don't need any threads once all the parts have been deleted
new_server_settings.max_parts_cleaning_thread_pool_size);
global_context->setMergeWorkload(new_server_settings.merge_workload);
global_context->setMutationWorkload(new_server_settings.mutation_workload);
if (config->has("resources"))
{
global_context->getResourceManager()->updateConfiguration(*config);

View File

@ -1396,6 +1396,14 @@
<!-- <host_name>replica</host_name> -->
</distributed_ddl>
<!-- Used to regulate how resources are utilized and shared between merges, mutations and other workloads.
The specified value is used as the `workload` setting value for background merges and mutations.
-->
<!--
<merge_workload>merges_and_mutations</merge_workload>
<mutation_workload>merges_and_mutations</mutation_workload>
-->
<!-- Settings to fine-tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
<!--
<merge_tree>

View File

@ -146,6 +146,8 @@ namespace DB
M(UInt64, global_profiler_real_time_period_ns, 0, "Period for real clock timer of global profiler (in nanoseconds). Set 0 value to turn off the real clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, global_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of global profiler (in nanoseconds). Set 0 value to turn off the CPU clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, enable_azure_sdk_logging, false, "Enables logging from Azure sdk", 0) \
M(String, merge_workload, "default", "Name of workload to be used to access resources for all merges (may be overridden by a merge tree setting)", 0) \
M(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
M(Double, gwp_asan_force_sample_probability, 0, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

View File

@ -281,6 +281,8 @@ struct ContextSharedPart : boost::noncopyable
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
String merge_workload TSA_GUARDED_BY(mutex); /// Workload setting value that is used by all merges
String mutation_workload TSA_GUARDED_BY(mutex); /// Workload setting value that is used by all mutations
std::unique_ptr<AccessControl> access_control TSA_GUARDED_BY(mutex);
mutable OnceFlag resource_manager_initialized;
mutable ResourceManagerPtr resource_manager;
@ -1561,11 +1563,36 @@ ResourceManagerPtr Context::getResourceManager() const
ClassifierPtr Context::getWorkloadClassifier() const
{
std::lock_guard lock(mutex);
// NOTE: Workload cannot be changed after the query starts, and getWorkloadClassifier() should not be called before the proper `workload` is set
if (!classifier)
classifier = getResourceManager()->acquire(getSettingsRef().workload);
return classifier;
}
String Context::getMergeWorkload() const
{
SharedLockGuard lock(shared->mutex);
return shared->merge_workload;
}
void Context::setMergeWorkload(const String & value)
{
std::lock_guard lock(shared->mutex);
shared->merge_workload = value;
}
String Context::getMutationWorkload() const
{
SharedLockGuard lock(shared->mutex);
return shared->mutation_workload;
}
void Context::setMutationWorkload(const String & value)
{
std::lock_guard lock(shared->mutex);
shared->mutation_workload = value;
}
Scalars Context::getScalars() const
{
@ -2513,6 +2540,20 @@ void Context::makeQueryContext()
backups_query_throttler.reset();
}
void Context::makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings)
{
makeQueryContext();
classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes
settings.workload = merge_tree_settings.merge_workload.value.empty() ? getMergeWorkload() : merge_tree_settings.merge_workload;
}
void Context::makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings)
{
makeQueryContext();
classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes
settings.workload = merge_tree_settings.mutation_workload.value.empty() ? getMutationWorkload() : merge_tree_settings.mutation_workload;
}
void Context::makeSessionContext()
{
session_context = shared_from_this();

View File

@ -622,6 +622,10 @@ public:
/// Resource management related
ResourceManagerPtr getResourceManager() const;
ClassifierPtr getWorkloadClassifier() const;
String getMergeWorkload() const;
void setMergeWorkload(const String & value);
String getMutationWorkload() const;
void setMutationWorkload(const String & value);
/// We have to copy external tables inside executeQuery() to track limits. Therefore, set callback for it. Must set once.
void setExternalTablesInitializer(ExternalTablesInitializer && initializer);
@ -907,6 +911,8 @@ public:
void setSessionContext(ContextMutablePtr context_) { session_context = context_; }
void makeQueryContext();
void makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings);
void makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings);
void makeSessionContext();
void makeGlobalContext();

View File

@ -310,7 +310,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
auto table_id = storage.getStorageID();
task_context = Context::createCopy(storage.getContext());
- task_context->makeQueryContext();
+ task_context->makeQueryContextForMerge(*storage.getSettings());
task_context->setCurrentQueryId(getQueryId());
task_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::MERGE);

View File

@ -165,7 +165,7 @@ void MergePlainMergeTreeTask::finish()
ContextMutablePtr MergePlainMergeTreeTask::createTaskContext() const
{
auto context = Context::createCopy(storage.getContext());
- context->makeQueryContext();
+ context->makeQueryContextForMerge(*storage.getSettings());
auto queryId = getQueryId();
context->setCurrentQueryId(queryId);
context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::MERGE);

View File

@ -138,7 +138,7 @@ private:
virtual ~IStage() = default;
};
- /// By default this context is uninitialed, but some variables has to be set after construction,
+ /// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct GlobalRuntimeContext : public IStageRuntimeContext
@ -199,7 +199,7 @@ private:
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;
- /// By default this context is uninitialed, but some variables has to be set after construction,
+ /// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext
@ -273,7 +273,7 @@ private:
GlobalRuntimeContextPtr global_ctx;
};
- /// By default this context is uninitialed, but some variables has to be set after construction,
+ /// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct VerticalMergeRuntimeContext : public IStageRuntimeContext
@ -348,7 +348,7 @@ private:
GlobalRuntimeContextPtr global_ctx;
};
- /// By default this context is uninitialed, but some variables has to be set after construction,
+ /// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct MergeProjectionsRuntimeContext : public IStageRuntimeContext

View File

@ -81,6 +81,8 @@ struct Settings;
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \
M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
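
Since these are ordinary merge tree settings, they can also be set or changed on an existing table with `ALTER TABLE ... MODIFY SETTING` (a sketch; `data` is a placeholder table name and the workload names are assumptions):

```sql
ALTER TABLE data MODIFY SETTING merge_workload = 'prod_merges';
ALTER TABLE data MODIFY SETTING mutation_workload = 'prod_mutations';
```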

View File

@ -204,7 +204,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
}
task_context = Context::createCopy(storage.getContext());
- task_context->makeQueryContext();
+ task_context->makeQueryContextForMutate(*storage.getSettings());
task_context->setCurrentQueryId(getQueryId());
task_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::MUTATION);

View File

@ -136,7 +136,7 @@ bool MutatePlainMergeTreeTask::executeStep()
ContextMutablePtr MutatePlainMergeTreeTask::createTaskContext() const
{
auto context = Context::createCopy(storage.getContext());
- context->makeQueryContext();
+ context->makeQueryContextForMutate(*storage.getSettings());
auto queryId = getQueryId();
context->setCurrentQueryId(queryId);
context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::MUTATION);

View File

@ -81,7 +81,10 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context
{"uncompressed_cache_size", {std::to_string(context->getUncompressedCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}},
{"index_mark_cache_size", {std::to_string(context->getIndexMarkCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}},
{"index_uncompressed_cache_size", {std::to_string(context->getIndexUncompressedCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}},
{"mmap_cache_size", {std::to_string(context->getMMappedFileCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}}
{"mmap_cache_size", {std::to_string(context->getMMappedFileCache()->maxSizeInBytes()), ChangeableWithoutRestart::Yes}},
{"merge_workload", {context->getMergeWorkload(), ChangeableWithoutRestart::Yes}},
{"mutation_workload", {context->getMutationWorkload(), ChangeableWithoutRestart::Yes}}
};
if (context->areBackgroundExecutorsInitialized())
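
Because both entries are registered as `ChangeableWithoutRestart::Yes`, editing the server config and reloading should be enough to repoint future merges and mutations, mirroring what the integration test below does (a sketch):

```sql
-- After editing merge_workload / mutation_workload in the server config:
SYSTEM RELOAD CONFIG;
SELECT value FROM system.server_settings WHERE name = 'merge_workload';
```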

View File

@ -0,0 +1,3 @@
<clickhouse>
<!-- Will be overwritten by the test -->
</clickhouse>

View File

@ -0,0 +1,76 @@
<clickhouse>
<resources>
<network_read>
<node path="/"> <type>inflight_limit</type><max_cost>1000000</max_cost></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/admin"> <type>fifo</type><priority>0</priority></node>
<node path="/prio/fair"> <type>fair</type><priority>1</priority></node>
<node path="/prio/fair/prod"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev"> <type>fifo</type><weight>1</weight></node>
<node path="/prio/fair/sys"> <type>fair</type><weight>90</weight></node>
<node path="/prio/fair/sys/merges"> <type>fifo</type></node>
<node path="/prio/fair/sys/mutations"> <type>fifo</type></node>
<node path="/prio/fair/prod_merges"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/prod_mutations"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev_merges"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev_mutations"> <type>fifo</type><weight>9</weight></node>
</network_read>
<network_write>
<node path="/"> <type>inflight_limit</type><max_cost>1000000</max_cost></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/admin"> <type>fifo</type><priority>0</priority></node>
<node path="/prio/fair"> <type>fair</type><priority>1</priority></node>
<node path="/prio/fair/prod"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev"> <type>fifo</type><weight>1</weight></node>
<node path="/prio/fair/sys"> <type>fair</type><weight>90</weight></node>
<node path="/prio/fair/sys/merges"> <type>fifo</type></node>
<node path="/prio/fair/sys/mutations"> <type>fifo</type></node>
<node path="/prio/fair/prod_merges"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/prod_mutations"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev_merges"> <type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev_mutations"> <type>fifo</type><weight>9</weight></node>
</network_write>
</resources>
<workload_classifiers>
<admin>
<network_read>/prio/admin</network_read>
<network_write>/prio/admin</network_write>
</admin>
<production>
<network_read>/prio/fair/prod</network_read>
<network_write>/prio/fair/prod</network_write>
</production>
<development>
<network_read>/prio/fair/dev</network_read>
<network_write>/prio/fair/dev</network_write>
</development>
<default>
<network_read>/prio/fair/dev</network_read>
<network_write>/prio/fair/dev</network_write>
</default>
<sys_merges>
<network_read>/prio/fair/sys/merges</network_read>
<network_write>/prio/fair/sys/merges</network_write>
</sys_merges>
<sys_mutations>
<network_read>/prio/fair/sys/mutations</network_read>
<network_write>/prio/fair/sys/mutations</network_write>
</sys_mutations>
<prod_merges>
<network_read>/prio/fair/prod_merges</network_read>
<network_write>/prio/fair/prod_merges</network_write>
</prod_merges>
<prod_mutations>
<network_read>/prio/fair/prod_mutations</network_read>
<network_write>/prio/fair/prod_mutations</network_write>
</prod_mutations>
<dev_merges>
<network_read>/prio/fair/dev_merges</network_read>
<network_write>/prio/fair/dev_merges</network_write>
</dev_merges>
<dev_mutations>
<network_read>/prio/fair/dev_mutations</network_read>
<network_write>/prio/fair/dev_mutations</network_write>
</dev_mutations>
</workload_classifiers>
</clickhouse>
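
With a hierarchy like this in place, per-node scheduler activity can be observed through `system.scheduler`, much as the integration test below does (a sketch; the paths match the hierarchy above):

```sql
SELECT resource, path, dequeued_requests
FROM system.scheduler
WHERE path LIKE '/prio/fair/%merges%' OR path LIKE '/prio/fair/%mutations%'
ORDER BY resource, path;
```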

View File

@ -1,62 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
<s3_max_put_rps>10</s3_max_put_rps>
<s3_max_get_rps>10</s3_max_get_rps>
<read_resource>network_read</read_resource>
<write_resource>network_write</write_resource>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<resources>
<network_read>
<node path="/"> <type>inflight_limit</type><max_cost>1000000</max_cost></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/admin"> <type>fifo</type><priority>0</priority></node>
<node path="/prio/fair"> <type>fair</type><priority>1</priority></node>
<node path="/prio/fair/prod"><type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev"> <type>fifo</type><weight>1</weight></node>
</network_read>
<network_write>
<node path="/"> <type>inflight_limit</type><max_cost>1000000</max_cost></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/admin"> <type>fifo</type><priority>0</priority></node>
<node path="/prio/fair"> <type>fair</type><priority>1</priority></node>
<node path="/prio/fair/prod"><type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev"> <type>fifo</type><weight>1</weight></node>
</network_write>
</resources>
<workload_classifiers>
<admin>
<network_read>/prio/admin</network_read>
<network_write>/prio/admin</network_write>
</admin>
<production>
<network_read>/prio/fair/prod</network_read>
<network_write>/prio/fair/prod</network_write>
</production>
<development>
<network_read>/prio/fair/dev</network_read>
<network_write>/prio/fair/dev</network_write>
</development>
<default>
<network_read>/prio/fair/dev</network_read>
<network_write>/prio/fair/dev</network_write>
</default>
</workload_classifiers>
</clickhouse>

View File

@ -0,0 +1,26 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
<s3_max_put_rps>10</s3_max_put_rps>
<s3_max_get_rps>10</s3_max_get_rps>
<read_resource>network_read</read_resource>
<write_resource>network_write</write_resource>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,3 @@
<clickhouse>
<!-- Will be overwritten by the test -->
</clickhouse>

View File

@ -0,0 +1,4 @@
<clickhouse>
<merge_workload>sys_merges</merge_workload>
<mutation_workload>sys_mutations</mutation_workload>
</clickhouse>

View File

@ -14,7 +14,13 @@ cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
stay_alive=True,
main_configs=["configs/scheduler.xml"],
main_configs=[
"configs/storage_configuration.xml",
"configs/resources.xml",
"configs/resources.xml.default",
"configs/workloads.xml",
"configs/workloads.xml.default",
],
with_minio=True,
)
@ -28,6 +34,41 @@ def start_cluster():
cluster.shutdown()
@pytest.fixture(scope="function", autouse=True)
def set_default_configs():
node.exec_in_container(
[
"bash",
"-c",
"cp /etc/clickhouse-server/config.d/resources.xml.default /etc/clickhouse-server/config.d/resources.xml",
]
)
node.exec_in_container(
[
"bash",
"-c",
"cp /etc/clickhouse-server/config.d/workloads.xml.default /etc/clickhouse-server/config.d/workloads.xml",
]
)
node.query("system reload config")
yield
def update_workloads_config(**settings):
xml = ""
for name in settings:
xml += f"<{name}>{settings[name]}</{name}>"
print(xml)
node.exec_in_container(
[
"bash",
"-c",
f"echo '<clickhouse>{xml}</clickhouse>' > /etc/clickhouse-server/config.d/workloads.xml",
]
)
node.query("system reload config")
def test_s3_disk():
node.query(
f"""
@ -111,3 +152,302 @@ def test_s3_disk():
)
== "1\n"
)
def test_merge_workload():
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3';
"""
)
reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/merges'"
).strip()
)
writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/merges'"
).strip()
)
node.query(f"insert into data select * from numbers(1e4)")
node.query(f"insert into data select * from numbers(2e4)")
node.query(f"insert into data select * from numbers(3e4)")
node.query(f"optimize table data final")
reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/merges'"
).strip()
)
writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/merges'"
).strip()
)
assert reads_before < reads_after
assert writes_before < writes_after
def test_merge_workload_override():
node.query(
f"""
drop table if exists prod_data;
drop table if exists dev_data;
create table prod_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', merge_workload='prod_merges';
create table dev_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', merge_workload='dev_merges';
"""
)
prod_reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_merges'"
).strip()
)
prod_writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_merges'"
).strip()
)
dev_reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_merges'"
).strip()
)
dev_writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_merges'"
).strip()
)
node.query(f"insert into prod_data select * from numbers(1e4)")
node.query(f"insert into prod_data select * from numbers(2e4)")
node.query(f"insert into prod_data select * from numbers(3e4)")
node.query(f"insert into dev_data select * from numbers(1e4)")
node.query(f"insert into dev_data select * from numbers(2e4)")
node.query(f"insert into dev_data select * from numbers(3e4)")
node.query(f"optimize table prod_data final")
node.query(f"optimize table dev_data final")
prod_reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_merges'"
).strip()
)
prod_writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_merges'"
).strip()
)
dev_reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_merges'"
).strip()
)
dev_writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_merges'"
).strip()
)
assert prod_reads_before < prod_reads_after
assert prod_writes_before < prod_writes_after
assert dev_reads_before < dev_reads_after
assert dev_writes_before < dev_writes_after
def test_mutate_workload():
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3';
"""
)
node.query(f"insert into data select * from numbers(1e4)")
node.query(f"optimize table data final")
reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/mutations'"
).strip()
)
writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/mutations'"
).strip()
)
node.query(f"alter table data update key = 1 where key = 42")
node.query(f"optimize table data final")
reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/sys/mutations'"
).strip()
)
writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/sys/mutations'"
).strip()
)
assert reads_before < reads_after
assert writes_before < writes_after
def test_mutation_workload_override():
node.query(
f"""
drop table if exists prod_data;
drop table if exists dev_data;
create table prod_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', mutation_workload='prod_mutations';
create table dev_data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3', mutation_workload='dev_mutations';
"""
)
node.query(f"insert into prod_data select * from numbers(1e4)")
node.query(f"optimize table prod_data final")
node.query(f"insert into dev_data select * from numbers(1e4)")
node.query(f"optimize table dev_data final")
prod_reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_mutations'"
).strip()
)
prod_writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_mutations'"
).strip()
)
dev_reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_mutations'"
).strip()
)
dev_writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_mutations'"
).strip()
)
node.query(f"alter table prod_data update key = 1 where key = 42")
node.query(f"optimize table prod_data final")
node.query(f"alter table dev_data update key = 1 where key = 42")
node.query(f"optimize table dev_data final")
prod_reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/prod_mutations'"
).strip()
)
prod_writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/prod_mutations'"
).strip()
)
dev_reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/dev_mutations'"
).strip()
)
dev_writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/dev_mutations'"
).strip()
)
assert prod_reads_before < prod_reads_after
assert prod_writes_before < prod_writes_after
assert dev_reads_before < dev_reads_after
assert dev_writes_before < dev_writes_after
def test_merge_workload_change():
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3';
"""
)
for env in ["prod", "dev"]:
update_workloads_config(merge_workload=f"{env}_merges")
reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_merges'"
).strip()
)
writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_merges'"
).strip()
)
node.query(f"insert into data select * from numbers(1e4)")
node.query(f"insert into data select * from numbers(2e4)")
node.query(f"insert into data select * from numbers(3e4)")
node.query(f"optimize table data final")
reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_merges'"
).strip()
)
writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_merges'"
).strip()
)
assert reads_before < reads_after
assert writes_before < writes_after
def test_mutation_workload_change():
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3';
"""
)
for env in ["prod", "dev"]:
update_workloads_config(mutation_workload=f"{env}_mutations")
node.query(f"insert into data select * from numbers(1e4)")
node.query(f"optimize table data final")
reads_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_mutations'"
).strip()
)
writes_before = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_mutations'"
).strip()
)
node.query(f"alter table data update key = 1 where key = 42")
node.query(f"optimize table data final")
reads_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_read' and path='/prio/fair/{env}_mutations'"
).strip()
)
writes_after = int(
node.query(
f"select dequeued_requests from system.scheduler where resource='network_write' and path='/prio/fair/{env}_mutations'"
).strip()
)
assert reads_before < reads_after
assert writes_before < writes_after