import pytest import re import os.path from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) instance = cluster.add_instance( "instance", main_configs=["configs/backups_disk.xml"], external_dirs=["/backups/"] ) def create_and_fill_table(engine="MergeTree"): if engine == "MergeTree": engine = "MergeTree ORDER BY y PARTITION BY x%10" instance.query("CREATE DATABASE test") instance.query(f"CREATE TABLE test.table(x UInt32, y String) ENGINE={engine}") instance.query( "INSERT INTO test.table SELECT number, toString(number) FROM numbers(100)" ) @pytest.fixture(scope="module", autouse=True) def start_cluster(): try: cluster.start() yield cluster finally: cluster.shutdown() @pytest.fixture(autouse=True) def cleanup_after_test(): try: yield finally: instance.query("DROP DATABASE IF EXISTS test") backup_id_counter = 0 def new_backup_name(): global backup_id_counter backup_id_counter += 1 return f"Disk('backups', '{backup_id_counter}/')" def get_backup_dir(backup_name): counter = int(backup_name.split(",")[1].strip("')/ ")) return os.path.join(instance.path, f"backups/{counter}") @pytest.mark.parametrize( "engine", ["MergeTree", "Log", "TinyLog", "StripeLog", "Memory"] ) def test_restore_table(engine): backup_name = new_backup_name() create_and_fill_table(engine=engine) assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query("DROP TABLE test.table") assert instance.query("EXISTS test.table") == "0\n" instance.query(f"RESTORE TABLE test.table FROM {backup_name}") assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" @pytest.mark.parametrize( "engine", ["MergeTree", "Log", "TinyLog", "StripeLog", "Memory"] ) def test_restore_table_into_existing_table(engine): backup_name = new_backup_name() create_and_fill_table(engine=engine) assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query( f"RESTORE TABLE test.table INTO test.table FROM {backup_name}" ) assert instance.query("SELECT count(), sum(x) FROM test.table") == "200\t9900\n" instance.query( f"RESTORE TABLE test.table INTO test.table FROM {backup_name}" ) assert instance.query("SELECT count(), sum(x) FROM test.table") == "300\t14850\n" def test_restore_table_under_another_name(): backup_name = new_backup_name() create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP TABLE test.table TO {backup_name}") assert instance.query("EXISTS test.table2") == "0\n" instance.query(f"RESTORE TABLE test.table INTO test.table2 FROM {backup_name}") assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n" def test_backup_table_under_another_name(): backup_name = new_backup_name() create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP TABLE test.table AS test.table2 TO {backup_name}") assert instance.query("EXISTS test.table2") == "0\n" instance.query(f"RESTORE TABLE test.table2 FROM {backup_name}") assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n" def test_materialized_view(): backup_name = new_backup_name() instance.query( "CREATE MATERIALIZED VIEW mv_1(x UInt8) ENGINE=MergeTree ORDER BY tuple() POPULATE AS SELECT 1 AS x" ) instance.query(f"BACKUP TABLE mv_1 TO {backup_name}") instance.query("DROP TABLE mv_1") instance.query(f"RESTORE TABLE mv_1 FROM {backup_name}") assert instance.query("SELECT * FROM mv_1") == "1\n" instance.query("DROP TABLE mv_1") def test_incremental_backup(): backup_name = new_backup_name() incremental_backup_name = new_backup_name() create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')") assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n" instance.query( f"BACKUP TABLE test.table TO {incremental_backup_name} SETTINGS base_backup = {backup_name}" ) instance.query( f"RESTORE TABLE test.table AS test.table2 FROM {incremental_backup_name}" ) assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n" def test_incremental_backup_after_renaming_table(): backup_name = new_backup_name() incremental_backup_name = new_backup_name() create_and_fill_table() instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query("RENAME TABLE test.table TO test.table2") instance.query( f"BACKUP TABLE test.table2 TO {incremental_backup_name} SETTINGS base_backup = {backup_name}" ) # Files in a base backup can be searched by checksum, so an incremental backup with a renamed table actually # contains only its changed metadata. contents = os.listdir(get_backup_dir(incremental_backup_name)) assert '.backup' in contents contents.remove('.backup') assert len(contents) == 1 with open(os.path.join(get_backup_dir(incremental_backup_name), contents[0])) as table_def_in_backup: assert table_def_in_backup.read().startswith('CREATE TABLE test.table2') instance.query("DROP TABLE test.table2") instance.query(f"RESTORE TABLE test.table2 FROM {incremental_backup_name}") assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n" def test_backup_not_found_or_already_exists(): backup_name = new_backup_name() expected_error = "Backup .* not found" assert re.search( expected_error, instance.query_and_get_error( f"RESTORE TABLE test.table AS test.table2 FROM {backup_name}" ), ) create_and_fill_table() instance.query(f"BACKUP TABLE test.table TO {backup_name}") expected_error = "Backup .* already exists" assert re.search( expected_error, instance.query_and_get_error(f"BACKUP TABLE test.table TO {backup_name}"), ) def test_file_engine(): backup_name = f"File('/backups/file/')" create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP TABLE test.table TO {backup_name}") instance.query("DROP TABLE test.table") assert instance.query("EXISTS test.table") == "0\n" instance.query(f"RESTORE TABLE test.table FROM {backup_name}") assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" def test_database(): backup_name = new_backup_name() create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP DATABASE test TO {backup_name}") instance.query("DROP DATABASE test") instance.query(f"RESTORE DATABASE test FROM {backup_name}") assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" def test_zip_archive(): backup_name = f"File('/backups/archive.zip')" create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query(f"BACKUP TABLE test.table TO {backup_name}") assert os.path.isfile( os.path.join(os.path.join(instance.path, "backups/archive.zip")) ) instance.query("DROP TABLE test.table") assert instance.query("EXISTS test.table") == "0\n" instance.query(f"RESTORE TABLE test.table FROM {backup_name}") assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" def test_zip_archive_with_settings(): backup_name = f"File('/backups/archive_with_settings.zip')" create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" instance.query( f"BACKUP TABLE test.table TO {backup_name} SETTINGS compression_method='lzma', compression_level=3, password='qwerty'" ) instance.query("DROP TABLE test.table") assert instance.query("EXISTS test.table") == "0\n" instance.query( f"RESTORE TABLE test.table FROM {backup_name} SETTINGS password='qwerty'" ) assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"