From 57ceeb4a63535b184512ef0bc92d3332dafe6414 Mon Sep 17 00:00:00 2001
From: Aleksei Filatov
Date: Tue, 12 Nov 2024 20:38:23 +0000
Subject: [PATCH 1/4] Change acquiring zero-copy shared lock during moving part

---
 src/Common/FailPoint.cpp                      |   2 +
 src/Storages/MergeTree/IMergeTreeDataPart.cpp |  13 +-
 src/Storages/MergeTree/IMergeTreeDataPart.h   |   3 +
 src/Storages/MergeTree/MergeTreeData.cpp      |  60 +++++----
 .../MergeTree/MergeTreePartsMover.cpp         |  42 ++++++-
 src/Storages/MergeTree/MergeTreePartsMover.h  |   3 +
 src/Storages/StorageReplicatedMergeTree.cpp   |   6 +
 .../configs/config.d/s3.xml                   |   6 +-
 .../test_s3_zero_copy_replication/test.py     | 119 +++++++++++++++++-
 9 files changed, 215 insertions(+), 39 deletions(-)

diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp
index 0898bdded83..49e5dee60dd 100644
--- a/src/Common/FailPoint.cpp
+++ b/src/Common/FailPoint.cpp
@@ -76,6 +76,8 @@ static struct InitFiu
     PAUSEABLE(stop_moving_part_before_swap_with_active) \
     REGULAR(slowdown_index_analysis) \
     REGULAR(replicated_merge_tree_all_replicas_stale) \
+    REGULAR(zero_copy_lock_zk_fail_before_op) \
+    REGULAR(zero_copy_lock_zk_fail_after_op) \
 
 
 namespace FailPoints
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 51c445945e6..af08e10b916 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -544,6 +544,14 @@ SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_n
     return it == serializations.end() ? nullptr : it->second;
 }
 
+bool IMergeTreeDataPart::isMovingPart() const
+{
+    fs::path part_directory_path = getDataPartStorage().getRelativePath();
+    if (part_directory_path.filename().empty())
+        part_directory_path = part_directory_path.parent_path();
+    return part_directory_path.parent_path().filename() == "moving";
+}
+
 void IMergeTreeDataPart::removeIfNeeded()
 {
     assert(assertHasValidVersionMetadata());
@@ -568,10 +576,7 @@ void IMergeTreeDataPart::removeIfNeeded()
             throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
                 getDataPartStorage().getPartDirectory(), name);
 
-        fs::path part_directory_path = getDataPartStorage().getRelativePath();
-        if (part_directory_path.filename().empty())
-            part_directory_path = part_directory_path.parent_path();
-        bool is_moving_part = part_directory_path.parent_path().filename() == "moving";
+        bool is_moving_part = isMovingPart();
         if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
         {
             LOG_ERROR(
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h
index 55f1265318c..e844ea3640e 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -434,6 +434,9 @@ public:
 
     bool isProjectionPart() const { return parent_part != nullptr; }
 
+    /// Check if the part is in the `/moving` directory
+    bool isMovingPart() const;
+
     const IMergeTreeDataPart * getParentPart() const { return parent_part; }
     String getParentPartName() const { return parent_part_name; }
 
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index b2f35d0a309..8806c96ac9b 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -8257,33 +8257,49 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
                     /// replica will actually move the part from disk to some
                    /// zero-copy storage other replicas will just fetch
                    /// metainformation.
-                    if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
-                    {
-                        if (lock->isLocked())
-                        {
-                            cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
-                            parts_mover.swapClonedPart(cloned_part);
-                            break;
-                        }
-                        if (wait_for_move_if_zero_copy)
-                        {
-                            LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
-                            /// Wait and checks not only for timeout but also for shutdown and so on.
-                            while (!waitZeroCopyLockToDisappear(*lock, 3000))
-                            {
-                                LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
-                            }
-                        }
-                        else
-                            break;
-                    }
-                    else
+                    auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk);
+                    if (!lock)
                     {
                         /// Move will be retried but with backoff.
-                        LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
+                        LOG_DEBUG(
+                            log,
+                            "Move of part {} postponed, because zero copy mode enabled and zero-copy lock was not acquired",
+                            moving_part.part->name);
                         result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
                         break;
                     }
+
+                    if (lock->isLocked())
+                    {
+                        cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
+                        /// Cloning part can take a long time.
+                        /// Recheck if the lock (and keeper session expiry) is OK
+                        if (lock->isLocked())
+                        {
+                            parts_mover.swapClonedPart(cloned_part);
+                            break; /// Successfully moved
+                        }
+                        else
+                        {
+                            LOG_DEBUG(
+                                log,
+                                "Move of part {} postponed, because zero copy mode enabled and zero-copy lock was lost during cloning the part",
+                                moving_part.part->name);
+                            result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
+                            break;
+                        }
+                    }
+                    if (wait_for_move_if_zero_copy)
+                    {
+                        LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
+                        /// Wait and checks not only for timeout but also for shutdown and so on.
+                        while (!waitZeroCopyLockToDisappear(*lock, 3000))
+                        {
+                            LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
+                        }
+                    }
+                    else
+                        break;
                 }
             }
             else /// Ordinary move as it should be
diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp
index e9c9f2b4b06..c2a1636513e 100644
--- a/src/Storages/MergeTree/MergeTreePartsMover.cpp
+++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp
@@ -259,6 +259,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
 
         disk->createDirectories(path_to_clone);
 
+        /// TODO: Make it possible to fetch only zero-copy part without fallback to fetching a full-copy one
         auto zero_copy_part = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
 
         if (zero_copy_part)
@@ -301,6 +302,28 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
     return cloned_part;
 }
 
+void MergeTreePartsMover::renameClonedPart(IMergeTreeDataPart & part) const
+try
+{
+    part.is_temp = false;
+    /// Mark it DeleteOnDestroy to ensure deleting in destructor
+    /// if something goes wrong before swapping
+    part.setState(MergeTreeDataPartState::DeleteOnDestroy);
+    /// Don't remove new directory but throw an error because it may contain part which is currently in use.
+    part.renameTo(part.name, /* remove_new_dir_if_exists */ false);
+}
+catch (...)
+{
+    /// Check if part was renamed or not
+    /// `renameTo()` does not provide strong exception guarantee in case of an exception
+    if (part.isMovingPart())
+    {
+        /// Restore its temporary state
+        part.is_temp = true;
+        part.setState(MergeTreeDataPartState::Temporary);
+    }
+    throw;
+}
 
 void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
 {
@@ -327,12 +350,23 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) cons
         return;
     }
 
-    cloned_part.part->is_temp = false;
+    /// It is safe to acquire zero-copy lock for the temporary part here
+    /// because no one can fetch it until it is *swapped*.
+    ///
+    /// Set ASK_KEEPER to try to unlock it in destructor if something goes wrong before *renaming*
+    /// If unlocking is failed we will not get a stucked part in moving directory
+    /// because it will be renamed to delete_tmp_ beforehand and cleaned up later.
+    /// Worst outcomes: trash in object storage and/or orphaned shared zero-copy lock. It is acceptable.
+    /// See DataPartStorageOnDiskBase::remove().
+    cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
+    data->lockSharedData(*cloned_part.part, /* replace_existing_lock = */ true);
 
-    /// Don't remove new directory but throw an error because it may contain part which is currently in use.
-    cloned_part.part->renameTo(active_part->name, false);
+    renameClonedPart(*cloned_part.part);
 
-    /// TODO what happen if server goes down here?
+    /// If server goes down here we will get two copy of the part with the same name on different disks.
+    /// And on the next ClickHouse startup during loading parts the first copy (in the order of defining disks
+    /// in the storage policy) will be loaded as Active, the second one will be loaded as Outdated and removed as duplicate.
+    /// See MergeTreeData::loadDataParts().
     data->swapActivePart(cloned_part.part, part_lock);
 
     LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());
diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h
index 3cf270946d8..7a6583008c0 100644
--- a/src/Storages/MergeTree/MergeTreePartsMover.h
+++ b/src/Storages/MergeTree/MergeTreePartsMover.h
@@ -75,6 +75,9 @@ public:
     /// merge or mutation.
     void swapClonedPart(TemporaryClonedPart & cloned_part) const;
 
+    /// Rename cloned part from `moving/` directory to the actual part storage
+    void renameClonedPart(IMergeTreeDataPart & part) const;
+
     /// Can stop background moves and moves from queries
     ActionBlocker moves_blocker;
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 793fd02c656..c0c637b6d77 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -216,6 +216,8 @@ namespace FailPoints
     extern const char replicated_queue_fail_next_entry[];
     extern const char replicated_queue_unfail_entries[];
     extern const char finish_set_quorum_failed_parts[];
+    extern const char zero_copy_lock_zk_fail_before_op[];
+    extern const char zero_copy_lock_zk_fail_after_op[];
 }
 
 namespace ErrorCodes
@@ -10483,6 +10485,10 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
     Coordination::Requests ops;
     Coordination::Responses responses;
     getZeroCopyLockNodeCreateOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
+
+    fiu_do_on(FailPoints::zero_copy_lock_zk_fail_before_op, { zookeeper->forceFailureBeforeOperation(); });
+    fiu_do_on(FailPoints::zero_copy_lock_zk_fail_after_op, { zookeeper->forceFailureAfterOperation(); });
+
     auto error = zookeeper->tryMulti(ops, responses);
     if (error == Coordination::Error::ZOK)
     {
diff --git a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
index 8df9e8e8c26..3a2bec7f314 100644
--- a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
+++ b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
@@ -7,21 +7,21 @@
                     http://minio1:9001/root/data/
                     minio
                     minio123
-                    true
+                    false
 
                     s3
                     http://minio1:9001/root/data/
                     minio
                     minio123
-                    true
+                    false
 
                     s3
                     http://minio1:9001/root/data2/
                     minio
                     minio123
-                    true
+                    false
 
diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py
index e723a4ffc57..72593c9a2ff 100644
--- a/tests/integration/test_s3_zero_copy_replication/test.py
+++ b/tests/integration/test_s3_zero_copy_replication/test.py
@@ -1,10 +1,13 @@
 import datetime
 import logging
+import threading
 import time
 
 import pytest
 
 from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
+
 
 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())
@@ -77,15 +80,15 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
     assert get_large_objects_count(cluster, size=size) == expected
 
 
-def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
+def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30, disk_name = None):
     deadline = time.monotonic() + timeout
     num_parts = 0
     while time.monotonic() < deadline:
-        num_parts_str = node.query(
-            "select count() from system.parts where table = '{}' and active".format(
-                table_name
-            )
-        )
+        query = f"select count() from system.parts where table = '{table_name}' and active"
+        if disk_name:
+            query += f" and disk_name='{disk_name}'"
+
+        num_parts_str = node.query(query)
         num_parts = int(num_parts_str.strip())
         if num_parts == num_expected_parts:
             return
@@ -95,6 +98,22 @@ def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
     assert num_parts == num_expected_parts
 
 
+@pytest.fixture(scope="function")
+def test_name(request):
+    return request.node.name
+
+
+@pytest.fixture(scope="function")
+def test_table(test_name):
+    normalized = (
+        test_name.replace("[", "_")
+        .replace("]", "_")
+        .replace(" ", "_")
+        .replace("-", "_")
+    )
+    return "table_" + normalized
+
+
 # Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning
 @pytest.mark.order(0)
 @pytest.mark.parametrize("policy", ["s3"])
@@ -668,3 +687,91 @@ def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
     time.sleep(10)
 
     check_objects_not_exisis(cluster, objectsY)
+
+
+@pytest.mark.parametrize("failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"])
+def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
+    node1 = cluster.instances["node1"]
+    node2 = cluster.instances["node2"]
+
+    node1.query(
+        f"""
+        CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
+        ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
+        ORDER BY date PARTITION BY date
+        SETTINGS storage_policy='hybrid'
+        """
+    )
+
+    date = '2024-10-23'
+
+    node2.query(f"SYSTEM STOP FETCHES {test_table}")
+    node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
+
+    # Try to move and get fail on acquiring zero-copy shared lock
+    node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
+    node1.query_and_get_error(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'")
+
+    # After fail the part must remain on the source disk
+    assert node1.query(f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name") == "default\n"
+
+    # Try another attempt after zk connection is restored
+    # It should not fail due to leftovers of previous attempt (temporary cloned files)
+    node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
+    node1.query(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'")
+
+    assert node1.query(f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name") == "s31\n"
+
+    # Sanity check
+    node2.query(f"SYSTEM START FETCHES {test_table}")
+    wait_for_active_parts(node2, 1, test_table, disk_name='s31')
+    assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
+
+    node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
+    node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
+
+
+def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table):
+    node1 = cluster.instances["node1"]
+    node2 = cluster.instances["node2"]
+
+    node1.query(
+        f"""
+        CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
+        ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
+        ORDER BY date PARTITION BY date
+        SETTINGS storage_policy='hybrid'
+        """
+    )
+
+    date = '2024-10-23'
+    node2.query(f"SYSTEM STOP FETCHES {test_table}")
+
+    node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
+    # Pause moving after part cloning, but before swapping
+    node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
+
+    def move(node):
+        node.query_and_get_error(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'")
+
+    # Start moving
+    t1 = threading.Thread(target=move, args=[node1])
+    t1.start()
+
+    with PartitionManager() as pm:
+        pm.drop_instance_zk_connections(node1)
+        # Continue moving and try to swap
+        node1.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
+        t1.join()
+
+    # Previous MOVE failed, try another one after zk connection is restored
+    # It should not fail due to leftovers of previous attempt (temporary cloned files)
+    node1.query_with_retry(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'")
+
+    # Sanity check
+    node2.query(f"SYSTEM START FETCHES {test_table}")
+    wait_for_active_parts(node2, 1, test_table, disk_name='s31')
+    assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
+
+    node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
+    node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")

From 2aa5447f090a55f3dd0b818a2adbaf874a2ba3bc Mon Sep 17 00:00:00 2001
From: Aleksei Filatov
Date: Wed, 13 Nov 2024 11:02:30 +0000
Subject: [PATCH 2/4] Fix style for tests

---
 .../test_s3_zero_copy_replication/test.py     | 52 +++++++++++++------
 1 file changed, 36 insertions(+), 16 deletions(-)

diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py
index 72593c9a2ff..4996bf8d79d 100644
--- a/tests/integration/test_s3_zero_copy_replication/test.py
+++ b/tests/integration/test_s3_zero_copy_replication/test.py
@@ -689,10 +689,12 @@ def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
     check_objects_not_exisis(cluster, objectsY)
 
 
-@pytest.mark.parametrize("failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"])
+@pytest.mark.parametrize(
+    "failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"]
+)
 def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
     node1 = cluster.instances["node1"]
-    node2 = cluster.instances["node2"]
 
     node1.query(
         f"""
@@ -703,28 +705,42 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
         """
     )
 
-    date = '2024-10-23'
+    date = "2024-10-23"
 
-    node2.query(f"SYSTEM STOP FETCHES {test_table}")
     node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
 
     # Try to move and get fail on acquiring zero-copy shared lock
     node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
-    node1.query_and_get_error(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'")
+    node1.query_and_get_error(
+        f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
+    )
 
     # After fail the part must remain on the source disk
-    assert node1.query(f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name") == "default\n"
+    assert (
+        node1.query(
+            f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name"
+        )
+        == "default\n"
+    )
 
     # Try another attempt after zk connection is restored
     # It should not fail due to leftovers of previous attempt (temporary cloned files)
     node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
-    node1.query(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'")
+    node1.query(
+        f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
+    )
 
-    assert node1.query(f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name") == "s31\n"
+    assert (
+        node1.query(
+            f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name"
+        )
+        == "s31\n"
+    )
 
     # Sanity check
     node2.query(f"SYSTEM START FETCHES {test_table}")
-    wait_for_active_parts(node2, 1, test_table, disk_name='s31')
+    wait_for_active_parts(node2, 1, test_table, disk_name="s31")
     assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
 
TABLE IF EXISTS {test_table} SYNC") @@ -733,7 +749,7 @@ def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint): def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table): node1 = cluster.instances["node1"] - node2 = cluster.instances["node2"] + node2 = cluster.instances["node2"] node1.query( f""" @@ -744,15 +760,17 @@ def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table): """ ) - date = '2024-10-23' + date = "2024-10-23" node2.query(f"SYSTEM STOP FETCHES {test_table}") node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')") # Pause moving after part cloning, but before swapping - node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active") + node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active") def move(node): - node.query_and_get_error(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'") + node.query_and_get_error( + f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" + ) # Start moving t1 = threading.Thread(target=move, args=[node1]) @@ -763,14 +781,16 @@ def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table): # Continue moving and try to swap node1.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active") t1.join() - + # Previous MOVE was failed, try another one after zk connection is restored # It should not failed due to leftovers of previous attempt (temporary cloned files) - node1.query_with_retry(f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'") + node1.query_with_retry( + f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'" + ) # Sanity check node2.query(f"SYSTEM START FETCHES {test_table}") - wait_for_active_parts(node2, 1, test_table, disk_name='s31') + wait_for_active_parts(node2, 1, test_table, disk_name="s31") assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n" node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC") From 41415197ab88d72470571701a591b8499aaa68a1 Mon Sep 17 00:00:00 2001 From: Vladimir Cherkasov Date: Wed, 13 Nov 2024 12:28:46 +0100 Subject: [PATCH 3/4] stylecheck --- src/Storages/MergeTree/MergeTreePartsMover.cpp | 2 +- tests/integration/test_s3_zero_copy_replication/test.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp index c2a1636513e..82eafd47cb8 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -354,7 +354,7 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) cons /// because no one can fetch it until it is *swapped*. /// /// Set ASK_KEEPER to try to unlock it in destructor if something goes wrong before *renaming* - /// If unlocking is failed we will not get a stucked part in moving directory + /// If unlocking is failed we will not get a stuck part in moving directory /// because it will be renamed to delete_tmp_ beforehand and cleaned up later. /// Worst outcomes: trash in object storage and/or orphaned shared zero-copy lock. It is acceptable. /// See DataPartStorageOnDiskBase::remove(). 
diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py
index 4996bf8d79d..bb0c93d8410 100644
--- a/tests/integration/test_s3_zero_copy_replication/test.py
+++ b/tests/integration/test_s3_zero_copy_replication/test.py
@@ -8,7 +8,6 @@ import pytest
 
 from helpers.cluster import ClickHouseCluster
 from helpers.network import PartitionManager
-
 
 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())
@@ -80,11 +79,15 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
     assert get_large_objects_count(cluster, size=size) == expected
 
 
-def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30, disk_name = None):
+def wait_for_active_parts(
+    node, num_expected_parts, table_name, timeout=30, disk_name = None
+):
     deadline = time.monotonic() + timeout
     num_parts = 0
     while time.monotonic() < deadline:
-        query = f"select count() from system.parts where table = '{table_name}' and active"
+        query = (
+            f"select count() from system.parts where table = '{table_name}' and active"
+        )
         if disk_name:
             query += f" and disk_name='{disk_name}'"
 
         num_parts_str = node.query(query)

From eb384d489c7e2b1845b182d5871fba6b0b311381 Mon Sep 17 00:00:00 2001
From: Aleksei Filatov
Date: Wed, 13 Nov 2024 11:51:42 +0000
Subject: [PATCH 4/4] Fix style for tests 2

---
 tests/integration/test_s3_zero_copy_replication/test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py
index bb0c93d8410..c7d03d4301d 100644
--- a/tests/integration/test_s3_zero_copy_replication/test.py
+++ b/tests/integration/test_s3_zero_copy_replication/test.py
@@ -80,7 +80,7 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
 
 
 def wait_for_active_parts(
-    node, num_expected_parts, table_name, timeout=30, disk_name = None
+    node, num_expected_parts, table_name, timeout=30, disk_name=None
 ):
     deadline = time.monotonic() + timeout
     num_parts = 0
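A minimal sketch (not part of the patch itself) of the failpoint-driven retry pattern that the new integration tests rely on. It assumes `node` behaves like the ClickHouseInstance helper from helpers.cluster used above, with `query` and `query_and_get_error` methods; the table name, partition value and volume name are illustrative placeholders.

def move_partition_with_injected_lock_failure(node, table, partition, failpoint):
    # Inject a Keeper failure around the zero-copy lock operation:
    # the first MOVE is expected to fail and leave the part on the source disk.
    node.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
    node.query_and_get_error(
        f"ALTER TABLE {table} MOVE PARTITION '{partition}' TO VOLUME 'external'"
    )

    # Once the failpoint is disabled, a retry must succeed even though the
    # failed attempt may have left temporary cloned files behind.
    node.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
    node.query(f"ALTER TABLE {table} MOVE PARTITION '{partition}' TO VOLUME 'external'")

For example, either of the failpoints introduced in PATCH 1/4 can be passed in:
move_partition_with_injected_lock_failure(node1, "test_table", "2024-10-23", "zero_copy_lock_zk_fail_before_op").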