fix intersecting parts in parts_to_do 2

This commit is contained in:
Alexander Tokmakov 2021-11-03 21:13:42 +03:00
parent 4c675f0151
commit d4677dc6d8
3 changed files with 20 additions and 19 deletions

View File

@ -53,7 +53,7 @@ void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & ent
bool DropPartsRanges::hasDropRange(const MergeTreePartInfo & new_drop_range_info) const
{
for (const auto & [znode_name, drop_range] : drop_ranges)
for (const auto & [_, drop_range] : drop_ranges)
{
if (drop_range.contains(new_drop_range_info))
return true;

View File

@ -218,8 +218,14 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
virtual_parts.add(virtual_part_name, nullptr);
/// Don't add drop range parts to mutations
/// they don't produce any useful parts
if (entry->type != LogEntry::DROP_RANGE)
addPartToMutations(virtual_part_name);
if (entry->type == LogEntry::DROP_RANGE)
continue;
auto part_info = MergeTreePartInfo::fromPartName(virtual_part_name, format_version);
if (entry->type == LogEntry::REPLACE_RANGE && part_info.isFakeDropRangePart())
continue;
addPartToMutations(virtual_part_name, part_info);
}
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
@ -435,19 +441,10 @@ void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & pa
storage.mutations_finalizing_task->schedule();
}
void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name, const MergeTreePartInfo & part_info)
{
LOG_TEST(log, "Adding part {} to mutations", part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
/// Do not add special virtual parts to parts_to_do
if (part_info.isFakeDropRangePart())
{
LOG_TEST(log, "Part {} is fake drop range part, will not add it to mutations", part_name);
return;
}
assert(!part_info.isFakeDropRangePart());
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
@ -720,7 +717,7 @@ namespace
{
Names getPartNamesToMutate(
const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts)
const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts, const DropPartsRanges & drop_ranges)
{
Names result;
for (const auto & pair : mutation.block_numbers)
@ -736,7 +733,11 @@ Names getPartNamesToMutate(
{
auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion());
if (part_info.getDataVersion() < block_num)
result.push_back(covered_part_name);
{
/// We don't need to mutate part if it's covered by DROP_RANGE
if (!drop_ranges.hasDropRange(part_info))
result.push_back(covered_part_name);
}
}
}
@ -842,7 +843,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
}
/// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`.
Strings current_parts_to_mutate = getPartNamesToMutate(*entry, current_parts);
Strings current_parts_to_mutate = getPartNamesToMutate(*entry, current_parts, drop_ranges);
for (const String & current_part_to_mutate : current_parts_to_mutate)
{
assert(MergeTreePartInfo::fromPartName(current_part_to_mutate, format_version).level < MergeTreePartInfo::MAX_LEVEL);
@ -2235,7 +2236,7 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMerge
{
std::lock_guard lock(queue.state_mutex);
size_t suddenly_appeared_parts = getPartNamesToMutate(mutation, queue.virtual_parts).size();
size_t suddenly_appeared_parts = getPartNamesToMutate(mutation, queue.virtual_parts, queue.drop_ranges).size();
if (suddenly_appeared_parts)
{
LOG_TRACE(queue.log, "Mutation {} is not done yet because {} parts to mutate suddenly appeared.", mutation.znode_name, suddenly_appeared_parts);

View File

@ -223,7 +223,7 @@ private:
std::unique_lock<std::mutex> & state_lock);
/// Add part for mutations with block_number > part.getDataVersion()
void addPartToMutations(const String & part_name);
void addPartToMutations(const String & part_name, const MergeTreePartInfo & part_info);
/// Remove covered parts from mutations (parts_to_do) which were assigned
/// for mutation. If remove_covered_parts = true, than remove parts covered