diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 8300abd51bd..e96e683443c 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -138,66 +138,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( const MergeTreeTransactionPtr & txn, String * out_disable_reason) { - MergeTreeData::DataPartsVector data_parts; - if (txn) - { - /// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction. - /// So at the first glance we could just get all active parts. - /// Active parts include uncommitted parts, but it's ok and merge predicate handles it. - /// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0. - /// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them. - /// But it's wrong, because all_2_2_0 may become active again if transaction will roll back. - /// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed. - MergeTreeData::DataPartsVector active_parts; - MergeTreeData::DataPartsVector outdated_parts; + MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn); - { - auto lock = data.lockParts(); - active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock); - outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock); - } - - ActiveDataPartSet active_parts_set{data.format_version}; - for (const auto & part : active_parts) - active_parts_set.add(part->name); - - for (const auto & part : outdated_parts) - { - /// We don't need rolled back parts. 
- /// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first - /// and then remove part from working set, so there's no race condition - if (part->version.creation_csn == Tx::RolledBackCSN) - continue; - - /// We don't need parts that are finally removed. - /// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently. - /// But it's not a problem if we will add such part to `data_parts`. - if (part->version.removal_csn != Tx::UnknownCSN) - continue; - - active_parts_set.add(part->name); - } - - /// Restore "active" parts set from selected active and outdated parts - auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool - { - return active_parts_set.getContainingPart(part->info) != part->name; - }; - - std::erase_if(active_parts, remove_pred); - - std::erase_if(outdated_parts, remove_pred); - - std::merge(active_parts.begin(), active_parts.end(), - outdated_parts.begin(), outdated_parts.end(), - std::back_inserter(data_parts), MergeTreeData::LessDataPart()); - } - else - { - /// Simply get all active parts - data_parts = data.getDataPartsVectorForInternalUsage(); - } - const auto data_settings = data.getSettings(); auto metadata_snapshot = data.getInMemoryMetadataPtr(); if (data_parts.empty()) @@ -207,9 +149,118 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( return SelectPartsDecision::CANNOT_SELECT; } - time_t current_time = std::time(nullptr); + MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn, out_disable_reason); - IMergeSelector::PartsRanges parts_ranges; + if (info.parts_selected_precondition == 0) + { + if (out_disable_reason) + *out_disable_reason = "No parts satisfy preconditions for merge"; + return SelectPartsDecision::CANNOT_SELECT; + } + + auto res = selectPartsToMergeFromRanges(future_part, aggressive, max_total_size_to_merge, merge_with_ttl_allowed, + metadata_snapshot, info, 
out_disable_reason); + + if (res == SelectPartsDecision::SELECTED) + return res; + + String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info); + if (!best_partition_id_to_optimize.empty()) + { + return selectAllPartsToMergeWithinPartition( + future_part, + can_merge_callback, + best_partition_id_to_optimize, + /*final=*/true, + metadata_snapshot, + txn, + out_disable_reason, + /*optimize_skip_merged_partitions=*/true); + } + + if (out_disable_reason) + *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; + return SelectPartsDecision::CANNOT_SELECT; +} + +MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const +{ + if (!txn) + { + /// Simply get all active parts + return data.getDataPartsVectorForInternalUsage(); + } + + /// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction. + /// So at the first glance we could just get all active parts. + /// Active parts include uncommitted parts, but it's ok and merge predicate handles it. + /// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0. + /// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them. + /// But it's wrong, because all_2_2_0 may become active again if transaction will roll back. + /// That's why we must include some outdated parts into `data_parts`, more precisely, those parts whose removal is not committed. 
+ MergeTreeData::DataPartsVector active_parts; + MergeTreeData::DataPartsVector outdated_parts; + + { + auto lock = data.lockParts(); + active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock); + outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock); + } + + ActiveDataPartSet active_parts_set{data.format_version}; + for (const auto & part : active_parts) + active_parts_set.add(part->name); + + for (const auto & part : outdated_parts) + { + /// We don't need rolled back parts. + /// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first + /// and then remove part from working set, so there's no race condition + if (part->version.creation_csn == Tx::RolledBackCSN) + continue; + + /// We don't need parts that are finally removed. + /// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently. + /// But it's not a problem if we will add such part to `data_parts`. 
+ if (part->version.removal_csn != Tx::UnknownCSN) + continue; + + active_parts_set.add(part->name); + } + + /// Restore "active" parts set from selected active and outdated parts + auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool + { + return active_parts_set.getContainingPart(part->info) != part->name; + }; + + std::erase_if(active_parts, remove_pred); + + std::erase_if(outdated_parts, remove_pred); + + MergeTreeData::DataPartsVector data_parts; + std::merge( + active_parts.begin(), + active_parts.end(), + outdated_parts.begin(), + outdated_parts.end(), + std::back_inserter(data_parts), + MergeTreeData::LessDataPart()); + + return data_parts; +} + +MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPossibleMergeRanges( + const MergeTreeData::DataPartsVector & data_parts, + const AllowedMergingPredicate & can_merge_callback, + const MergeTreeTransactionPtr & txn, + String * out_disable_reason) const +{ + MergeSelectingInfo res; + + res.current_time = std::time(nullptr); + + IMergeSelector::PartsRanges & parts_ranges = res.parts_ranges; StoragePolicyPtr storage_policy = data.getStoragePolicy(); /// Volumes with stopped merges are extremely rare situation. 
@@ -221,14 +272,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( const MergeTreeData::DataPartPtr * prev_part = nullptr; /// collect min_age for each partition while iterating parts - struct PartitionInfo - { - time_t min_age{std::numeric_limits<time_t>::max()}; - }; + PartitionsInfo & partitions_info = res.partitions_info; - std::unordered_map<String, PartitionInfo> partitions_info; - - size_t parts_selected_precondition = 0; for (const MergeTreeData::DataPartPtr & part : data_parts) { const String & partition_id = part->info.partition_id; @@ -284,7 +329,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( IMergeSelector::Part part_info; part_info.size = part->getBytesOnDisk(); - part_info.age = current_time - part->modification_time; + part_info.age = res.current_time - part->modification_time; part_info.level = part->info.level; part_info.data = &part; part_info.ttl_infos = &part->ttl_infos; @@ -294,7 +339,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( auto & partition_info = partitions_info[partition_id]; partition_info.min_age = std::min(partition_info.min_age, part_info.age); - ++parts_selected_precondition; + ++res.parts_selected_precondition; parts_ranges.back().emplace_back(part_info); @@ -311,13 +356,21 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( prev_part = &part; } - if (parts_selected_precondition == 0) - { - if (out_disable_reason) - *out_disable_reason = "No parts satisfy preconditions for merge"; - return SelectPartsDecision::CANNOT_SELECT; - } + return res; +} +SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( + FutureMergedMutatedPartPtr future_part, + bool aggressive, + size_t max_total_size_to_merge, + bool merge_with_ttl_allowed, + const StorageMetadataPtr & metadata_snapshot, + const MergeSelectingInfo & info, + String * out_disable_reason) +{ + const time_t & current_time = info.current_time; + const IMergeSelector::PartsRanges & parts_ranges = 
info.parts_ranges; + const auto data_settings = data.getSettings(); IMergeSelector::PartsRange parts_to_merge; if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled()) @@ -382,29 +435,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( if (parts_to_merge.empty()) { - if (data_settings->min_age_to_force_merge_on_partition_only && data_settings->min_age_to_force_merge_seconds) - { - auto best_partition_it = std::max_element( - partitions_info.begin(), - partitions_info.end(), - [](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; }); - - assert(best_partition_it != partitions_info.end()); - - if (static_cast<size_t>(best_partition_it->second.min_age) >= data_settings->min_age_to_force_merge_seconds) - return selectAllPartsToMergeWithinPartition( - future_part, - can_merge_callback, - best_partition_it->first, - /*final=*/true, - metadata_snapshot, - txn, - out_disable_reason, - /*optimize_skip_merged_partitions=*/true); - } - if (out_disable_reason) - *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; + *out_disable_reason = "Did not find any parts to merge (with usual merge selectors)"; return SelectPartsDecision::CANNOT_SELECT; } } @@ -422,6 +454,28 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( return SelectPartsDecision::SELECTED; } +String MergeTreeDataMergerMutator::getBestPartitionToOptimizeEntire( + const PartitionsInfo & partitions_info) const +{ + const auto data_settings = data.getSettings(); + if (!data_settings->min_age_to_force_merge_on_partition_only) + return {}; + if (!data_settings->min_age_to_force_merge_seconds) + return {}; + + auto best_partition_it = std::max_element( + partitions_info.begin(), + partitions_info.end(), + [](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; }); + + assert(best_partition_it != partitions_info.end()); + + if 
(static_cast<size_t>(best_partition_it->second.min_age) < data_settings->min_age_to_force_merge_seconds) + return {}; + + return best_partition_it->first; +} + SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition( FutureMergedMutatedPartPtr future_part, const AllowedMergingPredicate & can_merge, diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 7c96fcfaeb3..6935f3452b8 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -62,6 +62,43 @@ public: */ UInt64 getMaxSourcePartSizeForMutation() const; + struct PartitionInfo + { + time_t min_age{std::numeric_limits<time_t>::max()}; + }; + using PartitionsInfo = std::unordered_map<String, PartitionInfo>; + + /// The first step of selecting parts to merge: returns a list of all active/visible parts + MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const; + + struct MergeSelectingInfo + { + time_t current_time; + PartitionsInfo partitions_info; + IMergeSelector::PartsRanges parts_ranges; + size_t parts_selected_precondition = 0; + }; + + /// The second step of selecting parts to merge: splits parts list into a set of ranges according to can_merge_callback. + /// All parts within a range can be merged without violating some invariants. 
+ MergeSelectingInfo getPossibleMergeRanges( + const MergeTreeData::DataPartsVector & data_parts, + const AllowedMergingPredicate & can_merge_callback, + const MergeTreeTransactionPtr & txn, + String * out_disable_reason = nullptr) const; + + /// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge + SelectPartsDecision selectPartsToMergeFromRanges( + FutureMergedMutatedPartPtr future_part, + bool aggressive, + size_t max_total_size_to_merge, + bool merge_with_ttl_allowed, + const StorageMetadataPtr & metadata_snapshot, + const MergeSelectingInfo & info, + String * out_disable_reason = nullptr); + + String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const; + /** Selects which parts to merge. Uses a lot of heuristics. * * can_merge - a function that determines if it is possible to merge a pair of adjacent parts.