Merge pull request #71845 from aalexfvk/acquire_zero_copy_shared_lock_before_swap

Acquire zero-copy shared lock before moving a part
This commit is contained in:
Vladimir Cherkasov 2024-11-19 19:38:54 +00:00 committed by GitHub
commit 456a41ee42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 237 additions and 38 deletions

View File

@ -75,6 +75,8 @@ static struct InitFiu
PAUSEABLE(stop_moving_part_before_swap_with_active) \
REGULAR(slowdown_index_analysis) \
REGULAR(replicated_merge_tree_all_replicas_stale) \
REGULAR(zero_copy_lock_zk_fail_before_op) \
REGULAR(zero_copy_lock_zk_fail_after_op) \
namespace FailPoints

View File

@ -545,6 +545,14 @@ SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_n
return it == serializations.end() ? nullptr : it->second;
}
bool IMergeTreeDataPart::isMovingPart() const
{
fs::path part_directory_path = getDataPartStorage().getRelativePath();
if (part_directory_path.filename().empty())
part_directory_path = part_directory_path.parent_path();
return part_directory_path.parent_path().filename() == "moving";
}
void IMergeTreeDataPart::removeIfNeeded() noexcept
{
assert(assertHasValidVersionMetadata());
@ -569,10 +577,7 @@ void IMergeTreeDataPart::removeIfNeeded() noexcept
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
getDataPartStorage().getPartDirectory(), name);
fs::path part_directory_path = getDataPartStorage().getRelativePath();
if (part_directory_path.filename().empty())
part_directory_path = part_directory_path.parent_path();
bool is_moving_part = part_directory_path.parent_path().filename() == "moving";
bool is_moving_part = isMovingPart();
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
{
LOG_ERROR(

View File

@ -434,6 +434,9 @@ public:
bool isProjectionPart() const { return parent_part != nullptr; }
/// Check if the part is in the `/moving` directory
bool isMovingPart() const;
const IMergeTreeDataPart * getParentPart() const { return parent_part; }
String getParentPartName() const { return parent_part_name; }

View File

@ -8258,33 +8258,49 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
/// replica will actually move the part from disk to some
/// zero-copy storage other replicas will just fetch
/// metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
{
if (lock->isLocked())
{
cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
parts_mover.swapClonedPart(cloned_part);
break;
}
if (wait_for_move_if_zero_copy)
{
LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
/// Wait and checks not only for timeout but also for shutdown and so on.
while (!waitZeroCopyLockToDisappear(*lock, 3000))
{
LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
}
}
else
break;
}
else
auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk);
if (!lock)
{
/// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
LOG_DEBUG(
log,
"Move of part {} postponed, because zero copy mode enabled and zero-copy lock was not acquired",
moving_part.part->name);
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
break;
}
if (lock->isLocked())
{
cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
/// Cloning part can take a long time.
/// Recheck if the lock (and keeper session expirity) is OK
if (lock->isLocked())
{
parts_mover.swapClonedPart(cloned_part);
break; /// Successfully moved
}
else
{
LOG_DEBUG(
log,
"Move of part {} postponed, because zero copy mode enabled and zero-copy lock was lost during cloning the part",
moving_part.part->name);
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
break;
}
}
if (wait_for_move_if_zero_copy)
{
LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
/// Wait and checks not only for timeout but also for shutdown and so on.
while (!waitZeroCopyLockToDisappear(*lock, 3000))
{
LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
}
}
else
break;
}
}
else /// Ordinary move as it should be

View File

@ -259,6 +259,7 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
disk->createDirectories(path_to_clone);
/// TODO: Make it possible to fetch only zero-copy part without fallback to fetching a full-copy one
auto zero_copy_part = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
if (zero_copy_part)
@ -301,6 +302,28 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
return cloned_part;
}
void MergeTreePartsMover::renameClonedPart(IMergeTreeDataPart & part) const
try
{
part.is_temp = false;
/// Mark it DeleteOnDestroy to ensure deleting in destructor
/// if something goes wrong before swapping
part.setState(MergeTreeDataPartState::DeleteOnDestroy);
/// Don't remove new directory but throw an error because it may contain part which is currently in use.
part.renameTo(part.name, /* remove_new_dir_if_exists */ false);
}
catch (...)
{
/// Check if part was renamed or not
/// `renameTo()` does not provide strong exception guarantee in case of an exception
if (part.isMovingPart())
{
/// Restore its temporary state
part.is_temp = true;
part.setState(MergeTreeDataPartState::Temporary);
}
throw;
}
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
{
@ -327,12 +350,23 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) cons
return;
}
cloned_part.part->is_temp = false;
/// It is safe to acquire zero-copy lock for the temporary part here
/// because no one can fetch it until it is *swapped*.
///
/// Set ASK_KEEPER to try to unlock it in destructor if something goes wrong before *renaming*
/// If unlocking is failed we will not get a stuck part in moving directory
/// because it will be renamed to delete_tmp_<name> beforehand and cleaned up later.
/// Worst outcomes: trash in object storage and/or orphaned shared zero-copy lock. It is acceptable.
/// See DataPartStorageOnDiskBase::remove().
cloned_part.part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
data->lockSharedData(*cloned_part.part, /* replace_existing_lock = */ true);
/// Don't remove new directory but throw an error because it may contain part which is currently in use.
cloned_part.part->renameTo(active_part->name, false);
renameClonedPart(*cloned_part.part);
/// TODO what happen if server goes down here?
/// If server goes down here we will get two copy of the part with the same name on different disks.
/// And on the next ClickHouse startup during loading parts the first copy (in the order of defining disks
/// in the storage policy) will be loaded as Active, the second one will be loaded as Outdated and removed as duplicate.
/// See MergeTreeData::loadDataParts().
data->swapActivePart(cloned_part.part, part_lock);
LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());

View File

@ -75,6 +75,9 @@ public:
/// merge or mutation.
void swapClonedPart(TemporaryClonedPart & cloned_part) const;
/// Rename cloned part from `moving/` directory to the actual part storage
void renameClonedPart(IMergeTreeDataPart & part) const;
/// Can stop background moves and moves from queries
ActionBlocker moves_blocker;

View File

@ -216,6 +216,8 @@ namespace FailPoints
extern const char replicated_queue_fail_next_entry[];
extern const char replicated_queue_unfail_entries[];
extern const char finish_set_quorum_failed_parts[];
extern const char zero_copy_lock_zk_fail_before_op[];
extern const char zero_copy_lock_zk_fail_after_op[];
}
namespace ErrorCodes
@ -10480,6 +10482,10 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
Coordination::Requests ops;
Coordination::Responses responses;
getZeroCopyLockNodeCreateOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
fiu_do_on(FailPoints::zero_copy_lock_zk_fail_before_op, { zookeeper->forceFailureBeforeOperation(); });
fiu_do_on(FailPoints::zero_copy_lock_zk_fail_after_op, { zookeeper->forceFailureAfterOperation(); });
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
{

View File

@ -7,21 +7,21 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
<send_metadata>false</send_metadata>
</s31>
<s31_again>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
<send_metadata>false</send_metadata>
</s31_again>
<s32>
<type>s3</type>
<endpoint>http://minio1:9001/root/data2/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
<send_metadata>false</send_metadata>
</s32>
</disks>
<policies>

View File

@ -1,10 +1,12 @@
import datetime
import logging
import threading
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@ -77,15 +79,19 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
assert get_large_objects_count(cluster, size=size) == expected
def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
def wait_for_active_parts(
node, num_expected_parts, table_name, timeout=30, disk_name=None
):
deadline = time.monotonic() + timeout
num_parts = 0
while time.monotonic() < deadline:
num_parts_str = node.query(
"select count() from system.parts where table = '{}' and active".format(
table_name
)
query = (
f"select count() from system.parts where table = '{table_name}' and active"
)
if disk_name:
query += f" and disk_name='{disk_name}'"
num_parts_str = node.query(query)
num_parts = int(num_parts_str.strip())
if num_parts == num_expected_parts:
return
@ -95,6 +101,22 @@ def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
assert num_parts == num_expected_parts
@pytest.fixture(scope="function")
def test_name(request):
return request.node.name
@pytest.fixture(scope="function")
def test_table(test_name):
normalized = (
test_name.replace("[", "_")
.replace("]", "_")
.replace(" ", "_")
.replace("-", "_")
)
return "table_" + normalized
# Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning
@pytest.mark.order(0)
@pytest.mark.parametrize("policy", ["s3"])
@ -668,3 +690,111 @@ def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
time.sleep(10)
check_objects_not_exisis(cluster, objectsY)
@pytest.mark.parametrize(
"failpoint", ["zero_copy_lock_zk_fail_before_op", "zero_copy_lock_zk_fail_after_op"]
)
def test_move_shared_lock_fail_once(started_cluster, test_table, failpoint):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
f"""
CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
ORDER BY date PARTITION BY date
SETTINGS storage_policy='hybrid'
"""
)
date = "2024-10-23"
node2.query(f"SYSTEM STOP FETCHES {test_table}")
node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
# Try to move and get fail on acquring zero-copy shared lock
node1.query(f"SYSTEM ENABLE FAILPOINT {failpoint}")
node1.query_and_get_error(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
# After fail the part must remain on the source disk
assert (
node1.query(
f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name"
)
== "default\n"
)
# Try another attempt after zk connection is restored
# It should not failed due to leftovers of previous attempt (temporary cloned files)
node1.query(f"SYSTEM DISABLE FAILPOINT {failpoint}")
node1.query(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
assert (
node1.query(
f"SELECT disk_name FROM system.parts WHERE table='{test_table}' GROUP BY disk_name"
)
== "s31\n"
)
# Sanity check
node2.query(f"SYSTEM START FETCHES {test_table}")
wait_for_active_parts(node2, 1, test_table, disk_name="s31")
assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
def test_move_shared_lock_fail_keeper_unavailable(started_cluster, test_table):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
f"""
CREATE TABLE {test_table} ON CLUSTER test_cluster (num UInt64, date DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{test_table}', '{{replica}}')
ORDER BY date PARTITION BY date
SETTINGS storage_policy='hybrid'
"""
)
date = "2024-10-23"
node2.query(f"SYSTEM STOP FETCHES {test_table}")
node1.query(f"INSERT INTO {test_table} VALUES (1, '{date}')")
# Pause moving after part cloning, but before swapping
node1.query("SYSTEM ENABLE FAILPOINT stop_moving_part_before_swap_with_active")
def move(node):
node.query_and_get_error(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
# Start moving
t1 = threading.Thread(target=move, args=[node1])
t1.start()
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node1)
# Continue moving and try to swap
node1.query("SYSTEM DISABLE FAILPOINT stop_moving_part_before_swap_with_active")
t1.join()
# Previous MOVE was failed, try another one after zk connection is restored
# It should not failed due to leftovers of previous attempt (temporary cloned files)
node1.query_with_retry(
f"ALTER TABLE {test_table} MOVE PARTITION '{date}' TO VOLUME 'external'"
)
# Sanity check
node2.query(f"SYSTEM START FETCHES {test_table}")
wait_for_active_parts(node2, 1, test_table, disk_name="s31")
assert node2.query(f"SELECT sum(num) FROM {test_table}") == "1\n"
node1.query(f"DROP TABLE IF EXISTS {test_table} SYNC")
node2.query(f"DROP TABLE IF EXISTS {test_table} SYNC")