mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #41386 from ClickHouse/fix_redundant_fetch
Fix redundant fetch in zero copy replication
This commit is contained in:
commit
560bc2bc22
@ -223,6 +223,27 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
|
||||
.part_log_writer = {}
|
||||
};
|
||||
}
|
||||
else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false, dummy).empty())
|
||||
{
|
||||
/// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica
|
||||
/// and exclusive zero copy lock will be released. We will take the lock and execute merge one more time, while it was possible just to download the part from other replica.
|
||||
///
|
||||
/// It's also possible just because reads in [Zoo]Keeper are not lineariazable.
|
||||
///
|
||||
/// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important.
|
||||
zero_copy_lock->lock->unlock();
|
||||
|
||||
LOG_DEBUG(log, "We took zero copy lock, but merge of part {} finished by some other replica, will release lock and download merged part to avoid data duplication", entry.new_part_name);
|
||||
return PrepareResult{
|
||||
.prepared_successfully = false,
|
||||
.need_to_check_missing_part_in_fetch = true,
|
||||
.part_log_writer = {}
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Zero copy lock taken, will merge part {}", entry.new_part_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -134,6 +134,29 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
|
||||
.part_log_writer = {}
|
||||
};
|
||||
}
|
||||
else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false, dummy).empty())
|
||||
{
|
||||
/// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica
|
||||
/// and exclusive zero copy lock will be released. We will take the lock and execute mutation one more time, while it was possible just to download the part from other replica.
|
||||
///
|
||||
/// It's also possible just because reads in [Zoo]Keeper are not lineariazable.
|
||||
///
|
||||
/// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important.
|
||||
///
|
||||
/// In case of DROP_RANGE on fast replica and stale replica we can have some failed select queries in case of zero copy replication.
|
||||
zero_copy_lock->lock->unlock();
|
||||
|
||||
LOG_DEBUG(log, "We took zero copy lock, but mutation of part {} finished by some other replica, will release lock and download mutated part to avoid data duplication", entry.new_part_name);
|
||||
return PrepareResult{
|
||||
.prepared_successfully = false,
|
||||
.need_to_check_missing_part_in_fetch = true,
|
||||
.part_log_writer = {}
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Zero copy lock taken, will mutate part {}", entry.new_part_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
|
||||
#include <Storages/MergeTree/ZeroCopyLock.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -18,7 +19,12 @@ public:
|
||||
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry_,
|
||||
StorageReplicatedMergeTree & storage_,
|
||||
Callback && task_result_callback_)
|
||||
: ReplicatedMergeMutateTaskBase(&Poco::Logger::get("MutateFromLogEntryTask"), storage_, selected_entry_, task_result_callback_) {}
|
||||
: ReplicatedMergeMutateTaskBase(
|
||||
&Poco::Logger::get(storage_.getStorageID().getShortName() + "::" + selected_entry_->log_entry->new_part_name + "(MutateFromLogEntryTask)"),
|
||||
storage_,
|
||||
selected_entry_,
|
||||
task_result_callback_)
|
||||
{}
|
||||
|
||||
|
||||
UInt64 getPriority() override { return priority; }
|
||||
|
Loading…
Reference in New Issue
Block a user