diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
index d51cd6aa07d..4559f1019bc 100644
--- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
+++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
@@ -223,6 +223,27 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
                     .part_log_writer = {}
                 };
             }
+            else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
+            {
+                /// Why is this check still needed? We can look for the part in ZooKeeper, not find it, and then sleep for some amount of time. During this sleep the part may actually be committed by another replica,
+                /// and the exclusive zero-copy lock released. We would then take the lock and execute the merge one more time, when it was possible to simply download the part from the other replica.
+                ///
+                /// It is also possible simply because reads in [Zoo]Keeper are not linearizable.
+                ///
+                /// NOTE: In the case of mutations with hardlinks this can even lead to extremely rare data loss (we would produce a new part with the same hardlinks instead of fetching it from the other replica), so this check is important.
+                zero_copy_lock->lock->unlock();
+
+                LOG_DEBUG(log, "We took zero copy lock, but merge of part {} finished by some other replica, will release lock and download merged part to avoid data duplication", entry.new_part_name);
+                return PrepareResult{
+                    .prepared_successfully = false,
+                    .need_to_check_missing_part_in_fetch = true,
+                    .part_log_writer = {}
+                };
+            }
+            else
+            {
+                LOG_DEBUG(log, "Zero copy lock taken, will merge part {}", entry.new_part_name);
+            }
         }
     }

diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp
index a51eb7854ab..7b3ef98295c 100644
--- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp
+++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp
@@ -134,6 +134,27 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
                     .part_log_writer = {}
                 };
             }
+            else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
+            {
+                /// Why is this check still needed? We can look for the part in ZooKeeper, not find it, and then sleep for some amount of time. During this sleep the part may actually be committed by another replica,
+                /// and the exclusive zero-copy lock released. We would then take the lock and execute the mutation one more time, when it was possible to simply download the part from the other replica.
+                ///
+                /// It is also possible simply because reads in [Zoo]Keeper are not linearizable.
+                ///
+                /// NOTE: In the case of mutations with hardlinks this can even lead to extremely rare data loss (we would produce a new part with the same hardlinks instead of fetching it from the other replica), so this check is important.
+                zero_copy_lock->lock->unlock();
+
+                LOG_DEBUG(log, "We took zero copy lock, but mutation of part {} finished by some other replica, will release lock and download mutated part to avoid data duplication", entry.new_part_name);
+                return PrepareResult{
+                    .prepared_successfully = false,
+                    .need_to_check_missing_part_in_fetch = true,
+                    .part_log_writer = {}
+                };
+            }
+            else
+            {
+                LOG_DEBUG(log, "Zero copy lock taken, will mutate part {}", entry.new_part_name);
+            }
         }
     }

diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.h b/src/Storages/MergeTree/MutateFromLogEntryTask.h
index a0bbaabda85..416b0c92522 100644
--- a/src/Storages/MergeTree/MutateFromLogEntryTask.h
+++ b/src/Storages/MergeTree/MutateFromLogEntryTask.h
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include

 namespace DB
 {
@@ -18,7 +19,12 @@ public:
         ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry_,
         StorageReplicatedMergeTree & storage_,
         Callback && task_result_callback_)
-        : ReplicatedMergeMutateTaskBase(&Poco::Logger::get("MutateFromLogEntryTask"), storage_, selected_entry_, task_result_callback_) {}
+        : ReplicatedMergeMutateTaskBase(
+            &Poco::Logger::get(storage_.getStorageID().getShortName() + "::" + selected_entry_->log_entry->new_part_name + "(MutateFromLogEntryTask)"),
+            storage_,
+            selected_entry_,
+            task_result_callback_)
+    {}

     UInt64 getPriority() override { return priority; }
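For context, the change in both prepare() methods is the classic double-checked pattern around an exclusive lock: after acquiring the zero-copy lock, re-check whether some other replica already committed a covering part, and if so release the lock and fetch instead of redoing the work. Below is a minimal self-contained C++ sketch of that control flow. It is illustrative only: Coordinator, prepare() and the stand-in lock are invented for the example; only the name findReplicaHavingCoveringPart and the two PrepareResult fields mirror the diff above.

// Sketch of the "re-check after locking" pattern from the diff.
// All types here are simplified stand-ins, not the real ClickHouse classes.
#include <iostream>
#include <mutex>
#include <string>

struct Coordinator
{
    // Stand-in for storage.findReplicaHavingCoveringPart(): returns the name
    // of a replica that already has a part covering the new part, or "" if none.
    // In the real code this is a (non-linearizable) read from [Zoo]Keeper.
    std::string findReplicaHavingCoveringPart(const std::string & /*part_name*/) const
    {
        return covering_replica;
    }

    std::string covering_replica;
};

struct PrepareResult
{
    bool prepared_successfully = false;
    bool need_to_check_missing_part_in_fetch = false;
};

PrepareResult prepare(Coordinator & coordinator, std::mutex & zero_copy_lock, const std::string & part_name)
{
    // First check, before locking: if another replica already has the result,
    // fetch it instead of merging/mutating locally.
    if (!coordinator.findReplicaHavingCoveringPart(part_name).empty())
        return {.prepared_successfully = false, .need_to_check_missing_part_in_fetch = true};

    std::unique_lock lock(zero_copy_lock);

    // Second check, the one added by this diff: between the first check and
    // acquiring the lock, another replica may have committed the part and
    // released its lock. Without this re-check we would redo the merge or
    // mutation, duplicating data -- and with mutations and hardlinks, risk
    // data loss.
    if (!coordinator.findReplicaHavingCoveringPart(part_name).empty())
    {
        lock.unlock();
        return {.prepared_successfully = false, .need_to_check_missing_part_in_fetch = true};
    }

    // Safe to do the merge/mutation locally while holding the lock.
    return {.prepared_successfully = true};
}

int main()
{
    std::mutex zk_lock;
    Coordinator coordinator{.covering_replica = "replica_2"};
    auto result = prepare(coordinator, zk_lock, "all_0_1_2");
    std::cout << "prepared_successfully=" << result.prepared_successfully << '\n'; // prints 0: fetch instead
}

The essential point, as the diff's comment explains, is the second check: because [Zoo]Keeper reads are not linearizable and the part can be committed in the window between the first check and lock acquisition, only the re-check under the lock prevents duplicate merges and the rare hardlink-related data loss.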