mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
setActualPartName for executeReplaceRange
This commit is contained in:
parent
5a6871ae29
commit
d277bfb225
@ -208,8 +208,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
|
||||
{
|
||||
if (auto disk = reserved_space->getDisk(); disk->supportZeroCopyReplication())
|
||||
{
|
||||
String dummy;
|
||||
if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
|
||||
if (storage.findReplicaHavingCoveringPart(entry.new_part_name, true))
|
||||
{
|
||||
LOG_DEBUG(log, "Merge of part {} finished by some other replica, will fetch merged part", entry.new_part_name);
|
||||
/// We found covering part, no checks for missing part.
|
||||
@ -259,7 +258,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
|
||||
.part_log_writer = {}
|
||||
};
|
||||
}
|
||||
else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false, dummy).empty())
|
||||
else if (storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false))
|
||||
{
|
||||
/// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica
|
||||
/// and exclusive zero copy lock will be released. We will take the lock and execute merge one more time, while it was possible just to download the part from other replica.
|
||||
|
@ -118,8 +118,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
|
||||
{
|
||||
if (auto disk = reserved_space->getDisk(); disk->supportZeroCopyReplication())
|
||||
{
|
||||
String dummy;
|
||||
if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
|
||||
if (storage.findReplicaHavingCoveringPart(entry.new_part_name, true))
|
||||
{
|
||||
LOG_DEBUG(log, "Mutation of part {} finished by some other replica, will download mutated part", entry.new_part_name);
|
||||
return PrepareResult{
|
||||
@ -167,7 +166,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
|
||||
.part_log_writer = {}
|
||||
};
|
||||
}
|
||||
else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false, dummy).empty())
|
||||
else if (storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false))
|
||||
{
|
||||
/// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica
|
||||
/// and exclusive zero copy lock will be released. We will take the lock and execute mutation one more time, while it was possible just to download the part from other replica.
|
||||
|
@ -2236,11 +2236,12 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
|
||||
{
|
||||
LOG_TRACE(log, "Executing DROP_RANGE {}", entry.new_part_name);
|
||||
|
||||
auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
|
||||
|
||||
/// Wait for loading of outdated parts because DROP_RANGE
|
||||
/// command must be applied to all parts on disk.
|
||||
waitForOutdatedPartsToBeLoaded();
|
||||
waitForOutdatedPartsToBeLoadedIfNotDisjoint(drop_range_info);
|
||||
|
||||
auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
|
||||
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range_info.partition_id, drop_range_info.max_block);
|
||||
{
|
||||
auto pause_checking_parts = part_check_thread.pausePartsCheck();
|
||||
@ -2303,7 +2304,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
|
||||
}
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
|
||||
{
|
||||
Stopwatch watch;
|
||||
ProfileEventsScope profile_events_scope;
|
||||
@ -2313,14 +2314,15 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
entry.znode_name, entry_replace.drop_range_part_name, entry_replace.new_part_names.size(),
|
||||
entry_replace.from_database, entry_replace.from_table);
|
||||
|
||||
MergeTreePartInfo drop_range = MergeTreePartInfo::fromPartName(entry_replace.drop_range_part_name, format_version);
|
||||
|
||||
/// Wait for loading of outdated parts because REPLACE_RANGE
|
||||
/// command must be applied to all parts on disk.
|
||||
waitForOutdatedPartsToBeLoaded();
|
||||
waitForOutdatedPartsToBeLoadedIfNotDisjoint(drop_range);
|
||||
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
auto storage_settings_ptr = getSettings();
|
||||
|
||||
MergeTreePartInfo drop_range = MergeTreePartInfo::fromPartName(entry_replace.drop_range_part_name, format_version);
|
||||
/// Range with only one block has special meaning: it's ATTACH PARTITION or MOVE PARTITION, so there is no drop range
|
||||
bool replace = !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range);
|
||||
|
||||
@ -2529,7 +2531,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
LOG_DEBUG(log, "Part {} is not found on remote replicas", part_desc->new_part_name);
|
||||
|
||||
/// Fallback to covering part
|
||||
replica = findReplicaHavingCoveringPart(part_desc->new_part_name, true, found_part_name);
|
||||
replica = findReplicaHavingCoveringPartImplLowLevel(&entry, part_desc->new_part_name, found_part_name, true);
|
||||
|
||||
if (replica.empty())
|
||||
{
|
||||
@ -2537,6 +2539,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
LOG_DEBUG(log, "Parts covering {} are not found on remote replicas", part_desc->new_part_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Found part {} covering {} on replica {}", found_part_name, part_desc->new_part_name, replica);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -4322,6 +4326,12 @@ std::set<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniqueParts(c
|
||||
}
|
||||
|
||||
String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entry, bool active)
|
||||
{
|
||||
String dummy;
|
||||
return findReplicaHavingCoveringPartImplLowLevel(&entry, entry.new_part_name, dummy, active);
|
||||
}
|
||||
|
||||
String StorageReplicatedMergeTree::findReplicaHavingCoveringPartImplLowLevel(LogEntry * entry, const String & part_name, String & found_part_name, bool active)
|
||||
{
|
||||
auto zookeeper = getZooKeeper();
|
||||
Strings replicas = zookeeper->getChildren(fs::path(zookeeper_path) / "replicas");
|
||||
@ -4341,8 +4351,8 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr
|
||||
Strings parts = zookeeper->getChildren(fs::path(zookeeper_path) / "replicas" / replica / "parts");
|
||||
for (const String & part_on_replica : parts)
|
||||
{
|
||||
if (part_on_replica == entry.new_part_name
|
||||
|| MergeTreePartInfo::contains(part_on_replica, entry.new_part_name, format_version))
|
||||
if (part_on_replica == part_name
|
||||
|| MergeTreePartInfo::contains(part_on_replica, part_name, format_version))
|
||||
{
|
||||
if (largest_part_found.empty()
|
||||
|| MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version))
|
||||
@ -4354,15 +4364,16 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr
|
||||
|
||||
if (!largest_part_found.empty())
|
||||
{
|
||||
bool the_same_part = largest_part_found == entry.new_part_name;
|
||||
found_part_name = largest_part_found;
|
||||
bool the_same_part = largest_part_found == part_name;
|
||||
|
||||
/// Make a check in case if selected part differs from source part
|
||||
if (!the_same_part)
|
||||
if (!the_same_part && entry)
|
||||
{
|
||||
String reject_reason;
|
||||
if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, entry, reject_reason))
|
||||
if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, *entry, reject_reason))
|
||||
{
|
||||
LOG_INFO(log, "Will not fetch part {} covering {}. {}", largest_part_found, entry.new_part_name, reject_reason);
|
||||
LOG_INFO(log, "Will not fetch part {} covering {}. {}", largest_part_found, part_name, reject_reason);
|
||||
return {};
|
||||
}
|
||||
}
|
||||
@ -4375,44 +4386,11 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr
|
||||
}
|
||||
|
||||
|
||||
String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
|
||||
const String & part_name, bool active, String & found_part_name)
|
||||
bool StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
|
||||
const String & part_name, bool active)
|
||||
{
|
||||
auto zookeeper = getZooKeeper();
|
||||
Strings replicas = zookeeper->getChildren(fs::path(zookeeper_path) / "replicas");
|
||||
|
||||
/// Select replicas in uniformly random order.
|
||||
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
|
||||
|
||||
String largest_part_found;
|
||||
String largest_replica_found;
|
||||
|
||||
for (const String & replica : replicas)
|
||||
{
|
||||
if (replica == replica_name)
|
||||
continue;
|
||||
|
||||
if (active && !zookeeper->exists(fs::path(zookeeper_path) / "replicas" / replica / "is_active"))
|
||||
continue;
|
||||
|
||||
Strings parts = zookeeper->getChildren(fs::path(zookeeper_path) / "replicas" / replica / "parts");
|
||||
for (const String & part_on_replica : parts)
|
||||
{
|
||||
if (part_on_replica == part_name
|
||||
|| MergeTreePartInfo::contains(part_on_replica, part_name, format_version))
|
||||
{
|
||||
if (largest_part_found.empty()
|
||||
|| MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version))
|
||||
{
|
||||
largest_part_found = part_on_replica;
|
||||
largest_replica_found = replica;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
found_part_name = largest_part_found;
|
||||
return largest_replica_found;
|
||||
String dummy;
|
||||
return !findReplicaHavingCoveringPartImplLowLevel(/* entry */ nullptr, part_name, dummy, active).empty();
|
||||
}
|
||||
|
||||
|
||||
|
@ -698,7 +698,7 @@ private:
|
||||
/// If fetch was not successful, clears entry.actual_new_part_name.
|
||||
bool executeFetch(LogEntry & entry, bool need_to_check_missing_part=true);
|
||||
|
||||
bool executeReplaceRange(const LogEntry & entry);
|
||||
bool executeReplaceRange(LogEntry & entry);
|
||||
void executeClonePartFromShard(const LogEntry & entry);
|
||||
|
||||
/** Updates the queue.
|
||||
@ -783,7 +783,8 @@ private:
|
||||
* If not found, returns empty string.
|
||||
*/
|
||||
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
|
||||
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
|
||||
bool findReplicaHavingCoveringPart(const String & part_name, bool active);
|
||||
String findReplicaHavingCoveringPartImplLowLevel(LogEntry * entry, const String & part_name, String & found_part_name, bool active);
|
||||
static std::set<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_);
|
||||
|
||||
/** Download the specified part from the specified replica.
|
||||
|
Loading…
Reference in New Issue
Block a user