ClickHouse/tests/integration/test_throttling/test.py
Azat Khuzhin 837f4ea676 Add ability to throttle merges/mutations
The main motivation was to have the ability to throttle background tasks, so
that they do not affect queries.

Two new server settings have been added for this:
- max_mutations_bandwidth_for_server
- max_merges_bandwidth_for_server

Note that they limit only reading, since usually you will not write more
data than you read; writing more is, however, possible in the case of ALTER
UPDATE.

But for now, to keep things simple, I decided to limit this to only
2 settings instead of 4.

Note that if write throttling is ever needed, the same settings can be
reused by simply creating a new throttler for writes.
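
For reference, a minimal sketch of how these settings might look in the server
config (the values here are illustrative, not defaults):

<clickhouse>
    <!-- throttle reads done by mutations to ~50 MB/s server-wide -->
    <max_mutations_bandwidth_for_server>50000000</max_mutations_bandwidth_for_server>
    <!-- throttle reads done by merges to ~100 MB/s server-wide -->
    <max_merges_bandwidth_for_server>100000000</max_merges_bandwidth_for_server>
</clickhouse>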

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2023-12-24 22:31:49 +01:00


# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
# This test covers the following options:
# - max_backup_bandwidth
# - max_backup_bandwidth_for_server
# - max_local_read_bandwidth
# - max_local_read_bandwidth_for_server
# - max_local_write_bandwidth
# - max_local_write_bandwidth_for_server
# - max_remote_read_network_bandwidth
# - max_remote_read_network_bandwidth_for_server
# - max_remote_write_network_bandwidth
# - max_remote_write_network_bandwidth_for_server
# - and that max_backup_bandwidth from the query overrides the setting from the user profile
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
def elapsed(func, *args, **kwargs):
start = time.time()
ret = func(*args, **kwargs)
end = time.time()
return ret, end - start
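# usage sketch: ret, took = elapsed(node.query, "select 1"), where `took`
# is the wall-clock duration of the call in seconds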
node = cluster.add_instance(
"node",
stay_alive=True,
main_configs=[
"configs/static_overrides.xml",
"configs/dynamic_overrides.xml",
"configs/ssl.xml",
],
user_configs=[
"configs/users_overrides.xml",
"configs/users_overrides_persistent.xml",
],
with_minio=True,
minio_certs_dir="minio_certs",
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield
finally:
cluster.shutdown()
@pytest.fixture(scope="function", autouse=True)
def revert_config():
# Revert configs after the test, not before
yield
    node.exec_in_container(
        [
            "bash",
            "-c",
            "echo '<clickhouse></clickhouse>' > /etc/clickhouse-server/config.d/dynamic_overrides.xml",
        ]
    )
    node.exec_in_container(
        [
            "bash",
            "-c",
            "echo '<clickhouse></clickhouse>' > /etc/clickhouse-server/users.d/users_overrides.xml",
        ]
    )
node.restart_clickhouse()
backup_id_counter = 0
def next_backup_name(storage):
global backup_id_counter
if storage == "local":
backup_id_counter += 1
return f"Disk('default', '{backup_id_counter}/')"
elif storage == "remote":
backup_id_counter += 1
return f"S3(s3, '{backup_id_counter}/')"
else:
        raise ValueError(f"unknown storage type: {storage}")
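# e.g. the first call of next_backup_name("local") yields "Disk('default', '1/')";
# the global counter guarantees a unique backup destination per test case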
def node_update_config(mode, setting, value=None):
if mode is None:
return
if mode == "server":
config_path = "/etc/clickhouse-server/config.d/dynamic_overrides.xml"
config_content = f"""
<clickhouse><{setting}>{value}</{setting}></clickhouse>
"""
else:
config_path = "/etc/clickhouse-server/users.d/users_overrides.xml"
config_content = f"""
<clickhouse>
<profiles>
<default>
<{setting}>{value}</{setting}>
</default>
</profiles>
</clickhouse>
"""
node.exec_in_container(
[
"bash",
"-c",
f"echo '{config_content}' > {config_path}",
]
)
node.restart_clickhouse()
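# usage sketch (the 1M value is illustrative):
#   node_update_config("server", "max_backup_bandwidth_for_server", "1M")
# mode="server" writes the setting into config.d, mode="user" puts it into the
# default user profile; both restart the server to apply the change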
def assert_took(took, should_took):
    # we need to decrease the lower limit because the server-wide limits could
    # be enforced by throttling some server background IO instead of query IO,
    # and we have no control over that
    #
    # the upper limit can likewise be slightly exceeded due to, for instance,
    # network latency or CPU starvation
if should_took > 0:
        assert should_took * 0.85 <= took <= should_took * 1.8
else:
assert took >= should_took * 0.85
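# The expected durations below follow from: 1e6 UInt64 rows are 8e6 bytes of
# uncompressed data (CODEC(NONE)), so a naive estimate at an N MB/s limit is
# 8/N seconds; the (8-N)/N values used instead assume the throttler lets
# roughly one limit's worth of bytes through as an initial unthrottled burst
# (inferred from the arithmetic in the comments, not a documented contract).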
@pytest.mark.parametrize(
"policy,backup_name,mode,setting,value,should_took",
[
#
# Local -> Local
#
pytest.param(
"default",
next_backup_name("local"),
None,
None,
None,
0,
id="no_local_throttling",
),
        # reading 1e6*8 bytes with a 1M bandwidth limit should take (8-1)/1=7 seconds
pytest.param(
"default",
next_backup_name("local"),
"user",
"max_backup_bandwidth",
"1M",
7,
id="user_local_throttling",
),
        # reading 1e6*8 bytes with a 2M bandwidth limit should take (8-2)/2=3 seconds
pytest.param(
"default",
next_backup_name("local"),
"server",
"max_backup_bandwidth_for_server",
"2M",
3,
id="server_local_throttling",
),
#
# Remote -> Local
#
pytest.param(
"s3",
next_backup_name("local"),
None,
None,
None,
0,
id="no_remote_to_local_throttling",
),
        # reading 1e6*8 bytes with a 1M bandwidth limit should take (8-1)/1=7 seconds
pytest.param(
"s3",
next_backup_name("local"),
"user",
"max_backup_bandwidth",
"1M",
7,
id="user_remote_to_local_throttling",
),
        # reading 1e6*8 bytes with a 2M bandwidth limit should take (8-2)/2=3 seconds
pytest.param(
"s3",
next_backup_name("local"),
"server",
"max_backup_bandwidth_for_server",
"2M",
3,
id="server_remote_to_local_throttling",
),
#
# Remote -> Remote
#
pytest.param(
"s3",
next_backup_name("remote"),
None,
None,
None,
0,
id="no_remote_to_remote_throttling",
),
        # no throttling is expected for S3-to-S3 backups, since they use server-side (native) copy
pytest.param(
"s3",
next_backup_name("remote"),
"user",
"max_backup_bandwidth",
"1M",
0,
id="user_remote_to_remote_throttling",
),
        # no throttling is expected for S3-to-S3 backups, since they use server-side (native) copy
pytest.param(
"s3",
next_backup_name("remote"),
"server",
"max_backup_bandwidth_for_server",
"2M",
0,
id="server_remote_to_remote_throttling",
),
#
# Local -> Remote
#
        # NOTE: S3 is complex, it will read the file 3 times:
        # - first for calculating the checksum
        # - second for calculating the signature
        # - and finally to write the payload to S3
        # Hence the value should be multiplied by 3.
        #
        # BUT: only in the case of HTTP; HTTPS does not require this.
pytest.param(
"default",
next_backup_name("remote"),
None,
None,
None,
0,
id="no_local_to_remote_throttling",
),
        # reading 1e6*8 bytes with a 1M bandwidth limit should take (8-1)/1=7 seconds
pytest.param(
"default",
next_backup_name("remote"),
"user",
"max_backup_bandwidth",
"1M",
7,
id="user_local_to_remote_throttling",
),
        # reading 1e6*8 bytes with a 2M bandwidth limit should take (8-2)/2=3 seconds
pytest.param(
"default",
next_backup_name("remote"),
"server",
"max_backup_bandwidth_for_server",
"2M",
3,
id="server_local_to_remote_throttling",
),
],
)
def test_backup_throttling(policy, backup_name, mode, setting, value, should_took):
node_update_config(mode, setting, value)
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='{policy}';
insert into data select * from numbers(1e6);
"""
)
_, took = elapsed(node.query, f"backup table data to {backup_name}")
assert_took(took, should_took)
def test_backup_throttling_override():
node_update_config("user", "max_backup_bandwidth", "1M")
node.query(
"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9;
insert into data select * from numbers(1e6);
"""
)
backup_name = next_backup_name("local")
_, took = elapsed(
node.query,
f"backup table data to {backup_name}",
settings={
"max_backup_bandwidth": "500K",
},
)
    # reading 1e6*8 bytes with a 500K bandwidth limit should take (8-0.5)/0.5=15 seconds
assert_took(took, 15)
@pytest.mark.parametrize(
"policy,mode,setting,value,should_took",
[
#
# Local
#
pytest.param("default", None, None, None, 0, id="no_local_throttling"),
        # reading 1e6*8 bytes with a 1M bandwidth limit should take (8-1)/1=7 seconds
pytest.param(
"default",
"user",
"max_local_read_bandwidth",
"1M",
7,
id="user_local_throttling",
),
        # reading 1e6*8 bytes with a 2M bandwidth limit should take (8-2)/2=3 seconds
pytest.param(
"default",
"server",
"max_local_read_bandwidth_for_server",
"2M",
3,
id="server_local_throttling",
),
#
# Remote
#
pytest.param("s3", None, None, None, 0, id="no_remote_throttling"),
        # reading 1e6*8 bytes with a 1M bandwidth limit should take (8-1)/1=7 seconds
pytest.param(
"s3",
"user",
"max_remote_read_network_bandwidth",
"1M",
7,
id="user_remote_throttling",
),
        # reading 1e6*8 bytes with a 2M bandwidth limit should take (8-2)/2=3 seconds
pytest.param(
"s3",
"server",
"max_remote_read_network_bandwidth_for_server",
"2M",
3,
id="server_remote_throttling",
),
],
)
def test_read_throttling(policy, mode, setting, value, should_took):
node_update_config(mode, setting, value)
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='{policy}';
insert into data select * from numbers(1e6);
"""
)
    _, took = elapsed(node.query, "select * from data")
assert_took(took, should_took)
@pytest.mark.parametrize(
"policy,mode,setting,value,should_took",
[
#
# Local
#
pytest.param("default", None, None, None, 0, id="no_local_throttling"),
        # writing 1e6*8 bytes with a 1M bandwidth limit should take (8-1)/1=7 seconds
pytest.param(
"default",
"user",
"max_local_write_bandwidth",
"1M",
7,
id="local_user_throttling",
),
        # writing 1e6*8 bytes with a 2M bandwidth limit should take (8-2)/2=3 seconds
pytest.param(
"default",
"server",
"max_local_write_bandwidth_for_server",
"2M",
3,
id="local_server_throttling",
),
#
# Remote
#
pytest.param("s3", None, None, None, 0, id="no_remote_throttling"),
        # writing 1e6*8 bytes with a 1M bandwidth limit should take (8-1)/1=7 seconds
pytest.param(
"s3",
"user",
"max_remote_write_network_bandwidth",
"1M",
7,
id="user_remote_throttling",
),
        # writing 1e6*8 bytes with a 2M bandwidth limit should take (8-2)/2=3 seconds
pytest.param(
"s3",
"server",
"max_remote_write_network_bandwidth_for_server",
"2M",
3,
id="server_remote_throttling",
),
],
)
def test_write_throttling(policy, mode, setting, value, should_took):
node_update_config(mode, setting, value)
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='{policy}';
"""
)
    _, took = elapsed(node.query, "insert into data select * from numbers(1e6)")
assert_took(took, should_took)
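# NOTE: the two tests below do not call node_update_config(); they assume that
# max_mutations_bandwidth_for_server and max_merges_bandwidth_for_server are
# preset to 1M in configs/static_overrides.xml (hence the "1M/s" in the comments).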
def test_max_mutations_bandwidth_for_server():
node.query(
"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9;
"""
)
node.query("insert into data select * from numbers(1e6)")
_, took = elapsed(
node.query,
"alter table data update key = -key where 1 settings mutations_sync = 1",
)
    # reading 1e6*8 bytes with a 1M/s bandwidth limit should take (8-1)/1=7 seconds
assert_took(took, 7)
def test_max_merges_bandwidth_for_server():
node.query(
"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9;
"""
)
node.query("insert into data select * from numbers(1e6)")
_, took = elapsed(node.query, "optimize table data final")
    # reading 1e6*8 bytes with a 1M/s bandwidth limit should take (8-1)/1=7 seconds
assert_took(took, 7)