From 50db231e22938d77f83c0734bac565213395572d Mon Sep 17 00:00:00 2001
From: robot-clickhouse
Date: Tue, 26 Nov 2024 14:10:12 +0000
Subject: [PATCH] Backport #72357 to 24.9: Remove flaky test test_move_shared_lock_fail_keeper_unavailable and extend the stable one

---
 src/Common/FailPoint.cpp                    |  4 ++
 src/Storages/StorageReplicatedMergeTree.cpp |  5 ++
 .../test_s3_zero_copy_replication/test.py   | 69 ++++---------------
 3 files changed, 24 insertions(+), 54 deletions(-)

diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp
index 24ac4d8378d..fd33fadd043 100644
--- a/src/Common/FailPoint.cpp
+++ b/src/Common/FailPoint.cpp
@@ -68,6 +68,10 @@ static struct InitFiu
     REGULAR(replicated_merge_tree_all_replicas_stale) \
     REGULAR(zero_copy_lock_zk_fail_before_op) \
     REGULAR(zero_copy_lock_zk_fail_after_op) \
+    REGULAR(plain_object_storage_write_fail_on_directory_create) \
+    REGULAR(plain_object_storage_write_fail_on_directory_move) \
+    REGULAR(zero_copy_unlock_zk_fail_before_op) \
+    REGULAR(zero_copy_unlock_zk_fail_after_op) \
 
 
 namespace FailPoints
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 6312d2f8c97..12debfb334c 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -177,6 +177,8 @@ namespace FailPoints
     extern const char finish_set_quorum_failed_parts[];
     extern const char zero_copy_lock_zk_fail_before_op[];
     extern const char zero_copy_lock_zk_fail_after_op[];
+    extern const char zero_copy_unlock_zk_fail_before_op[];
+    extern const char zero_copy_unlock_zk_fail_after_op[];
 }
 
 namespace ErrorCodes
@@ -9800,6 +9802,9 @@ std::pair StorageReplicatedMergeTree::unlockSharedDataByID(
         LOG_TRACE(logger, "Removing zookeeper lock {} for part {} (files to keep: [{}])", zookeeper_part_replica_node, part_name,
             fmt::join(files_not_to_remove, ", "));
 
+        fiu_do_on(FailPoints::zero_copy_unlock_zk_fail_before_op, { zookeeper_ptr->forceFailureBeforeOperation(); });
+        fiu_do_on(FailPoints::zero_copy_unlock_zk_fail_after_op, { zookeeper_ptr->forceFailureAfterOperation(); });
+
         if (auto ec = zookeeper_ptr->tryRemove(zookeeper_part_replica_node); ec != Coordination::Error::ZOK)
         {
             /// Very complex case. It means that lock already doesn't exist when we tried to remove it.
diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py
index 913da707c18..6abb5cc5321 100644
--- a/tests/integration/test_s3_zero_copy_replication/test.py
+++ b/tests/integration/test_s3_zero_copy_replication/test.py
@@ -692,9 +692,16 @@ def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
 
 
 @pytest.mark.parametrize(
-    "failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"]
+    "failpoint_lock",
+    ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"],
 )
-def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
+@pytest.mark.parametrize(
+    "failpoint_unlock",
+    [None, "zero_copy_unlock_zk_fail_before_op", "zero_copy_unlock_zk_fail_after_op"],
+)
+def test_move_shared_zero_copy_lock_fail(
+    started_cluster, test_table, failpoint_lock, failpoint_unlock
+):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
@@ -713,7 +720,9 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
 
     node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
     # Try to move and get fail on acquring zero-copy shared lock
-    node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
+    node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint_lock}")
+    if failpoint_unlock:
+        node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint_unlock}")
     node1.query_and_get_error(
         f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
     )
@@ -728,7 +737,9 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
 
     # Try another attempt after zk connection is restored
     # It should not failed due to leftovers of previous attempt (temporary cloned files)
-    node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
+    node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint_lock}")
+    if failpoint_unlock:
+        node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint_unlock}")
     node1.query(
         f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
     )
@@ -747,53 +758,3 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
 
     node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
     node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
-
-
-def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table):
-    node1 = cluster.instances["node1"]
-    node2 = cluster.instances["node2"]
-
-    node1.query(
-        f"""
-        CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
-        ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
-        ORDER BY date PARTITION BY date
-        SETTINGS storage_policy='hybrid'
-        """
-    )
-
-    date = "2024-10-23"
-    node2.query(f"SYSTEM STOP FETCHES {test_table}")
-
-    node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
-    # Pause moving after part cloning, but before swapping
-    node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
-
-    def move(node):
-        node.query_and_get_error(
-            f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
-        )
-
-    # Start moving
-    t1 = threading.Thread(target=move, args=[node1])
-    t1.start()
-
-    with PartitionManager() as pm:
-        pm.drop_instance_zk_connections(node1)
-        # Continue moving and try to swap
-        node1.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
-        t1.join()
-
-    # Previous MOVE was failed, try another one after zk connection is restored
-    # It should not failed due to leftovers of previous attempt (temporary cloned files)
-    node1.query_with_retry(
-        f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
-    )
-
-    # Sanity check
-    node2.query(f"SYSTEM START FETCHES {test_table}")
-    wait_for_active_parts(node2, 1, test_table, disk_name="s31")
-    assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
-
-    node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
-    node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
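
Note (not part of the upstream patch): a minimal sketch of how the new unlock failpoints can be driven through the same enable/fail/retry cycle as the extended test above. It assumes the integration-test harness from tests/integration/test_s3_zero_copy_replication/test.py (the cluster fixture, a replicated table on the 'hybrid' storage policy); the helper name exercise_zero_copy_failpoints is illustrative only.

# Sketch only, assuming the node/query helpers of the integration-test framework.
def exercise_zero_copy_failpoints(node, table, date, lock_fp, unlock_fp=None):
    # Make acquiring the zero-copy shared lock fail, and optionally also the
    # later removal of the lock in ZooKeeper (the new unlock failpoints).
    node.query(f"SYSTEM ENABLE FAILPOINT {lock_fp}")
    if unlock_fp:
        node.query(f"SYSTEM ENABLE FAILPOINT {unlock_fp}")
    # While the failpoints are active the move is expected to fail.
    node.query_and_get_error(
        f"ALTER TABLE {table} MOVE PARTITION '{date}' TO VOLUME 'external'"
    )
    # With the failpoints cleared, a retry should succeed despite leftovers
    # (temporary cloned files) of the failed attempt.
    node.query(f"SYSTEM DISABLE FAILPOINT {lock_fp}")
    if unlock_fp:
        node.query(f"SYSTEM DISABLE FAILPOINT {unlock_fp}")
    node.query(f"ALTER TABLE {table} MOVE PARTITION '{date}' TO VOLUME 'external'")

# Example invocation (values mirror the test's parametrization):
# exercise_zero_copy_failpoints(node1, test_table, "2024-10-23",
#     "zero_copy_lock_zk_fail_before_op", "zero_copy_unlock_zk_fail_after_op")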