import datetime import logging import threading import time import pytest from helpers.cluster import ClickHouseCluster from helpers.network import PartitionManager logging.getLogger().setLevel(logging.INFO) logging.getLogger().addHandler(logging.StreamHandler()) cluster = ClickHouseCluster(__file__) @pytest.fixture(scope="module") def started_cluster(): try: cluster.add_instance( "node1", main_configs=["configs/config.d/s3.xml"], macros={"replica": "1"}, with_minio=True, with_zookeeper=True, ) cluster.add_instance( "node2", main_configs=["configs/config.d/s3.xml"], macros={"replica": "2"}, with_minio=True, with_zookeeper=True, ) logging.info("Starting cluster...") cluster.start() logging.info("Cluster started") yield cluster finally: cluster.shutdown() def get_large_objects_count(cluster, size=100, folder="data"): minio = cluster.minio_client counter = 0 for obj in minio.list_objects( cluster.minio_bucket, "{}/".format(folder), recursive=True ): if obj.size is not None and obj.size >= size: counter = counter + 1 return counter def check_objects_exist(cluster, object_list, folder="data"): minio = cluster.minio_client for obj in object_list: if obj: minio.stat_object(cluster.minio_bucket, "{}/{}".format(folder, obj)) def check_objects_not_exisis(cluster, object_list, folder="data"): minio = cluster.minio_client for obj in object_list: if obj: try: minio.stat_object(cluster.minio_bucket, "{}/{}".format(folder, obj)) except Exception as error: assert "NoSuchKey" in str(error) else: assert False, "Object {} should not be exists".format(obj) def wait_for_large_objects_count(cluster, expected, size=100, timeout=30): while timeout > 0: if get_large_objects_count(cluster, size=size) == expected: return timeout -= 1 time.sleep(1) assert get_large_objects_count(cluster, size=size) == expected def wait_for_active_parts( node, num_expected_parts, table_name, timeout=30, disk_name=None ): deadline = time.monotonic() + timeout num_parts = 0 while time.monotonic() < deadline: query = ( f"select count() from system.parts where table = '{table_name}' and active" ) if disk_name: query += f" and disk_name='{disk_name}'" num_parts_str = node.query(query) num_parts = int(num_parts_str.strip()) if num_parts == num_expected_parts: return time.sleep(0.2) assert num_parts == num_expected_parts @pytest.fixture(scope="function") def test_name(request): return request.node.name @pytest.fixture(scope="function") def test_table(test_name): normalized = ( test_name.replace("[", "_") .replace("]", "_") .replace(" ", "_") .replace("-", "_") ) return "table_" + normalized # Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning @pytest.mark.order(0) @pytest.mark.parametrize("policy", ["s3"]) def test_s3_zero_copy_replication(started_cluster, policy): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query( """ CREATE TABLE s3_test ON CLUSTER test_cluster (id UInt32, value String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}') ORDER BY id SETTINGS storage_policy='{}' """.format( "{replica}", policy ) ) node1.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')") node2.query("SYSTEM SYNC REPLICA s3_test", timeout=30) assert ( node1.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')" ) assert ( node2.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')" ) # Based on version 21.x - should be only 1 file with size 100+ (checksums.txt), used by both nodes assert get_large_objects_count(cluster) == 1 node2.query("INSERT INTO s3_test VALUES (2,'data'),(3,'data')") node1.query("SYSTEM SYNC REPLICA s3_test", timeout=30) assert ( node2.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data'),(2,'data'),(3,'data')" ) assert ( node1.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data'),(2,'data'),(3,'data')" ) # Based on version 21.x - two parts wait_for_large_objects_count(cluster, 2) node1.query("OPTIMIZE TABLE s3_test FINAL") # Based on version 21.x - after merge, two old parts and one merged wait_for_large_objects_count(cluster, 3) # Based on version 21.x - after cleanup - only one merged part wait_for_large_objects_count(cluster, 1, timeout=60) node1.query("DROP TABLE IF EXISTS s3_test SYNC") node2.query("DROP TABLE IF EXISTS s3_test SYNC") def insert_data_time(node, table, number_of_mb, time, start=0): values = ",".join( f"({x},{time})" for x in range(start, int((1024 * 1024 * number_of_mb) / 8) + start + 1) ) node.query(f"INSERT INTO {table} VALUES {values}") def insert_large_data(node, table): tm = time.mktime((datetime.date.today() - datetime.timedelta(days=7)).timetuple()) insert_data_time(node, table, 1, tm, 0) tm = time.mktime((datetime.date.today() - datetime.timedelta(days=3)).timetuple()) insert_data_time(node, table, 1, tm, 1024 * 1024) tm = time.mktime(datetime.date.today().timetuple()) insert_data_time(node, table, 10, tm, 1024 * 1024 * 2) @pytest.mark.parametrize( ("storage_policy", "large_data", "iterations"), [ ("tiered", False, 10), ("tiered_copy", False, 10), ("tiered", True, 3), ("tiered_copy", True, 3), ], ) def test_s3_zero_copy_with_ttl_move( started_cluster, storage_policy, large_data, iterations ): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query("DROP TABLE IF EXISTS ttl_move_test SYNC") node2.query("DROP TABLE IF EXISTS ttl_move_test SYNC") for i in range(iterations): node1.query( """ CREATE TABLE ttl_move_test ON CLUSTER test_cluster (d UInt64, d1 DateTime) ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_move_test', '{}') ORDER BY d TTL d1 + INTERVAL 2 DAY TO VOLUME 'external' SETTINGS storage_policy='{}' """.format( "{replica}", storage_policy ) ) if large_data: insert_large_data(node1, "ttl_move_test") else: node1.query("INSERT INTO ttl_move_test VALUES (10, now() - INTERVAL 3 DAY)") node1.query("INSERT INTO ttl_move_test VALUES (11, now() - INTERVAL 1 DAY)") node1.query("OPTIMIZE TABLE ttl_move_test FINAL") node2.query("SYSTEM SYNC REPLICA ttl_move_test", timeout=30) if large_data: assert ( node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(1572867)" ) assert ( node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(1572867)" ) else: assert ( node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)" ) assert ( node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)" ) assert ( node1.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values") == "(10),(11)" ) assert ( node2.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values") == "(10),(11)" ) node1.query("DROP TABLE IF EXISTS ttl_move_test SYNC") node2.query("DROP TABLE IF EXISTS ttl_move_test SYNC") @pytest.mark.parametrize( ("large_data", "iterations"), [ (False, 10), (True, 3), ], ) def test_s3_zero_copy_with_ttl_delete(started_cluster, large_data, iterations): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query("DROP TABLE IF EXISTS ttl_delete_test SYNC") node2.query("DROP TABLE IF EXISTS ttl_delete_test SYNC") for i in range(iterations): node1.query( """ CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (d UInt64, d1 DateTime) ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_delete_test', '{}') ORDER BY d TTL d1 + INTERVAL 2 DAY SETTINGS storage_policy='tiered' """.format( "{replica}" ) ) if large_data: insert_large_data(node1, "ttl_delete_test") else: node1.query( "INSERT INTO ttl_delete_test VALUES (10, now() - INTERVAL 3 DAY)" ) node1.query( "INSERT INTO ttl_delete_test VALUES (11, now() - INTERVAL 1 DAY)" ) node1.query("OPTIMIZE TABLE ttl_delete_test FINAL") node1.query("SYSTEM SYNC REPLICA ttl_delete_test", timeout=30) node2.query("SYSTEM SYNC REPLICA ttl_delete_test", timeout=30) if large_data: assert ( node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1310721)" ) assert ( node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1310721)" ) else: assert ( node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)" ) assert ( node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)" ) assert ( node1.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values") == "(11)" ) assert ( node2.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values") == "(11)" ) node1.query("DROP TABLE IF EXISTS ttl_delete_test SYNC") node2.query("DROP TABLE IF EXISTS ttl_delete_test SYNC") def wait_mutations(node, table, seconds): time.sleep(1) while seconds > 0: seconds -= 1 mutations = node.query( f"SELECT count() FROM system.mutations WHERE table='{table}' AND is_done=0" ) if mutations == "0\n": return time.sleep(1) mutations = node.query( f"SELECT count() FROM system.mutations WHERE table='{table}' AND is_done=0" ) assert mutations == "0\n" def wait_for_clean_old_parts(node, table, seconds): time.sleep(1) while seconds > 0: seconds -= 1 parts = node.query( f"SELECT count() FROM system.parts WHERE table='{table}' AND active=0" ) if parts == "0\n": return time.sleep(1) parts = node.query( f"SELECT count() FROM system.parts WHERE table='{table}' AND active=0" ) assert parts == "0\n" def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query("DROP TABLE IF EXISTS unfreeze_test SYNC") node2.query("DROP TABLE IF EXISTS unfreeze_test SYNC") node1.query( """ CREATE TABLE unfreeze_test ON CLUSTER test_cluster (d UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/unfreeze_test', '{}') ORDER BY d SETTINGS storage_policy='s3' """.format( "{replica}" ) ) node1.query("INSERT INTO unfreeze_test VALUES (0)") wait_for_active_parts(node2, 1, "unfreeze_test") node1.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup1'") node2.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup2'") wait_mutations(node1, "unfreeze_test", 10) wait_mutations(node2, "unfreeze_test", 10) objects01 = node1.get_backuped_s3_objects("s31", "freeze_backup1") objects02 = node2.get_backuped_s3_objects("s31", "freeze_backup2") assert objects01 == objects02 check_objects_exist(cluster, objects01) node1.query("TRUNCATE TABLE unfreeze_test") node2.query("SYSTEM SYNC REPLICA unfreeze_test", timeout=30) objects11 = node1.get_backuped_s3_objects("s31", "freeze_backup1") objects12 = node2.get_backuped_s3_objects("s31", "freeze_backup2") assert objects01 == objects11 assert objects01 == objects12 check_objects_exist(cluster, objects11) node1.query(f"{unfreeze_query_template} 'freeze_backup1'") wait_mutations(node1, "unfreeze_test", 10) check_objects_exist(cluster, objects12) node2.query(f"{unfreeze_query_template} 'freeze_backup2'") wait_mutations(node2, "unfreeze_test", 10) check_objects_not_exisis(cluster, objects12) node1.query("DROP TABLE IF EXISTS unfreeze_test SYNC") node2.query("DROP TABLE IF EXISTS unfreeze_test SYNC") def test_s3_zero_copy_unfreeze_alter(started_cluster): s3_zero_copy_unfreeze_base(cluster, "ALTER TABLE unfreeze_test UNFREEZE WITH NAME") def test_s3_zero_copy_unfreeze_system(started_cluster): s3_zero_copy_unfreeze_base(cluster, "SYSTEM UNFREEZE WITH NAME") def s3_zero_copy_drop_detached(cluster, unfreeze_query_template): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query("DROP TABLE IF EXISTS drop_detached_test SYNC") node2.query("DROP TABLE IF EXISTS drop_detached_test SYNC") node1.query( """ CREATE TABLE drop_detached_test ON CLUSTER test_cluster (d UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/drop_detached_test', '{}') ORDER BY d PARTITION BY d SETTINGS storage_policy='s3' """.format( "{replica}" ) ) node1.query("INSERT INTO drop_detached_test VALUES (0)") node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup1'") node1.query("INSERT INTO drop_detached_test VALUES (1)") node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup2'") node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30) objects1 = node1.get_backuped_s3_objects("s31", "detach_backup1") objects2 = node1.get_backuped_s3_objects("s31", "detach_backup2") objects_diff = list(set(objects2) - set(objects1)) node1.query(f"{unfreeze_query_template} 'detach_backup2'") node1.query(f"{unfreeze_query_template} 'detach_backup1'") node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '0'") node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '1'") node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30) wait_mutations(node1, "drop_detached_test", 10) wait_mutations(node2, "drop_detached_test", 10) check_objects_exist(cluster, objects1) check_objects_exist(cluster, objects2) node2.query( "ALTER TABLE drop_detached_test DROP DETACHED PARTITION '1'", settings={"allow_drop_detached": 1}, ) node1.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30) wait_mutations(node1, "drop_detached_test", 10) wait_mutations(node2, "drop_detached_test", 10) check_objects_exist(cluster, objects1) check_objects_exist(cluster, objects2) node1.query( "ALTER TABLE drop_detached_test DROP DETACHED PARTITION '1'", settings={"allow_drop_detached": 1}, ) node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30) wait_mutations(node1, "drop_detached_test", 10) wait_mutations(node2, "drop_detached_test", 10) check_objects_exist(cluster, objects1) check_objects_not_exisis(cluster, objects_diff) node1.query( "ALTER TABLE drop_detached_test DROP DETACHED PARTITION '0'", settings={"allow_drop_detached": 1}, ) node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30) wait_mutations(node1, "drop_detached_test", 10) wait_mutations(node2, "drop_detached_test", 10) check_objects_exist(cluster, objects1) node2.query( "ALTER TABLE drop_detached_test DROP DETACHED PARTITION '0'", settings={"allow_drop_detached": 1}, ) node1.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30) wait_mutations(node1, "drop_detached_test", 10) wait_mutations(node2, "drop_detached_test", 10) check_objects_not_exisis(cluster, objects1) def test_s3_zero_copy_drop_detached_alter(started_cluster): s3_zero_copy_drop_detached( cluster, "ALTER TABLE drop_detached_test UNFREEZE WITH NAME" ) def test_s3_zero_copy_drop_detached_system(started_cluster): s3_zero_copy_drop_detached(cluster, "SYSTEM UNFREEZE WITH NAME") def test_s3_zero_copy_concurrent_merge(started_cluster): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query("DROP TABLE IF EXISTS concurrent_merge SYNC") node2.query("DROP TABLE IF EXISTS concurrent_merge SYNC") for node in (node1, node2): node.query( """ CREATE TABLE concurrent_merge (id UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/concurrent_merge', '{replica}') ORDER BY id SETTINGS index_granularity=2, storage_policy='s3', remote_fs_execute_merges_on_single_replica_time_threshold=1 """ ) node1.query("system stop merges") node2.query("system stop merges") # This will generate two parts with 20 granules each node1.query("insert into concurrent_merge select number from numbers(40)") node1.query("insert into concurrent_merge select number + 1 from numbers(40)") wait_for_active_parts(node2, 2, "concurrent_merge") # Merge will materialize default column, it should sleep every granule and take 20 * 2 * 0.1 = 4 sec. node1.query("alter table concurrent_merge add column x UInt32 default sleep(0.1)") node1.query("system start merges") node2.query("system start merges") # Now, the merge should start. # Because of remote_fs_execute_merges_on_single_replica_time_threshold=1, # only one replica will start merge instantly. # The other replica should wait for 1 sec and also start it. # That should probably cause a data race at s3 storage. # For now, it does not happen (every blob has a random name, and we just have a duplicating data) node1.query("optimize table concurrent_merge final") wait_for_active_parts(node1, 1, "concurrent_merge") wait_for_active_parts(node2, 1, "concurrent_merge") for node in (node1, node2): assert node.query("select sum(id) from concurrent_merge").strip() == "1600" def test_s3_zero_copy_keeps_data_after_mutation(started_cluster): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query("DROP TABLE IF EXISTS zero_copy_mutation SYNC") node2.query("DROP TABLE IF EXISTS zero_copy_mutation SYNC") node1.query( """ CREATE TABLE zero_copy_mutation (id UInt64, value1 String, value2 String, value3 String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/zero_copy_mutation', '{replica}') ORDER BY id PARTITION BY (id % 4) SETTINGS storage_policy='s3', old_parts_lifetime=1000 """ ) node2.query( """ CREATE TABLE zero_copy_mutation (id UInt64, value1 String, value2 String, value3 String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/zero_copy_mutation', '{replica}') ORDER BY id PARTITION BY (id % 4) SETTINGS storage_policy='s3', old_parts_lifetime=1000 """ ) node1.query( """ INSERT INTO zero_copy_mutation SELECT * FROM generateRandom('id UInt64, value1 String, value2 String, value3 String') limit 1000000 """ ) wait_for_active_parts(node2, 4, "zero_copy_mutation") objects1 = node1.get_table_objects("zero_copy_mutation") check_objects_exist(cluster, objects1) node1.query( """ ALTER TABLE zero_copy_mutation ADD COLUMN valueX String MATERIALIZED value1 """ ) node1.query( """ ALTER TABLE zero_copy_mutation MATERIALIZE COLUMN valueX """ ) wait_mutations(node1, "zero_copy_mutation", 10) wait_mutations(node2, "zero_copy_mutation", 10) # If bug present at least one node has metadata with incorrect ref_count values. # But it may be any node depends on mutation execution order. # We can try to find one, but this required knowledge about internal metadata structure. # It can be change in future, so we do not find this node here. # And with the bug test may be success sometimes. nodeX = node1 nodeY = node2 objectsY = nodeY.get_table_objects("zero_copy_mutation") check_objects_exist(cluster, objectsY) nodeX.query( """ ALTER TABLE zero_copy_mutation DETACH PARTITION '0' """ ) nodeX.query( """ ALTER TABLE zero_copy_mutation ATTACH PARTITION '0' """ ) wait_mutations(node1, "zero_copy_mutation", 10) wait_mutations(node2, "zero_copy_mutation", 10) nodeX.query( """ DROP TABLE zero_copy_mutation SYNC """ ) # time to remove objects time.sleep(10) nodeY.query( """ SELECT count() FROM zero_copy_mutation WHERE value3 LIKE '%ab%' """ ) check_objects_exist(cluster, objectsY) nodeY.query( """ DROP TABLE zero_copy_mutation SYNC """ ) # time to remove objects time.sleep(10) check_objects_not_exisis(cluster, objectsY) @pytest.mark.parametrize( "failpoint_lock", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"], ) @pytest.mark.parametrize( "failpoint_unlock", [None, "zero_copy_unlock_zk_fail_before_op", "zero_copy_unlock_zk_fail_after_op"], ) def test_move_shared_zero_copy_lock_fail( started_cluster, test_table, failpoint_lock, failpoint_unlock ): node1 = cluster.instances["node1"] node2 = cluster.instances["node2"] node1.query( f""" CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}') ORDER BY date PARTITION BY date SETTINGS storage_policy='hybrid' """ ) date = "2024-10-23" node2.query(f"SYSTEM STOP FETCHES {test_table}") node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')") # Try to move and get fail on acquring zero-copy shared lock node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint_lock}") if failpoint_unlock: node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint_unlock}") node1.query_and_get_error( f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" ) # After fail the part must remain on the source disk assert ( node1.query( f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name" ) == "default\n" ) # Try another attempt after zk connection is restored # It should not failed due to leftovers of previous attempt (temporary cloned files) node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint_lock}") if failpoint_unlock: node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint_unlock}") node1.query( f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" ) assert ( node1.query( f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name" ) == "s31\n" ) # Sanity check node2.query(f"SYSTEM START FETCHES {test_table}") wait_for_active_parts(node2, 1, test_table, disk_name="s31") assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n" node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC") node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")