split selectPartsToMerge into smaller functions

This commit is contained in:
Alexander Tokmakov 2023-05-07 22:06:25 +02:00
parent df8c930be3
commit 29bbde85a6
2 changed files with 189 additions and 98 deletions

View File

@ -138,66 +138,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const MergeTreeTransactionPtr & txn,
String * out_disable_reason)
{
MergeTreeData::DataPartsVector data_parts;
if (txn)
{
/// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
/// So at the first glance we could just get all active parts.
/// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
/// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
/// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
/// But it's wrong, because all_2_2_0 may become active again if transaction will roll back.
/// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed.
MergeTreeData::DataPartsVector active_parts;
MergeTreeData::DataPartsVector outdated_parts;
MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn);
{
auto lock = data.lockParts();
active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
}
ActiveDataPartSet active_parts_set{data.format_version};
for (const auto & part : active_parts)
active_parts_set.add(part->name);
for (const auto & part : outdated_parts)
{
/// We don't need rolled back parts.
/// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first
/// and then remove part from working set, so there's no race condition
if (part->version.creation_csn == Tx::RolledBackCSN)
continue;
/// We don't need parts that are finally removed.
/// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently.
/// But it's not a problem if we will add such part to `data_parts`.
if (part->version.removal_csn != Tx::UnknownCSN)
continue;
active_parts_set.add(part->name);
}
/// Restore "active" parts set from selected active and outdated parts
auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
{
return active_parts_set.getContainingPart(part->info) != part->name;
};
std::erase_if(active_parts, remove_pred);
std::erase_if(outdated_parts, remove_pred);
std::merge(active_parts.begin(), active_parts.end(),
outdated_parts.begin(), outdated_parts.end(),
std::back_inserter(data_parts), MergeTreeData::LessDataPart());
}
else
{
/// Simply get all active parts
data_parts = data.getDataPartsVectorForInternalUsage();
}
const auto data_settings = data.getSettings();
auto metadata_snapshot = data.getInMemoryMetadataPtr();
if (data_parts.empty())
@ -207,9 +149,118 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
return SelectPartsDecision::CANNOT_SELECT;
}
time_t current_time = std::time(nullptr);
MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn, out_disable_reason);
IMergeSelector::PartsRanges parts_ranges;
if (info.parts_selected_precondition == 0)
{
if (out_disable_reason)
*out_disable_reason = "No parts satisfy preconditions for merge";
return SelectPartsDecision::CANNOT_SELECT;
}
auto res = selectPartsToMergeFromRanges(future_part, aggressive, max_total_size_to_merge, merge_with_ttl_allowed,
metadata_snapshot, info, out_disable_reason);
if (res == SelectPartsDecision::SELECTED)
return res;
String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info);
if (!best_partition_id_to_optimize.empty())
{
return selectAllPartsToMergeWithinPartition(
future_part,
can_merge_callback,
best_partition_id_to_optimize,
/*final=*/true,
metadata_snapshot,
txn,
out_disable_reason,
/*optimize_skip_merged_partitions=*/true);
}
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return SelectPartsDecision::CANNOT_SELECT;
}
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const
{
if (!txn)
{
/// Simply get all active parts
return data.getDataPartsVectorForInternalUsage();
}
/// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
/// So at the first glance we could just get all active parts.
/// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
/// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
/// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
/// But it's wrong, because all_2_2_0 may become active again if transaction will roll back.
/// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed.
MergeTreeData::DataPartsVector active_parts;
MergeTreeData::DataPartsVector outdated_parts;
{
auto lock = data.lockParts();
active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
}
ActiveDataPartSet active_parts_set{data.format_version};
for (const auto & part : active_parts)
active_parts_set.add(part->name);
for (const auto & part : outdated_parts)
{
/// We don't need rolled back parts.
/// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first
/// and then remove part from working set, so there's no race condition
if (part->version.creation_csn == Tx::RolledBackCSN)
continue;
/// We don't need parts that are finally removed.
/// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently.
/// But it's not a problem if we will add such part to `data_parts`.
if (part->version.removal_csn != Tx::UnknownCSN)
continue;
active_parts_set.add(part->name);
}
/// Restore "active" parts set from selected active and outdated parts
auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
{
return active_parts_set.getContainingPart(part->info) != part->name;
};
std::erase_if(active_parts, remove_pred);
std::erase_if(outdated_parts, remove_pred);
MergeTreeData::DataPartsVector data_parts;
std::merge(
active_parts.begin(),
active_parts.end(),
outdated_parts.begin(),
outdated_parts.end(),
std::back_inserter(data_parts),
MergeTreeData::LessDataPart());
return data_parts;
}
MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPossibleMergeRanges(
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason) const
{
MergeSelectingInfo res;
res.current_time = std::time(nullptr);
IMergeSelector::PartsRanges & parts_ranges = res.parts_ranges;
StoragePolicyPtr storage_policy = data.getStoragePolicy();
/// Volumes with stopped merges are extremely rare situation.
@ -221,14 +272,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
const MergeTreeData::DataPartPtr * prev_part = nullptr;
/// collect min_age for each partition while iterating parts
struct PartitionInfo
{
time_t min_age{std::numeric_limits<time_t>::max()};
};
PartitionsInfo & partitions_info = res.partitions_info;
std::unordered_map<std::string, PartitionInfo> partitions_info;
size_t parts_selected_precondition = 0;
for (const MergeTreeData::DataPartPtr & part : data_parts)
{
const String & partition_id = part->info.partition_id;
@ -284,7 +329,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
IMergeSelector::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.age = current_time - part->modification_time;
part_info.age = res.current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
part_info.ttl_infos = &part->ttl_infos;
@ -294,7 +339,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
auto & partition_info = partitions_info[partition_id];
partition_info.min_age = std::min(partition_info.min_age, part_info.age);
++parts_selected_precondition;
++res.parts_selected_precondition;
parts_ranges.back().emplace_back(part_info);
@ -311,13 +356,21 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
prev_part = &part;
}
if (parts_selected_precondition == 0)
{
if (out_disable_reason)
*out_disable_reason = "No parts satisfy preconditions for merge";
return SelectPartsDecision::CANNOT_SELECT;
}
return res;
}
SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
FutureMergedMutatedPartPtr future_part,
bool aggressive,
size_t max_total_size_to_merge,
bool merge_with_ttl_allowed,
const StorageMetadataPtr & metadata_snapshot,
const MergeSelectingInfo & info,
String * out_disable_reason)
{
const time_t & current_time = info.current_time;
const IMergeSelector::PartsRanges & parts_ranges = info.parts_ranges;
const auto data_settings = data.getSettings();
IMergeSelector::PartsRange parts_to_merge;
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
@ -382,29 +435,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (parts_to_merge.empty())
{
if (data_settings->min_age_to_force_merge_on_partition_only && data_settings->min_age_to_force_merge_seconds)
{
auto best_partition_it = std::max_element(
partitions_info.begin(),
partitions_info.end(),
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
assert(best_partition_it != partitions_info.end());
if (static_cast<size_t>(best_partition_it->second.min_age) >= data_settings->min_age_to_force_merge_seconds)
return selectAllPartsToMergeWithinPartition(
future_part,
can_merge_callback,
best_partition_it->first,
/*final=*/true,
metadata_snapshot,
txn,
out_disable_reason,
/*optimize_skip_merged_partitions=*/true);
}
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
*out_disable_reason = "Did not find any parts to merge (with usual merge selectors)";
return SelectPartsDecision::CANNOT_SELECT;
}
}
@ -422,6 +454,28 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
return SelectPartsDecision::SELECTED;
}
String MergeTreeDataMergerMutator::getBestPartitionToOptimizeEntire(
const PartitionsInfo & partitions_info) const
{
const auto data_settings = data.getSettings();
if (!data_settings->min_age_to_force_merge_on_partition_only)
return {};
if (!data_settings->min_age_to_force_merge_seconds)
return {};
auto best_partition_it = std::max_element(
partitions_info.begin(),
partitions_info.end(),
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
assert(best_partition_it != partitions_info.end());
if (static_cast<size_t>(best_partition_it->second.min_age) < data_settings->min_age_to_force_merge_seconds)
return {};
return best_partition_it->first;
}
SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
FutureMergedMutatedPartPtr future_part,
const AllowedMergingPredicate & can_merge,

View File

@ -62,6 +62,43 @@ public:
*/
UInt64 getMaxSourcePartSizeForMutation() const;
struct PartitionInfo
{
time_t min_age{std::numeric_limits<time_t>::max()};
};
using PartitionsInfo = std::unordered_map<std::string, PartitionInfo>;
/// The first step of selecting parts to merge: returns a list of all active/visible parts
MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const;
struct MergeSelectingInfo
{
time_t current_time;
PartitionsInfo partitions_info;
IMergeSelector::PartsRanges parts_ranges;
size_t parts_selected_precondition = 0;
};
/// The second step of selecting parts to merge: splits parts list into a set of ranges according to can_merge_callback.
/// All parts withing a range can be merged without violating some invariants.
MergeSelectingInfo getPossibleMergeRanges(
const MergeTreeData::DataPartsVector & data_parts,
const AllowedMergingPredicate & can_merge_callback,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr) const;
/// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge
SelectPartsDecision selectPartsToMergeFromRanges(
FutureMergedMutatedPartPtr future_part,
bool aggressive,
size_t max_total_size_to_merge,
bool merge_with_ttl_allowed,
const StorageMetadataPtr & metadata_snapshot,
const MergeSelectingInfo & info,
String * out_disable_reason = nullptr);
String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const;
/** Selects which parts to merge. Uses a lot of heuristics.
*
* can_merge - a function that determines if it is possible to merge a pair of adjacent parts.