better partitions hint in mutations finalization

This commit is contained in:
Alexander Tokmakov 2023-05-09 00:21:07 +02:00
parent 91489243c9
commit aa829c4ddc
4 changed files with 30 additions and 12 deletions

View File

@ -87,7 +87,7 @@ public:
};
/// The second step of selecting parts to merge: splits parts list into a set of ranges according to can_merge_callback.
/// All parts withing a range can be merged without violating some invariants.
/// All parts within a range can be merged without violating some invariants.
MergeSelectingInfo getPossibleMergeRanges(
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,

View File

@ -41,6 +41,9 @@ struct ReplicatedMergeTreeMutationEntry
using BlockNumbersType = std::map<String, Int64>;
BlockNumbersType block_numbers;
/// List of partitions that do not have relevant uncommitted blocks to mutate
mutable std::unordered_set<String> checked_partitions_cache;
/// Mutation commands which will give to MUTATE_PART entries
MutationCommands commands;

View File

@ -808,13 +808,15 @@ QueueRepresentation getQueueRepresentation(const std::list<ReplicatedMergeTreeLo
///
/// From the first glance it can sound that these two sets should be enough to understand which parts we have to mutate
/// to finish mutation but it's not true:
/// 1) Obviously we cannot rely on current_parts because we can have stale state (some parts are absent, some merges not finished). We also have to account parts which we will
/// get after queue execution.
/// 2) But we cannot rely on virtual_parts for this, because they contain parts which we will get after we have executed our queue. So if we need to execute mutation 0000000001 for part all_0_0_0
/// and we have already pulled entry to mutate this part into own queue our virtual parts will contain part all_0_0_0_1, not part all_0_0_0.
/// 1) Obviously we cannot rely on current_parts because we can have stale state (some parts are absent, some merges not finished).
/// We also have to account parts which we will get after queue execution.
/// 2) But we cannot rely on virtual_parts for this, because they contain parts which we will get after we have executed our queue.
/// So if we need to execute mutation 0000000001 for part all_0_0_0 and we have already pulled entry
/// to mutate this part into own queue our virtual parts will contain part all_0_0_0_1, not part all_0_0_0.
///
/// To avoid such issues we simply traverse all entries in queue in order and applying diff (add parts/remove parts) to current parts if they could be affected by mutation. Such approach is expensive
/// but we do it only once since we get the mutation. After that we just update parts_to_do for each mutation when pulling entries into our queue (addPartToMutations, removePartFromMutations).
/// To avoid such issues we simply traverse all entries in queue in order and applying diff (add parts/remove parts) to current parts
/// if they could be affected by mutation. Such approach is expensive but we do it only once since we get the mutation.
/// After that we just update parts_to_do for each mutation when pulling entries into our queue (addPartToMutations, removePartFromMutations).
ActiveDataPartSet getPartNamesToMutate(
const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & current_parts,
const QueueRepresentation & queue_representation, MergeTreeDataFormatVersion format_version)
@ -1871,7 +1873,9 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
if (mutation.parts_to_do.size() != 0)
{
LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number.{}", znode, " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number. "
"It's OK, tasks for rest parts will be skipped, but probably a lot of mutations "
"were executed concurrently on different replicas.", znode);
mutation.parts_to_do.clear();
}
}
@ -1897,14 +1901,15 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
PartitionIdsHint partition_ids_hint;
for (const auto & candidate : candidates)
for (const auto & partitions : candidate->block_numbers)
partition_ids_hint.insert(partitions.first);
if (!candidate->checked_partitions_cache.contains(partitions.first))
partition_ids_hint.insert(partitions.first);
auto merge_pred = getMergePredicate(zookeeper, std::move(partition_ids_hint));
std::vector<const ReplicatedMergeTreeMutationEntry *> finished;
for (const auto & candidate : candidates)
{
if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers))
if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers, candidate->checked_partitions_cache))
finished.push_back(candidate.get());
}
@ -2507,7 +2512,8 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
}
bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers) const
bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
std::unordered_set<String> & checked_partitions_cache) const
{
/// Check committing block numbers, maybe some affected inserts
/// still not written to disk and committed to ZK.
@ -2516,6 +2522,10 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
const String & partition_id = kv.first;
Int64 block_num = kv.second;
/// Maybe we already know that there are no relevant uncommitted blocks
if (checked_partitions_cache.contains(partition_id))
continue;
if (partition_ids_hint && !partition_ids_hint->contains(partition_id))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition id {} was not provided as hint, it's a bug", partition_id);
@ -2530,6 +2540,10 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
return false;
}
}
/// There are no committing blocks less than block_num in that partition and there's no way they can appear
/// TODO Why not to get committing blocks when pulling a mutation? We could get rid of finalization task or simplify it
checked_partitions_cache.insert(partition_id);
}
std::lock_guard lock(queue.state_mutex);

View File

@ -547,7 +547,8 @@ public:
/// don't glue them together. Alter is rare operation, so it shouldn't affect performance.
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers) const;
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
std::unordered_set<String> & checked_partitions_cache) const;
/// The version of "log" node that is used to check that no new merges have appeared.
int32_t getVersion() const { return merges_version; }