ClickHouse/tests/integration/test_backup_restore_s3/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

304 lines
10 KiB
Python
Raw Normal View History

from typing import Dict, Iterable
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
2022-10-31 23:01:27 +00:00
main_configs=[
"configs/disk_s3.xml",
"configs/named_collection_s3_backups.xml",
"configs/s3_settings.xml",
],
user_configs=[
"configs/zookeeper_retries.xml",
],
with_minio=True,
)
2022-10-12 14:58:13 +00:00
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
2022-10-12 14:58:13 +00:00
yield
finally:
cluster.shutdown()
2022-10-12 14:58:13 +00:00
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"backup{backup_id_counter}"
def get_events(events_names: Iterable[str]) -> Dict[str, int]:
_events = TSV(
node.query(
f"SELECT event, value FROM system.events WHERE event in {events_names} SETTINGS system_events_show_zero_values = 1;"
)
)
return {
event: int(value)
for event, value in [line.split("\t") for line in _events.lines]
}
def check_backup_and_restore(
storage_policy, backup_destination, size=1000, backup_name=None, check_events=False
):
s3_backup_events = (
"WriteBufferFromS3Microseconds",
"WriteBufferFromS3Bytes",
"WriteBufferFromS3RequestsErrors",
)
s3_restore_events = (
"ReadBufferFromS3Microseconds",
"ReadBufferFromS3Bytes",
"ReadBufferFromS3RequestsErrors",
)
2022-10-12 14:58:13 +00:00
node.query(
f"""
2023-05-03 18:06:46 +00:00
DROP TABLE IF EXISTS data SYNC;
2022-10-12 14:58:13 +00:00
CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
2022-10-31 23:01:27 +00:00
INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size};
OPTIMIZE TABLE data FINAL;
2022-10-12 14:58:13 +00:00
"""
)
try:
events_before_backups = get_events(s3_backup_events)
node.query(f"BACKUP TABLE data TO {backup_destination}")
events_after_backups = get_events(s3_backup_events)
events_before_restore = get_events(s3_restore_events)
node.query(
f"""
RESTORE TABLE data AS data_restored FROM {backup_destination};
"""
)
events_after_restore = get_events(s3_restore_events)
node.query(
"""
SELECT throwIf(
(SELECT count(), sum(sipHash64(*)) FROM data) !=
(SELECT count(), sum(sipHash64(*)) FROM data_restored),
'Data does not matched after BACKUP/RESTORE'
);
"""
)
if check_events and backup_name:
objects = node.cluster.minio_client.list_objects(
"root", f"data/backups/multipart/{backup_name}/"
)
backup_meta_size = 0
for obj in objects:
if ".backup" in obj.object_name:
backup_meta_size = obj.size
break
backup_total_size = int(
node.query(
f"SELECT sum(total_size) FROM system.backups WHERE status = 'BACKUP_CREATED' AND name like '%{backup_name}%'"
).strip()
)
restore_total_size = int(
node.query(
f"SELECT sum(total_size) FROM system.backups WHERE status = 'RESTORED' AND name like '%{backup_name}%'"
).strip()
)
# backup
# NOTE: ~35 bytes is used by .lock file, so set up 100 bytes to avoid flaky test
assert (
abs(
backup_total_size
- (
events_after_backups["WriteBufferFromS3Bytes"]
- events_before_backups["WriteBufferFromS3Bytes"]
- backup_meta_size
)
)
< 100
)
assert (
events_after_backups["WriteBufferFromS3Microseconds"]
> events_before_backups["WriteBufferFromS3Microseconds"]
)
assert events_after_backups["WriteBufferFromS3RequestsErrors"] == 0
# restore
assert (
events_after_restore["ReadBufferFromS3Bytes"]
- events_before_restore["ReadBufferFromS3Bytes"]
- backup_meta_size
== restore_total_size
)
assert (
events_after_restore["ReadBufferFromS3Microseconds"]
> events_before_restore["ReadBufferFromS3Microseconds"]
)
assert events_after_restore["ReadBufferFromS3RequestsErrors"] == 0
finally:
node.query(
"""
DROP TABLE data SYNC;
DROP TABLE IF EXISTS data_restored SYNC;
"""
)
2022-10-12 14:58:13 +00:00
def check_system_tables():
disks = [
tuple(disk.split("\t"))
for disk in node.query("SELECT name, type FROM system.disks").split("\n")
if disk
]
expected_disks = (
("default", "local"),
("disk_s3", "s3"),
("disk_s3_other_bucket", "s3"),
("disk_s3_plain", "s3_plain"),
)
assert len(expected_disks) == len(disks)
for expected_disk in expected_disks:
if expected_disk not in disks:
raise AssertionError(f"Missed {expected_disk} in {disks}")
@pytest.mark.parametrize(
2022-10-12 14:58:13 +00:00
"storage_policy, to_disk",
[
pytest.param(
"default",
"default",
id="from_local_to_local",
),
pytest.param(
2022-10-12 14:58:13 +00:00
"policy_s3",
"default",
id="from_s3_to_local",
),
pytest.param(
"default",
2022-10-12 14:58:13 +00:00
"disk_s3",
id="from_local_to_s3",
),
pytest.param(
2022-10-12 14:58:13 +00:00
"policy_s3",
"disk_s3_plain",
id="from_s3_to_s3_plain",
),
pytest.param(
"default",
2022-10-12 14:58:13 +00:00
"disk_s3_plain",
id="from_local_to_s3_plain",
),
],
)
2022-10-12 14:58:13 +00:00
def test_backup_to_disk(storage_policy, to_disk):
backup_name = new_backup_name()
backup_destination = f"Disk('{to_disk}', '{backup_name}')"
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_s3():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = (
f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
)
check_backup_and_restore(storage_policy, backup_destination)
check_system_tables()
2022-10-12 14:58:13 +00:00
def test_backup_to_s3_named_collection():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3(named_collection_s3_backups, '{backup_name}')"
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_s3_multipart():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
check_backup_and_restore(
storage_policy,
backup_destination,
size=1000000,
backup_name=backup_name,
check_events=True,
)
assert node.contains_in_log(
f"copyDataToS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}"
)
2022-10-12 14:58:13 +00:00
def test_backup_to_s3_native_copy():
storage_policy = "policy_s3"
backup_name = new_backup_name()
backup_destination = (
f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("BackupWriterS3.*using native copy")
assert node.contains_in_log("BackupReaderS3.*using native copy")
assert node.contains_in_log(
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
2022-10-12 14:58:13 +00:00
2022-10-31 23:01:27 +00:00
def test_backup_to_s3_native_copy_other_bucket():
2022-10-12 14:58:13 +00:00
storage_policy = "policy_s3_other_bucket"
backup_name = new_backup_name()
backup_destination = (
f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
)
2022-10-12 14:58:13 +00:00
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("BackupWriterS3.*using native copy")
assert node.contains_in_log("BackupReaderS3.*using native copy")
assert node.contains_in_log(
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
2022-10-31 23:01:27 +00:00
def test_backup_to_s3_native_copy_multipart():
2022-10-31 23:01:27 +00:00
storage_policy = "policy_s3"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
2022-10-31 23:01:27 +00:00
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
assert node.contains_in_log("BackupWriterS3.*using native copy")
assert node.contains_in_log("BackupReaderS3.*using native copy")
assert node.contains_in_log(
f"copyS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
)
2023-05-10 09:52:09 +00:00
def test_incremental_backup_append_table_def():
backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', 'minio123')"
node.query(
"CREATE TABLE data (x UInt32, y String) Engine=MergeTree() ORDER BY y PARTITION BY x%10 SETTINGS storage_policy='policy_s3'"
)
node.query("INSERT INTO data SELECT number, toString(number) FROM numbers(100)")
assert node.query("SELECT count(), sum(x) FROM data") == "100\t4950\n"
node.query(f"BACKUP TABLE data TO {backup_name}")
node.query("ALTER TABLE data MODIFY SETTING parts_to_throw_insert=100")
incremental_backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', 'minio123')"
node.query(
f"BACKUP TABLE data TO {incremental_backup_name} SETTINGS base_backup = {backup_name}"
)
node.query("DROP TABLE data")
node.query(f"RESTORE TABLE data FROM {incremental_backup_name}")
assert node.query("SELECT count(), sum(x) FROM data") == "100\t4950\n"
assert "parts_to_throw_insert = 100" in node.query("SHOW CREATE TABLE data")