import os import logging import random import string import time import pytest from helpers.cluster import ClickHouseCluster logging.getLogger().setLevel(logging.INFO) logging.getLogger().addHandler(logging.StreamHandler()) SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml') COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/log_conf.xml", "configs/config.d/clusters.xml"] def replace_config(old, new): config = open(NOT_RESTORABLE_CONFIG_PATH, 'r') config_lines = config.readlines() config.close() config_lines = [line.replace(old, new) for line in config_lines] config = open(NOT_RESTORABLE_CONFIG_PATH, 'w') config.writelines(config_lines) config.close() @pytest.fixture(scope="module") def cluster(): try: cluster = ClickHouseCluster(__file__) cluster.add_instance("node", main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf.xml"], macros={"cluster": "node", "replica": "0"}, with_minio=True, with_zookeeper=True, stay_alive=True) cluster.add_instance("node_another_bucket", main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_another_bucket.xml"], macros={"cluster": "node_another_bucket", "replica": "0"}, with_zookeeper=True, stay_alive=True) cluster.add_instance("node_another_bucket_path", main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_another_bucket_path.xml"], stay_alive=True) cluster.add_instance("node_not_restorable", main_configs=COMMON_CONFIGS + ["configs/config.d/storage_conf_not_restorable.xml"], stay_alive=True) logging.info("Starting cluster...") cluster.start() logging.info("Cluster started") yield cluster finally: cluster.shutdown() def random_string(length): letters = string.ascii_letters return ''.join(random.choice(letters) for i in range(length)) def generate_values(date_str, count, sign=1): data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)] data.sort(key=lambda tup: tup[1]) return ",".join(["('{}',{},'{}',{})".format(x, y, z, 0) for x, y, z in data]) def create_table(node, table_name, replicated=False): node.query("CREATE DATABASE IF NOT EXISTS s3 ENGINE = Ordinary") create_table_statement = """ CREATE TABLE s3.{table_name} {on_cluster} ( dt Date, id Int64, data String, counter Int64, INDEX min_max (id) TYPE minmax GRANULARITY 3 ) ENGINE={engine} PARTITION BY dt ORDER BY (dt, id) SETTINGS storage_policy='s3', old_parts_lifetime=600, index_granularity=512 """.format(table_name=table_name, on_cluster="ON CLUSTER '{}'".format(node.name) if replicated else "", engine="ReplicatedMergeTree('/clickhouse/tables/{cluster}/test', '{replica}')" if replicated else "MergeTree()") node.query(create_table_statement) def purge_s3(cluster, bucket): minio = cluster.minio_client for obj in list(minio.list_objects(bucket, recursive=True)): if str(obj.object_name).find(".SCHEMA_VERSION") != -1: continue minio.remove_object(bucket, obj.object_name) def drop_s3_metadata(node): node.exec_in_container(['bash', '-c', 'rm -rf /var/lib/clickhouse/disks/s3/*'], user='root') def drop_shadow_information(node): node.exec_in_container(['bash', '-c', 'rm -rf /var/lib/clickhouse/shadow/*'], user='root') def create_restore_file(node, revision=None, bucket=None, path=None, detached=None): node.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/disks/s3/restore'], user='root') add_restore_option = 'echo -en "{}={}\n" >> /var/lib/clickhouse/disks/s3/restore' if revision: node.exec_in_container(['bash', '-c', add_restore_option.format('revision', revision)], user='root') if bucket: node.exec_in_container(['bash', '-c', add_restore_option.format('source_bucket', bucket)], user='root') if path: node.exec_in_container(['bash', '-c', add_restore_option.format('source_path', path)], user='root') if detached: node.exec_in_container(['bash', '-c', add_restore_option.format('detached', 'true')], user='root') def get_revision_counter(node, backup_number): return int(node.exec_in_container( ['bash', '-c', 'cat /var/lib/clickhouse/disks/s3/shadow/{}/revision.txt'.format(backup_number)], user='root')) @pytest.fixture(autouse=True) def drop_table(cluster): yield node_names = ["node", "node_another_bucket", "node_another_bucket_path", "node_not_restorable"] for node_name in node_names: node = cluster.instances[node_name] node.query("DROP TABLE IF EXISTS s3.test SYNC") node.query("DROP DATABASE IF EXISTS s3 SYNC") drop_s3_metadata(node) drop_shadow_information(node) buckets = [cluster.minio_bucket, cluster.minio_bucket_2] for bucket in buckets: purge_s3(cluster, bucket) @pytest.mark.parametrize( "replicated", [False, True] ) def test_full_restore(cluster, replicated): node = cluster.instances["node"] create_table(node, "test", replicated) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1))) node.stop_clickhouse() drop_s3_metadata(node) create_restore_file(node) node.start_clickhouse() assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) def test_restore_another_bucket_path(cluster): node = cluster.instances["node"] create_table(node, "test") node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1))) # To ensure parts have merged node.query("OPTIMIZE TABLE s3.test") assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) node_another_bucket = cluster.instances["node_another_bucket"] create_table(node_another_bucket, "test") node_another_bucket.stop_clickhouse() create_restore_file(node_another_bucket, bucket="root") node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) node_another_bucket_path = cluster.instances["node_another_bucket_path"] create_table(node_another_bucket_path, "test") node_another_bucket_path.stop_clickhouse() create_restore_file(node_another_bucket_path, bucket="root2", path="data") node_another_bucket_path.start_clickhouse() assert node_another_bucket_path.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket_path.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) def test_restore_different_revisions(cluster): node = cluster.instances["node"] create_table(node, "test") node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) node.query("ALTER TABLE s3.test FREEZE") revision1 = get_revision_counter(node, 1) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1))) node.query("ALTER TABLE s3.test FREEZE") revision2 = get_revision_counter(node, 2) # To ensure parts have merged node.query("OPTIMIZE TABLE s3.test") node.query("ALTER TABLE s3.test FREEZE") revision3 = get_revision_counter(node, 3) assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node.query("SELECT count(*) from system.parts where table = 'test'") == '5\n' node_another_bucket = cluster.instances["node_another_bucket"] create_table(node_another_bucket, "test") # Restore to revision 1 (2 parts). node_another_bucket.stop_clickhouse() drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision1, bucket="root") node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '2\n' # Restore to revision 2 (4 parts). node_another_bucket.stop_clickhouse() drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision2, bucket="root") node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '4\n' # Restore to revision 3 (4 parts + 1 merged). node_another_bucket.stop_clickhouse() drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision3, bucket="root") node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '5\n' def test_restore_mutations(cluster): node = cluster.instances["node"] create_table(node, "test") node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096, -1))) node.query("ALTER TABLE s3.test FREEZE") revision_before_mutation = get_revision_counter(node, 1) node.query("ALTER TABLE s3.test UPDATE counter = 1 WHERE 1", settings={"mutations_sync": 2}) node.query("ALTER TABLE s3.test FREEZE") revision_after_mutation = get_revision_counter(node, 2) node_another_bucket = cluster.instances["node_another_bucket"] create_table(node_another_bucket, "test") # Restore to revision before mutation. node_another_bucket.stop_clickhouse() drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision_before_mutation, bucket="root") node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(0) # Restore to revision after mutation. node_another_bucket.stop_clickhouse() drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision_after_mutation, bucket="root") node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test WHERE id > 0 FORMAT Values") == "({})".format(4096) # Restore to revision in the middle of mutation. # Unfinished mutation should be completed after table startup. node_another_bucket.stop_clickhouse() drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) revision = (revision_before_mutation + revision_after_mutation) // 2 create_restore_file(node_another_bucket, revision=revision, bucket="root") node_another_bucket.start_clickhouse() # Wait for unfinished mutation completion. time.sleep(3) assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test WHERE id > 0 FORMAT Values") == "({})".format(4096) def test_migrate_to_restorable_schema(cluster): node = cluster.instances["node_not_restorable"] create_table(node, "test") node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1))) replace_config("false", "true") node.restart_clickhouse() node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096, -1))) node.query("ALTER TABLE s3.test FREEZE") revision = get_revision_counter(node, 1) assert revision != 0 node_another_bucket = cluster.instances["node_another_bucket"] create_table(node_another_bucket, "test") # Restore to revision before mutation. node_another_bucket.stop_clickhouse() drop_s3_metadata(node_another_bucket) purge_s3(cluster, cluster.minio_bucket_2) create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data") node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @pytest.mark.parametrize( "replicated", [False, True] ) def test_restore_to_detached(cluster, replicated): node = cluster.instances["node"] create_table(node, "test", replicated) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096, -1))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-07', 4096, 0))) # Add some mutation. node.query("ALTER TABLE s3.test UPDATE counter = 1 WHERE 1", settings={"mutations_sync": 2}) # Detach some partition. node.query("ALTER TABLE s3.test DETACH PARTITION '2020-01-07'") node.query("ALTER TABLE s3.test FREEZE") revision = get_revision_counter(node, 1) node_another_bucket = cluster.instances["node_another_bucket"] create_table(node_another_bucket, "test", replicated) node_another_bucket.stop_clickhouse() create_restore_file(node_another_bucket, revision=revision, bucket="root", path="data", detached=True) node_another_bucket.start_clickhouse() assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0) node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-03'") node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-04'") node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-05'") node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-06'") assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) # Attach partition that was already detached before backup-restore. node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-07'") assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 5) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 5)