diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index bc4f2cb43d2..24ac4d8378d 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -64,6 +64,10 @@ static struct InitFiu REGULAR(lazy_pipe_fds_fail_close) \ PAUSEABLE(infinite_sleep) \ PAUSEABLE(stop_moving_part_before_swap_with_active) \ + REGULAR(slowdown_index_analysis) \ + REGULAR(replicated_merge_tree_all_replicas_stale) \ + REGULAR(zero_copy_lock_zk_fail_before_op) \ + REGULAR(zero_copy_lock_zk_fail_after_op) \ namespace FailPoints diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index ceec13fc4a4..faa5e3a6f1f 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -524,6 +524,14 @@ SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_n return it == serializations.end() ? nullptr : it->second; } +bool IMergeTreeDataPart::isMovingPart() const +{ + fs::path part_directory_path = getDataPartStorage().getRelativePath(); + if (part_directory_path.filename().empty()) + part_directory_path = part_directory_path.parent_path(); + return part_directory_path.parent_path().filename() == "moving"; +} + void IMergeTreeDataPart::removeIfNeeded() { assert(assertHasValidVersionMetadata()); @@ -548,10 +556,7 @@ void IMergeTreeDataPart::removeIfNeeded() throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set", getDataPartStorage().getPartDirectory(), name); - fs::path part_directory_path = getDataPartStorage().getRelativePath(); - if (part_directory_path.filename().empty()) - part_directory_path = part_directory_path.parent_path(); - bool is_moving_part = part_directory_path.parent_path().filename() == "moving"; + bool is_moving_part = isMovingPart(); if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part) { LOG_ERROR( diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 85ef0472ce7..07050697f4d 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -429,6 +429,9 @@ public: bool isProjectionPart() const { return parent_part != nullptr; } + /// Check if the part is in the `/moving` directory + bool isMovingPart() const; + const IMergeTreeDataPart * getParentPart() const { return parent_part; } String getParentPartName() const { return parent_part_name; } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a4fb314d5d9..7670c88b94f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -7998,33 +7998,49 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & /// replica will actually move the part from disk to some /// zero-copy storage other replicas will just fetch /// metainformation. - if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock) - { - if (lock->isLocked()) - { - cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings); - parts_mover.swapClonedPart(cloned_part); - break; - } - else if (wait_for_move_if_zero_copy) - { - LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name); - /// Wait and checks not only for timeout but also for shutdown and so on. - while (!waitZeroCopyLockToDisappear(*lock, 3000)) - { - LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name); - } - } - else - break; - } - else + auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); + if (!lock) { /// Move will be retried but with backoff. - LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name); + LOG_DEBUG( + log, + "Move of part {} postponed, because zero copy mode enabled and zero-copy lock was not acquired", + moving_part.part->name); result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy; break; } + + if (lock->isLocked()) + { + cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings); + /// Cloning part can take a long time. + /// Recheck if the lock (and keeper session expirity) is OK + if (lock->isLocked()) + { + parts_mover.swapClonedPart(cloned_part); + break; /// Successfully moved + } + else + { + LOG_DEBUG( + log, + "Move of part {} postponed, because zero copy mode enabled and zero-copy lock was lost during cloning the part", + moving_part.part->name); + result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy; + break; + } + } + if (wait_for_move_if_zero_copy) + { + LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name); + /// Wait and checks not only for timeout but also for shutdown and so on. + while (!waitZeroCopyLockToDisappear(*lock, 3000)) + { + LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name); + } + } + else + break; } } else /// Ordinary move as it should be diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index d81300da738..528e2a9d55a 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -255,6 +255,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me disk->createDirectories(path_to_clone); + /// TODO: Make it possible to fetch only zero-copy part without fallback to fetching a full-copy one auto zero_copy_part = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name); if (zero_copy_part) @@ -297,6 +298,28 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me return cloned_part; } +void MergeTreePartsMover::renameClonedPart(IMergeTreeDataPart & part) const +try +{ + part.is_temp = false; + /// Mark it DeleteOnDestroy to ensure deleting in destructor + /// if something goes wrong before swapping + part.setState(MergeTreeDataPartState::DeleteOnDestroy); + /// Don't remove new directory but throw an error because it may contain part which is currently in use. + part.renameTo(part.name, /* remove_new_dir_if_exists */ false); +} +catch (...) +{ + /// Check if part was renamed or not + /// `renameTo()` does not provide strong exception guarantee in case of an exception + if (part.isMovingPart()) + { + /// Restore its temporary state + part.is_temp = true; + part.setState(MergeTreeDataPartState::Temporary); + } + throw; +} void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const { @@ -323,12 +346,23 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) cons return; } - cloned_part.part->is_temp = false; + /// It is safe to acquire zero-copy lock for the temporary part here + /// because no one can fetch it until it is *swapped*. + /// + /// Set ASK_KEEPER to try to unlock it in destructor if something goes wrong before *renaming* + /// If unlocking is failed we will not get a stuck part in moving directory + /// because it will be renamed to delete_tmp_ beforehand and cleaned up later. + /// Worst outcomes: trash in object storage and/or orphaned shared zero-copy lock. It is acceptable. + /// See DataPartStorageOnDiskBase::remove(). + cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER; + data->lockSharedData(*cloned_part.part, /* replace_existing_lock = */ true); - /// Don't remove new directory but throw an error because it may contain part which is currently in use. - cloned_part.part->renameTo(active_part->name, false); + renameClonedPart(*cloned_part.part); - /// TODO what happen if server goes down here? + /// If server goes down here we will get two copy of the part with the same name on different disks. + /// And on the next ClickHouse startup during loading parts the first copy (in the order of defining disks + /// in the storage policy) will be loaded as Active, the second one will be loaded as Outdated and removed as duplicate. + /// See MergeTreeData::loadDataParts(). data->swapActivePart(cloned_part.part, part_lock); LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath()); diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h index 3cf270946d8..7a6583008c0 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/src/Storages/MergeTree/MergeTreePartsMover.h @@ -75,6 +75,9 @@ public: /// merge or mutation. void swapClonedPart(TemporaryClonedPart & cloned_part) const; + /// Rename cloned part from `moving/` directory to the actual part storage + void renameClonedPart(IMergeTreeDataPart & part) const; + /// Can stop background moves and moves from queries ActionBlocker moves_blocker; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index c894818708d..3823cea4025 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -147,6 +147,8 @@ namespace FailPoints extern const char replicated_queue_fail_next_entry[]; extern const char replicated_queue_unfail_entries[]; extern const char finish_set_quorum_failed_parts[]; + extern const char zero_copy_lock_zk_fail_before_op[]; + extern const char zero_copy_lock_zk_fail_after_op[]; } namespace ErrorCodes @@ -10333,6 +10335,10 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode( Coordination::Requests ops; Coordination::Responses responses; getZeroCopyLockNodeCreateOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files); + + fiu_do_on(FailPoints::zero_copy_lock_zk_fail_before_op, { zookeeper->forceFailureBeforeOperation(); }); + fiu_do_on(FailPoints::zero_copy_lock_zk_fail_after_op, { zookeeper->forceFailureAfterOperation(); }); + auto error = zookeeper->tryMulti(ops, responses); if (error == Coordination::Error::ZOK) { diff --git a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml index 8df9e8e8c26..3a2bec7f314 100644 --- a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml +++ b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml @@ -7,21 +7,21 @@ http://minio1:9001/root/data/ minio minio123 - true + false s3 http://minio1:9001/root/data/ minio minio123 - true + false s3 http://minio1:9001/root/data2/ minio minio123 - true + false diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py index 0ca81a27802..913da707c18 100644 --- a/tests/integration/test_s3_zero_copy_replication/test.py +++ b/tests/integration/test_s3_zero_copy_replication/test.py @@ -1,9 +1,11 @@ import datetime import logging +import threading import time import pytest from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager logging.getLogger().setLevel(logging.INFO) logging.getLogger().addHandler(logging.StreamHandler()) @@ -76,15 +78,19 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30): assert get_large_objects_count(cluster, size=size) == expected -def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30): +def wait_for_active_parts( + node, num_expected_parts, table_name, timeout=30, disk_name=None +): deadline = time.monotonic() + timeout num_parts = 0 while time.monotonic() < deadline: - num_parts_str = node.query( - "select count() from system.parts where table = '{}' and active".format( - table_name - ) + query = ( + f"select count() from system.parts where table = '{table_name}' and active" ) + if disk_name: + query += f" and disk_name='{disk_name}'" + + num_parts_str = node.query(query) num_parts = int(num_parts_str.strip()) if num_parts == num_expected_parts: return @@ -94,6 +100,22 @@ def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30): assert num_parts == num_expected_parts +@pytest.fixture(scope="function") +def test_name(request): + return request.node.name + + +@pytest.fixture(scope="function") +def test_table(test_name): + normalized = ( + test_name.replace("[", "_") + .replace("]", "_") + .replace(" ", "_") + .replace("-", "_") + ) + return "table_" + normalized + + # Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning @pytest.mark.order(0) @pytest.mark.parametrize("policy", ["s3"]) @@ -667,3 +689,111 @@ def test_s3_zero_copy_keeps_data_after_mutation(started_cluster): time.sleep(10) check_objects_not_exisis(cluster, objectsY) + + +@pytest.mark.parametrize( + "failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"] +) +def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint): + node1 = cluster.instances["node1"] + node2 = cluster.instances["node2"] + + node1.query( + f""" + CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime) + ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}') + ORDER BY date PARTITION BY date + SETTINGS storage_policy='hybrid' + """ + ) + + date = "2024-10-23" + + node2.query(f"SYSTEM STOP FETCHES {test_table}") + node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')") + + # Try to move and get fail on acquring zero-copy shared lock + node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}") + node1.query_and_get_error( + f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" + ) + + # After fail the part must remain on the source disk + assert ( + node1.query( + f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name" + ) + == "default\n" + ) + + # Try another attempt after zk connection is restored + # It should not failed due to leftovers of previous attempt (temporary cloned files) + node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}") + node1.query( + f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" + ) + + assert ( + node1.query( + f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name" + ) + == "s31\n" + ) + + # Sanity check + node2.query(f"SYSTEM START FETCHES {test_table}") + wait_for_active_parts(node2, 1, test_table, disk_name="s31") + assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n" + + node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC") + node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC") + + +def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table): + node1 = cluster.instances["node1"] + node2 = cluster.instances["node2"] + + node1.query( + f""" + CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime) + ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}') + ORDER BY date PARTITION BY date + SETTINGS storage_policy='hybrid' + """ + ) + + date = "2024-10-23" + node2.query(f"SYSTEM STOP FETCHES {test_table}") + + node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')") + # Pause moving after part cloning, but before swapping + node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active") + + def move(node): + node.query_and_get_error( + f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" + ) + + # Start moving + t1 = threading.Thread(target=move, args=[node1]) + t1.start() + + with PartitionManager() as pm: + pm.drop_instance_zk_connections(node1) + # Continue moving and try to swap + node1.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active") + t1.join() + + # Previous MOVE was failed, try another one after zk connection is restored + # It should not failed due to leftovers of previous attempt (temporary cloned files) + node1.query_with_retry( + f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" + ) + + # Sanity check + node2.query(f"SYSTEM START FETCHES {test_table}") + wait_for_active_parts(node2, 1, test_table, disk_name="s31") + assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n" + + node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC") + node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")