"""Integration tests for ``BACKUP ... ON CLUSTER`` / ``RESTORE ... ON CLUSTER``.

Two ClickHouse instances (``node1`` and ``node2``) are started with ZooKeeper
and a shared ``/backups/`` external directory. Each test creates data, backs
it up to a ``Disk('backups', ...)`` destination, drops the data, restores it
and checks the contents on both nodes.
"""

from time import sleep
import pytest
import os.path
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, assert_eq_with_retry


cluster = ClickHouseCluster(__file__)

# Both instances use the same configs and the same shard macro; they differ
# only in their name and "replica" macro.
node1 = cluster.add_instance(
    "node1",
    main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
    user_configs=["configs/allow_experimental_database_replicated.xml"],
    external_dirs=["/backups/"],
    macros={"replica": "node1", "shard": "shard1"},
    with_zookeeper=True,
)

node2 = cluster.add_instance(
    "node2",
    main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
    user_configs=["configs/allow_experimental_database_replicated.xml"],
    external_dirs=["/backups/"],
    macros={"replica": "node2", "shard": "shard1"},
    with_zookeeper=True,
)


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the cluster once per module and always shut it down afterwards."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def drop_after_test():
    """Drop the table and database used by the tests after every test.

    The cleanup runs in ``finally`` so that it happens even when the test
    body raised.
    """
    try:
        yield
    finally:
        node1.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY")
        node1.query("DROP DATABASE IF EXISTS mydb ON CLUSTER 'cluster' NO DELAY")


# Monotonic counter that makes every backup destination in this module unique.
backup_id_counter = 0


def new_backup_name() -> str:
    """Return a fresh ``Disk('backups', '<n>')`` backup destination expression."""
    global backup_id_counter
    backup_id_counter += 1
    return f"Disk('backups', '{backup_id_counter}')"


def get_path_to_backup(backup_name: str) -> str:
    """Return the path of a backup under ``<instances_dir>/backups``.

    ``backup_name`` is expected to have the shape produced by
    ``new_backup_name()``: the text after the first comma is taken and
    stripped of quotes, parentheses, slashes and spaces to get the directory
    name of the backup.
    """
    name = backup_name.split(",")[1].strip("')/ ")
    # The original code read an undefined global ``instance`` here, which
    # raised NameError on every call; ``node1`` is an instance of the same
    # cluster.  NOTE(review): assumes instances expose ``.cluster`` - confirm
    # against helpers.cluster.
    return os.path.join(node1.cluster.instances_dir, "backups", name)


def test_replicated_table():
    """Back up and restore a ReplicatedMergeTree table on the cluster."""
    node1.query(
        "CREATE TABLE tbl ON CLUSTER 'cluster' ("
        "x UInt8, y String"
        ") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
        "ORDER BY x"
    )

    # Insert through both nodes, with and without async_insert.
    node1.query("INSERT INTO tbl VALUES (1, 'Don''t')")
    node2.query("INSERT INTO tbl VALUES (2, 'count')")
    node1.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (3, 'your')")
    node2.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (4, 'chickens')")
    node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")

    backup_name = new_backup_name()

    # Make backup on node 1.
    node1.query(
        f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} SETTINGS replica_num=1"
    )

    # Drop table on both nodes.
    node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")

    # Restore from backup on node2.
    node2.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
    node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")

    # Every row must be visible on both nodes.
    expected = TSV([[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]])
    assert node2.query("SELECT * FROM tbl ORDER BY x") == expected
    assert node1.query("SELECT * FROM tbl ORDER BY x") == expected


def test_replicated_database():
    """Back up and restore a Replicated database containing one table."""
    node1.query(
        "CREATE DATABASE mydb ON CLUSTER 'cluster' ENGINE=Replicated('/clickhouse/path/','{shard}','{replica}')"
    )

    # The table is created through node1 only; wait until node2 sees it too.
    node1.query(
        "CREATE TABLE mydb.tbl(x UInt8, y String) ENGINE=ReplicatedMergeTree ORDER BY x"
    )
    assert_eq_with_retry(node2, "EXISTS mydb.tbl", "1\n")

    node1.query("INSERT INTO mydb.tbl VALUES (1, 'Don''t')")
    node2.query("INSERT INTO mydb.tbl VALUES (2, 'count')")
    node1.query("INSERT INTO mydb.tbl VALUES (3, 'your')")
    node2.query("INSERT INTO mydb.tbl VALUES (4, 'chickens')")
    node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl")

    # Make backup.
    backup_name = new_backup_name()
    node1.query(
        f"BACKUP DATABASE mydb ON CLUSTER 'cluster' TO {backup_name} SETTINGS replica_num=2"
    )

    # Drop the database on the cluster.
    node1.query("DROP DATABASE mydb ON CLUSTER 'cluster' NO DELAY")

    # Restore the database on the cluster (the query is issued through node1).
    node1.query(f"RESTORE DATABASE mydb ON CLUSTER 'cluster' FROM {backup_name}")
    node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl")

    # Every row must be visible on both nodes.
    expected = TSV([[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]])
    assert node1.query("SELECT * FROM mydb.tbl ORDER BY x") == expected
    assert node2.query("SELECT * FROM mydb.tbl ORDER BY x") == expected


def test_different_tables_on_nodes():
    """Back up and restore same-named MergeTree tables that differ per node.

    ``tbl`` has columns ``(x, y)`` on node1 and a single column ``w`` on
    node2; after the restore each node must hold its own rows again.
    """
    node1.query(
        "CREATE TABLE tbl (`x` UInt8, `y` String) ENGINE = MergeTree ORDER BY x"
    )
    node2.query("CREATE TABLE tbl (`w` Int64) ENGINE = MergeTree ORDER BY w")

    node1.query(
        "INSERT INTO tbl VALUES (1, 'Don''t'), (2, 'count'), (3, 'your'), (4, 'chickens')"
    )
    node2.query("INSERT INTO tbl VALUES (-333), (-222), (-111), (0), (111)")

    backup_name = new_backup_name()
    node1.query(
        f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} SETTINGS allow_storing_multiple_replicas = true"
    )

    node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")

    node2.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")

    assert node1.query("SELECT * FROM tbl") == TSV(
        [[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
    )
    assert node2.query("SELECT * FROM tbl") == TSV([-333, -222, -111, 0, 111])