Fix parts count in mutations

This commit is contained in:
alesapin 2020-06-16 13:34:59 +03:00
parent bd17aa8a0e
commit d5b76b2fe1
4 changed files with 35 additions and 17 deletions

View File

@ -67,6 +67,17 @@ public:
return result;
}
/// Remove only covered parts from active set
bool removePartsCoveredBy(const String & part_name)
{
Strings parts_covered_by = getPartsCoveredBy(MergeTreePartInfo::fromPartName(part_name, format_version));
bool result = true;
for (const auto & part : parts_covered_by)
result &= remove(part);
return result;
}
/// If not found, return an empty string.
String getContainingPart(const MergeTreePartInfo & part_info) const;
String getContainingPart(const String & name) const;

View File

@ -208,21 +208,18 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
///
/// But we remove covered parts from mutations, because we actually
/// have replacing part.
Strings covered_parts = current_parts.getPartsCoveredBy(MergeTreePartInfo::fromPartName(entry->actual_new_part_name, format_version));
for (const auto & covered_part : covered_parts)
removePartFromMutations(covered_part);
///
/// NOTE actual_new_part_name is very confusing and error-prone. This approach must be fixed.
removeCoveredPartsFromMutations(entry->actual_new_part_name, /*remove_part = */ false);
}
for (const String & virtual_part_name : entry->getVirtualPartNames())
{
Strings replaced_parts;
current_parts.add(virtual_part_name, &replaced_parts);
current_parts.add(virtual_part_name);
/// These parts are already covered by newer part, we don't have to
/// mutate it.
for (const auto & replaced_part : replaced_parts)
removePartFromMutations(replaced_part);
removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ false);
}
String drop_range_part_name;
@ -250,13 +247,13 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
/// Because execution of the entry is unsuccessful,
/// `virtual_part_name` will never appear so we won't need to mutate
/// it.
removePartFromMutations(virtual_part_name);
removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ true);
}
}
}
void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & part_name, bool remove_part)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
@ -270,7 +267,11 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
{
MutationStatus & status = *it->second;
status.parts_to_do.removePartAndCoveredParts(part_name);
if (remove_part)
status.parts_to_do.removePartAndCoveredParts(part_name);
else
status.parts_to_do.removePartsCoveredBy(part_name);
if (status.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
@ -1969,6 +1970,6 @@ void ReplicatedMergeTreeQueue::removeCurrentPartsFromMutations()
{
std::lock_guard state_lock(state_mutex);
for (const auto & part_name : current_parts.getParts())
removePartFromMutations(part_name);
removeCoveredPartsFromMutations(part_name, /*remove_part = */ true);
}
}

View File

@ -209,11 +209,14 @@ private:
/// Add part for mutations with block_number > part.getDataVersion()
void addPartToMutations(const String & part_name);
/// Remove part from mutations (parts_to_do) which were assigned to mutate it
/// with block_number > part.getDataVersion()
/// and block_number == part.getDataVersion()
/// ^ (this may happen if we downloaded mutated part from other replica)
void removePartFromMutations(const String & part_name);
/// Remove covered parts from mutations (parts_to_do) which were assigned
/// for mutation. If remove_part == true, than also remove part itself.
///
/// Part removed from mutations which satisfy contitions:
/// block_number > part.getDataVersion()
/// or block_number == part.getDataVersion()
/// ^ (this may happen if we downloaded mutated part from other replica)
void removeCoveredPartsFromMutations(const String & part_name, bool remove_part);
/// Update the insertion times in ZooKeeper.
void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper,

View File

@ -110,6 +110,9 @@ while [[ $($CLICKHOUSE_CLIENT --query "select * from system.mutations where tabl
break
fi
sleep 1
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "ATTACH TABLE concurrent_mutate_mt_$i" 2> /dev/null
done
counter=$(($counter + 1))
done