Wait for zero copy replication lock even if some disks don't support it

Raúl Marín 2023-07-20 17:46:22 +02:00
parent a39ba00ec3
commit 13f8d72f54
4 changed files with 30 additions and 8 deletions

src/Storages/MergeTree/MergeFromLogEntryTask.cpp

@@ -230,7 +230,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
    /// the fast replica is not overloaded because amount of executing merges doesn't affect the ability to acquire locks for new merges.
    ///
    /// So here we trying to solve it with the simplest solution -- sleep random time up to 500ms for 1GB part and up to 7 seconds for 300GB part.
-   /// It can sound too much, but we are trying to aquite these locks in background tasks which can be scheduled each 5 seconds or so.
+   /// It can sound too much, but we are trying to acquire these locks in background tasks which can be scheduled each 5 seconds or so.
    double start_to_sleep_seconds = std::logf(storage_settings_ptr->zero_copy_merge_mutation_min_parts_size_sleep_before_lock.value);
    uint64_t right_border_to_sleep_ms = static_cast<uint64_t>((std::log(estimated_space_for_merge) - start_to_sleep_seconds + 0.5) * 1000);
    uint64_t time_to_sleep_milliseconds = std::min<uint64_t>(10000UL, std::uniform_int_distribution<uint64_t>(1, 1 + right_border_to_sleep_ms)(rng));
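
To make the bound above concrete, here is a standalone sketch (not part of the commit) that reproduces the calculation, assuming zero_copy_merge_mutation_min_parts_size_sleep_before_lock is left at an assumed 1 GiB default: it yields an upper bound of roughly 430 ms for a ~1 GB part and roughly 6.1 s for a ~300 GB part, in line with the "up to 500ms / up to 7 seconds" comment.

// Standalone sketch, not ClickHouse code: reproduces the sleep upper bound computed above.
// The 1 GiB value for zero_copy_merge_mutation_min_parts_size_sleep_before_lock is an assumption.
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>

int main()
{
    const double min_parts_size_before_lock = 1024.0 * 1024.0 * 1024.0; // assumed setting value (1 GiB)
    const double start_to_sleep_seconds = std::log(min_parts_size_before_lock); // ~20.79

    const double part_sizes[] = {1e9, 300e9}; // ~1 GB and ~300 GB merges
    for (double estimated_space_for_merge : part_sizes)
    {
        const auto right_border_to_sleep_ms = static_cast<uint64_t>(
            (std::log(estimated_space_for_merge) - start_to_sleep_seconds + 0.5) * 1000);
        // The task sleeps a uniform random time in [1, 1 + right_border_to_sleep_ms] ms, capped at 10 s.
        const uint64_t max_sleep_ms = std::min<uint64_t>(10000, 1 + right_border_to_sleep_ms);
        std::printf("part of ~%.0f bytes -> sleep up to ~%llu ms\n",
                    estimated_space_for_merge, static_cast<unsigned long long>(max_sleep_ms));
    }
    return 0;
}
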
@@ -245,7 +245,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
    if (!zero_copy_lock || !zero_copy_lock->isLocked())
    {
-       LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
+       LOG_DEBUG(
+           log,
+           "Merge of part {} started by some other replica, will wait for it and fetch merged part. Number of tries {}",
+           entry.new_part_name,
+           entry.num_tries);
        storage.watchZeroCopyLock(entry.new_part_name, disk);
        /// Don't check for missing part -- it's missing because other replica still not
        /// finished merge.

src/Storages/MergeTree/MutateFromLogEntryTask.cpp

@@ -154,8 +154,12 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
    if (!zero_copy_lock || !zero_copy_lock->isLocked())
    {
+       LOG_DEBUG(
+           log,
+           "Mutation of part {} started by some other replica, will wait for it and mutated merged part. Number of tries {}",
+           entry.new_part_name,
+           entry.num_tries);
        storage.watchZeroCopyLock(entry.new_part_name, disk);
-       LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and mutated merged part", entry.new_part_name);
        return PrepareResult{
            .prepared_successfully = false,

src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp

@@ -174,7 +174,7 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
    part_log_writer = prepare_result.part_log_writer;
-   /// Avoid resheduling, execute fetch here, in the same thread.
+   /// Avoid rescheduling, execute fetch here, in the same thread.
    if (!prepare_result.prepared_successfully)
        return execute_fetch(prepare_result.need_to_check_missing_part_in_fetch);

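Taken together, the hunks above implement a single fallback path: prepare() fails to take the zero-copy lock, registers a watch on it via watchZeroCopyLock, and returns prepared_successfully = false, after which executeImpl() fetches the finished part from the other replica instead of rescheduling the merge locally. A simplified, self-contained sketch of that shape, using illustrative stand-in types rather than ClickHouse's real API:

// Illustrative sketch only; the types and helpers below are simplified stand-ins, not ClickHouse's API.
#include <cstdio>
#include <optional>
#include <string>

struct PrepareResult
{
    bool prepared_successfully = true;
    bool need_to_check_missing_part_in_fetch = true;
};

struct ZeroCopyLock { bool isLocked() const { return true; } };

// Stand-in for trying to take the shared zero-copy lock for a part.
std::optional<ZeroCopyLock> tryAcquireZeroCopyLock(const std::string & /*part_name*/, bool other_replica_is_merging)
{
    if (other_replica_is_merging)
        return std::nullopt;
    return ZeroCopyLock{};
}

PrepareResult prepare(const std::string & part_name, bool other_replica_is_merging)
{
    auto zero_copy_lock = tryAcquireZeroCopyLock(part_name, other_replica_is_merging);
    if (!zero_copy_lock || !zero_copy_lock->isLocked())
    {
        std::printf("Merge of part %s started by some other replica, will wait for it and fetch merged part\n", part_name.c_str());
        // The real code also calls storage.watchZeroCopyLock(part_name, disk) here so the queue
        // can postpone the entry until the other replica releases the lock.
        return PrepareResult{.prepared_successfully = false, .need_to_check_missing_part_in_fetch = false};
    }
    return PrepareResult{};
}

bool executeImpl(const std::string & part_name, bool other_replica_is_merging)
{
    auto prepare_result = prepare(part_name, other_replica_is_merging);
    if (!prepare_result.prepared_successfully)
    {
        // Avoid rescheduling: fetch the merged part from the other replica in the same thread.
        std::printf("Fetching %s instead of merging it locally\n", part_name.c_str());
        return true;
    }
    std::printf("Merging %s locally under the zero-copy lock\n", part_name.c_str());
    return true;
}

int main()
{
    executeImpl("all_0_5_1", /*other_replica_is_merging=*/true);
    executeImpl("all_6_10_1", /*other_replica_is_merging=*/false);
}
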
src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

@@ -1370,13 +1370,27 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
    if (data_settings->allow_remote_fs_zero_copy_replication)
    {
        auto disks = storage.getDisks();
-       bool only_s3_storage = true;
+       DiskPtr disk_with_zero_copy = nullptr;
        for (const auto & disk : disks)
-           if (!disk->supportZeroCopyReplication())
-               only_s3_storage = false;
+       {
+           if (disk->supportZeroCopyReplication())
+           {
+               disk_with_zero_copy = disk;
+               break;
+           }
+       }
+       /// Technically speaking, if there is more than one disk that could store the part (a local hot disk + a cloud cold disk),
+       /// the merge could still happen concurrently with another replica if that replica is doing the merge using
+       /// zero-copy and the cloud storage while the local replica uses the local storage instead.
+       /// The question is: is it worth retrying the merge over and over for the opportunity to do
+       /// double the work? Probably not.
+       /// So even if a hot merge could happen, we check the zero-copy lock anyway.
+       /// Keep in mind that for the zero-copy lock check to happen (via existing_zero_copy_locks) we need to
+       /// have failed first because of it and added it via watchZeroCopyLock. Considering we've already tried to
+       /// use cloud storage and zero-copy replication, the most likely scenario is that we'll try again.
        String replica_to_execute_merge;
-       if (!disks.empty() && only_s3_storage && storage.checkZeroCopyLockExists(entry.new_part_name, disks[0], replica_to_execute_merge))
+       if (disk_with_zero_copy && storage.checkZeroCopyLockExists(entry.new_part_name, disk_with_zero_copy, replica_to_execute_merge))
        {
            constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting for {} to execute it and will fetch after.";
            out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge);
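
The net effect of the change to shouldExecuteLogEntry is that the zero-copy lock is now consulted when at least one disk supports zero-copy replication, instead of only when every disk does. A minimal sketch of just that condition change, using a hypothetical Disk type rather than ClickHouse's IDisk interface:

// Minimal sketch of the condition change only; Disk here is a hypothetical stand-in, not ClickHouse's IDisk.
#include <algorithm>
#include <vector>

struct Disk
{
    bool zero_copy = false;
    bool supportZeroCopyReplication() const { return zero_copy; }
};

// Old behaviour: the zero-copy lock was checked only when *all* disks supported zero-copy replication.
bool should_check_zero_copy_lock_old(const std::vector<Disk> & disks)
{
    return !disks.empty()
        && std::all_of(disks.begin(), disks.end(),
                       [](const Disk & disk) { return disk.supportZeroCopyReplication(); });
}

// New behaviour: it is enough that *some* disk supports zero-copy replication, so a hot local disk
// combined with a cold zero-copy cloud disk no longer disables the check.
bool should_check_zero_copy_lock_new(const std::vector<Disk> & disks)
{
    return std::any_of(disks.begin(), disks.end(),
                       [](const Disk & disk) { return disk.supportZeroCopyReplication(); });
}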