From 13f8d72f54433a790f3efcb054db389e4fdd53f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Thu, 20 Jul 2023 17:46:22 +0200 Subject: [PATCH] Wait for zero copy replication lock even if some disks don't support it --- .../MergeTree/MergeFromLogEntryTask.cpp | 8 +++++-- .../MergeTree/MutateFromLogEntryTask.cpp | 6 ++++- .../ReplicatedMergeMutateTaskBase.cpp | 2 +- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 22 +++++++++++++++---- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index 9f54c554c85..883cfee89c8 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -230,7 +230,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() /// the fast replica is not overloaded because amount of executing merges doesn't affect the ability to acquire locks for new merges. /// /// So here we trying to solve it with the simplest solution -- sleep random time up to 500ms for 1GB part and up to 7 seconds for 300GB part. - /// It can sound too much, but we are trying to aquite these locks in background tasks which can be scheduled each 5 seconds or so. + /// It can sound too much, but we are trying to acquire these locks in background tasks which can be scheduled each 5 seconds or so. double start_to_sleep_seconds = std::logf(storage_settings_ptr->zero_copy_merge_mutation_min_parts_size_sleep_before_lock.value); uint64_t right_border_to_sleep_ms = static_cast((std::log(estimated_space_for_merge) - start_to_sleep_seconds + 0.5) * 1000); uint64_t time_to_sleep_milliseconds = std::min(10000UL, std::uniform_int_distribution(1, 1 + right_border_to_sleep_ms)(rng)); @@ -245,7 +245,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() if (!zero_copy_lock || !zero_copy_lock->isLocked()) { - LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name); + LOG_DEBUG( + log, + "Merge of part {} started by some other replica, will wait for it and fetch merged part. Number of tries {}", + entry.new_part_name, + entry.num_tries); storage.watchZeroCopyLock(entry.new_part_name, disk); /// Don't check for missing part -- it's missing because other replica still not /// finished merge. diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 6cb9d50436e..164b541d2b8 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -154,8 +154,12 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() if (!zero_copy_lock || !zero_copy_lock->isLocked()) { + LOG_DEBUG( + log, + "Mutation of part {} started by some other replica, will wait for it and mutated merged part. Number of tries {}", + entry.new_part_name, + entry.num_tries); storage.watchZeroCopyLock(entry.new_part_name, disk); - LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and mutated merged part", entry.new_part_name); return PrepareResult{ .prepared_successfully = false, diff --git a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp index b4748ee77ea..6ad77119016 100644 --- a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp @@ -174,7 +174,7 @@ bool ReplicatedMergeMutateTaskBase::executeImpl() part_log_writer = prepare_result.part_log_writer; - /// Avoid resheduling, execute fetch here, in the same thread. + /// Avoid rescheduling, execute fetch here, in the same thread. if (!prepare_result.prepared_successfully) return execute_fetch(prepare_result.need_to_check_missing_part_in_fetch); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 07f46c07466..3264de850a0 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1370,13 +1370,27 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (data_settings->allow_remote_fs_zero_copy_replication) { auto disks = storage.getDisks(); - bool only_s3_storage = true; + DiskPtr disk_with_zero_copy = nullptr; for (const auto & disk : disks) - if (!disk->supportZeroCopyReplication()) - only_s3_storage = false; + { + if (disk->supportZeroCopyReplication()) + { + disk_with_zero_copy = disk; + break; + } + } + /// Technically speaking if there are more than one disk that could store the part (a local hot + cloud cold) + /// It would be possible for the merge to happen concurrently with other replica if the other replica is doing + /// a merge using zero-copy and the cloud storage, and the local replica uses the local storage instead + /// The question is, is it worth keep retrying to do the merge over and over for the opportunity to do + /// double the work? Probably not + /// So what we do is that, even if hot merge could happen, check the zero copy lock anyway. + /// Keep in mind that for the zero copy lock check to happen (via existing_zero_copy_locks) we need to + /// have failed first because of it and added it via watchZeroCopyLock. Considering we've already tried to + /// use cloud storage and zero-copy replication, the most likely scenario is that we'll try again String replica_to_execute_merge; - if (!disks.empty() && only_s3_storage && storage.checkZeroCopyLockExists(entry.new_part_name, disks[0], replica_to_execute_merge)) + if (disk_with_zero_copy && storage.checkZeroCopyLockExists(entry.new_part_name, disk_with_zero_copy, replica_to_execute_merge)) { constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting for {} to execute it and will fetch after."; out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge);