Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-12-04 21:42:39 +00:00)

Merge pull request #72477 from ClickHouse/backport/24.9/72357
Backport #72357 to 24.9: Remove flaky test test_move_shared_lock_fail_keeper_unavailable and extend the stable one

Commit: 8bbe3ea178
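The diff below does two things: it registers new fault-injection points around the zero-copy unlock path (`zero_copy_unlock_zk_fail_before_op` / `zero_copy_unlock_zk_fail_after_op`) and reworks the stable integration test so it is parametrized over both lock and unlock failures, replacing the flaky Keeper-outage test. The extended test follows the usual enable-failpoint / expect-failure / disable / retry shape. Below is a minimal sketch of that pattern, not the real test: it assumes the module-level `cluster` object and query helpers that the test file already uses, and the table and partition names are placeholders.

```python
# Sketch only: the enable -> fail -> disable -> retry shape of the extended test.
# "some_table" and the partition value are placeholders, not the real fixture.
def test_failpoint_roundtrip(started_cluster):
    node1 = cluster.instances["node1"]

    # Inject a ZooKeeper failure while the zero-copy shared lock is acquired.
    node1.query("SYSTEM ENABLE FAILPOINT zero_copy_lock_zk_fail_before_op")
    node1.query_and_get_error(
        "ALTER TABLE some_table MOVE PARTITION '2024-10-23' TO VOLUME 'external'"
    )

    # After clearing the failpoint the retried MOVE must succeed, even though the
    # failed attempt may have left temporary cloned files behind.
    node1.query("SYSTEM DISABLE FAILPOINT zero_copy_lock_zk_fail_before_op")
    node1.query(
        "ALTER TABLE some_table MOVE PARTITION '2024-10-23' TO VOLUME 'external'"
    )
```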
@@ -68,6 +68,10 @@ static struct InitFiu
     REGULAR(replicated_merge_tree_all_replicas_stale) \
     REGULAR(zero_copy_lock_zk_fail_before_op) \
     REGULAR(zero_copy_lock_zk_fail_after_op) \
+    REGULAR(plain_object_storage_write_fail_on_directory_create) \
+    REGULAR(plain_object_storage_write_fail_on_directory_move) \
+    REGULAR(zero_copy_unlock_zk_fail_before_op) \
+    REGULAR(zero_copy_unlock_zk_fail_after_op) \
 
 
 namespace FailPoints
@@ -177,6 +177,8 @@ namespace FailPoints
     extern const char finish_set_quorum_failed_parts[];
     extern const char zero_copy_lock_zk_fail_before_op[];
     extern const char zero_copy_lock_zk_fail_after_op[];
+    extern const char zero_copy_unlock_zk_fail_before_op[];
+    extern const char zero_copy_unlock_zk_fail_after_op[];
 }
 
 namespace ErrorCodes
@@ -9800,6 +9802,9 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
 
         LOG_TRACE(logger, "Removing zookeeper lock {} for part {} (files to keep: [{}])", zookeeper_part_replica_node, part_name, fmt::join(files_not_to_remove, ", "));
 
+        fiu_do_on(FailPoints::zero_copy_unlock_zk_fail_before_op, { zookeeper_ptr->forceFailureBeforeOperation(); });
+        fiu_do_on(FailPoints::zero_copy_unlock_zk_fail_after_op, { zookeeper_ptr->forceFailureAfterOperation(); });
+
         if (auto ec = zookeeper_ptr->tryRemove(zookeeper_part_replica_node); ec != Coordination::Error::ZOK)
         {
             /// Very complex case. It means that lock already doesn't exist when we tried to remove it.
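The two `fiu_do_on` hooks bracket the `tryRemove` of the per-replica zero-copy lock node: judging by the names, the before-op failpoint aborts before the ZooKeeper request is issued (the lock node survives), while the after-op variant reports a failure only after the remove has been applied, so a retry finds the node already gone and exercises the "lock already doesn't exist" branch. From a test, both modes are driven the same way as the lock failpoints. A hedged sketch, assuming the test module's `cluster` object and a placeholder table name:

```python
# Sketch: exercising both unlock failure modes from a test node.
node1 = cluster.instances["node1"]
for failpoint in ("zero_copy_unlock_zk_fail_before_op", "zero_copy_unlock_zk_fail_after_op"):
    node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
    # The MOVE is expected to fail while the failpoint is armed.
    node1.query_and_get_error(
        "ALTER TABLE some_table MOVE PARTITION '2024-10-23' TO VOLUME 'external'"
    )
    node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
```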
@@ -692,9 +692,16 @@ def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
 
 
 @pytest.mark.parametrize(
-    "failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"]
+    "failpoint_lock",
+    ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"],
 )
-def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
+@pytest.mark.parametrize(
+    "failpoint_unlock",
+    [None, "zero_copy_unlock_zk_fail_before_op", "zero_copy_unlock_zk_fail_after_op"],
+)
+def test_move_shared_zero_copy_lock_fail(
+    started_cluster, test_table, failpoint_lock, failpoint_unlock
+):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
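Stacking two `@pytest.mark.parametrize` decorators parametrizes over the cross product of the argument lists, so the renamed test runs once per (failpoint_lock, failpoint_unlock) pair: 2 × 3 = 6 cases, with `None` as the baseline where no unlock failure is injected. A standalone way to enumerate the same grid:

```python
# Standalone illustration of the 2 x 3 parameter grid the stacked decorators produce.
from itertools import product

lock_failpoints = ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"]
unlock_failpoints = [None, "zero_copy_unlock_zk_fail_before_op", "zero_copy_unlock_zk_fail_after_op"]

for failpoint_lock, failpoint_unlock in product(lock_failpoints, unlock_failpoints):
    print(failpoint_lock, failpoint_unlock)  # six combinations in total
```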
@@ -713,7 +720,9 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
     node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
 
     # Try to move and get fail on acquring zero-copy shared lock
-    node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
+    node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint_lock}")
+    if failpoint_unlock:
+        node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint_unlock}")
     node1.query_and_get_error(
         f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
     )
@@ -728,7 +737,9 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
 
     # Try another attempt after zk connection is restored
     # It should not failed due to leftovers of previous attempt (temporary cloned files)
-    node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
+    node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint_lock}")
+    if failpoint_unlock:
+        node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint_unlock}")
     node1.query(
         f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
     )
@@ -747,53 +758,3 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
 
     node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
     node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
-
-
-def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table):
-    node1 = cluster.instances["node1"]
-    node2 = cluster.instances["node2"]
-
-    node1.query(
-        f"""
-        CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
-        ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
-        ORDER BY date PARTITION BY date
-        SETTINGS storage_policy='hybrid'
-    """
-    )
-
-    date = "2024-10-23"
-    node2.query(f"SYSTEM STOP FETCHES {test_table}")
-
-    node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
-    # Pause moving after part cloning, but before swapping
-    node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
-
-    def move(node):
-        node.query_and_get_error(
-            f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
-        )
-
-    # Start moving
-    t1 = threading.Thread(target=move, args=[node1])
-    t1.start()
-
-    with PartitionManager() as pm:
-        pm.drop_instance_zk_connections(node1)
-        # Continue moving and try to swap
-        node1.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
-        t1.join()
-
-    # Previous MOVE was failed, try another one after zk connection is restored
-    # It should not failed due to leftovers of previous attempt (temporary cloned files)
-    node1.query_with_retry(
-        f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
-    )
-
-    # Sanity check
-    node2.query(f"SYSTEM START FETCHES {test_table}")
-    wait_for_active_parts(node2, 1, test_table, disk_name="s31")
-    assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
-
-    node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
-    node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")