From b098c1be36e73bb506ca0b2954ba604f778b31b3 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 15 Sep 2022 18:54:27 +0200 Subject: [PATCH 1/5] Fix data duplication in zero copy replication --- .../MergeTree/MergeFromLogEntryTask.cpp | 21 +++++++++++++++++++ .../MergeTree/MutateFromLogEntryTask.cpp | 21 +++++++++++++++++++ .../MergeTree/MutateFromLogEntryTask.h | 8 ++++++- 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index d51cd6aa07d..4559f1019bc 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -223,6 +223,27 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() .part_log_writer = {} }; } + else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty()) + { + /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually commited from other replica + /// and exclusive zero copy lock will be released. We will take the lock and execute merge one more time, while it was possible just to download the part from other replica. + /// + /// It's also posible just because reads in [Zoo]Keeper are not lineariazable. + /// + /// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important. + zero_copy_lock->lock->unlock(); + + LOG_DEBUG(log, "We took zero copy lock, but merge of part {} finished by some other replica, will release lock and download merged part to avoid data duplication", entry.new_part_name); + return PrepareResult{ + .prepared_successfully = false, + .need_to_check_missing_part_in_fetch = true, + .part_log_writer = {} + }; + } + else + { + LOG_DEBUG(log, "Zero copy lock taken, will merge part {}", entry.new_part_name); + } } } diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index a51eb7854ab..7b3ef98295c 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -134,6 +134,27 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() .part_log_writer = {} }; } + else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty()) + { + /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually commited from other replica + /// and exclusive zero copy lock will be released. We will take the lock and execute mutation one more time, while it was possible just to download the part from other replica. + /// + /// It's also posible just because reads in [Zoo]Keeper are not lineariazable. + /// + /// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important. + zero_copy_lock->lock->unlock(); + + LOG_DEBUG(log, "We took zero copy lock, but mutation of part {} finished by some other replica, will release lock and download mutated part to avoid data duplication", entry.new_part_name); + return PrepareResult{ + .prepared_successfully = false, + .need_to_check_missing_part_in_fetch = true, + .part_log_writer = {} + }; + } + else + { + LOG_DEBUG(log, "Zero copy lock taken, will mutate part {}", entry.new_part_name); + } } } diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.h b/src/Storages/MergeTree/MutateFromLogEntryTask.h index a0bbaabda85..416b0c92522 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.h +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { @@ -18,7 +19,12 @@ public: ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry_, StorageReplicatedMergeTree & storage_, Callback && task_result_callback_) - : ReplicatedMergeMutateTaskBase(&Poco::Logger::get("MutateFromLogEntryTask"), storage_, selected_entry_, task_result_callback_) {} + : ReplicatedMergeMutateTaskBase( + &Poco::Logger::get(storage_.getStorageID().getShortName() + "::" + selected_entry_->log_entry->new_part_name + "(MutateFromLogEntryTask)"), + storage_, + selected_entry_, + task_result_callback_) + {} UInt64 getPriority() override { return priority; } From be81d653e1386c0d83f97b95bc81ef8e19868d51 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 15 Sep 2022 18:56:16 +0200 Subject: [PATCH 2/5] Fix comment --- src/Storages/MergeTree/MergeFromLogEntryTask.cpp | 4 ++-- src/Storages/MergeTree/MutateFromLogEntryTask.cpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index 4559f1019bc..fafbf78aaa4 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -225,10 +225,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() } else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty()) { - /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually commited from other replica + /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica /// and exclusive zero copy lock will be released. We will take the lock and execute merge one more time, while it was possible just to download the part from other replica. /// - /// It's also posible just because reads in [Zoo]Keeper are not lineariazable. + /// It's also possible just because reads in [Zoo]Keeper are not lineariazable. /// /// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important. zero_copy_lock->lock->unlock(); diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 7b3ef98295c..78f9f720b81 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -136,10 +136,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() } else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty()) { - /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually commited from other replica + /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica /// and exclusive zero copy lock will be released. We will take the lock and execute mutation one more time, while it was possible just to download the part from other replica. /// - /// It's also posible just because reads in [Zoo]Keeper are not lineariazable. + /// It's also possible just because reads in [Zoo]Keeper are not lineariazable. /// /// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important. zero_copy_lock->lock->unlock(); From af259a2a68a2364b2148e9931d8345d894bb9703 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 16 Sep 2022 11:50:07 +0200 Subject: [PATCH 3/5] Update src/Storages/MergeTree/MergeFromLogEntryTask.cpp Co-authored-by: Alexander Tokmakov --- src/Storages/MergeTree/MergeFromLogEntryTask.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index fafbf78aaa4..9bae4a840bb 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -223,7 +223,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() .part_log_writer = {} }; } - else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty()) + else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false, dummy).empty()) { /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica /// and exclusive zero copy lock will be released. We will take the lock and execute merge one more time, while it was possible just to download the part from other replica. From db30a3041e1cf01e35b8796096b7996b499cbd2c Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 17 Sep 2022 19:10:23 +0200 Subject: [PATCH 4/5] Update src/Storages/MergeTree/MutateFromLogEntryTask.cpp Co-authored-by: Alexander Tokmakov --- src/Storages/MergeTree/MutateFromLogEntryTask.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 78f9f720b81..27176eec99f 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -134,7 +134,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() .part_log_writer = {} }; } - else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty()) + else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false, dummy).empty()) { /// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica /// and exclusive zero copy lock will be released. We will take the lock and execute mutation one more time, while it was possible just to download the part from other replica. From dcb15869771f1b25654fc2f6e6a5648b65cef199 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 17 Sep 2022 19:11:58 +0200 Subject: [PATCH 5/5] Add comment --- src/Storages/MergeTree/MutateFromLogEntryTask.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 27176eec99f..fc8b22865c4 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -142,6 +142,8 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() /// It's also possible just because reads in [Zoo]Keeper are not lineariazable. /// /// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important. + /// + /// In case of DROP_RANGE on fast replica and stale replica we can have some failed select queries in case of zero copy replication. zero_copy_lock->lock->unlock(); LOG_DEBUG(log, "We took zero copy lock, but mutation of part {} finished by some other replica, will release lock and download mutated part to avoid data duplication", entry.new_part_name);