diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp index 4a14e1292fa..b2e6129c61c 100644 --- a/src/Storages/Kafka/KafkaConsumer.cpp +++ b/src/Storages/Kafka/KafkaConsumer.cpp @@ -489,7 +489,7 @@ size_t KafkaConsumer::filterMessageErrors() { assert(current == messages.begin()); - auto new_end = std::remove_if(messages.begin(), messages.end(), [this](auto & message) + size_t skipped = std::erase_if(messages, [this](auto & message) { if (auto error = message.get_error()) { @@ -500,12 +500,8 @@ size_t KafkaConsumer::filterMessageErrors() return false; }); - size_t skipped = std::distance(new_end, messages.end()); if (skipped) - { LOG_ERROR(log, "There were {} messages with an error", skipped); - messages.erase(new_end, messages.end()); - } return skipped; } diff --git a/src/Storages/MergeTree/MergeSelector.h b/src/Storages/MergeTree/MergeSelector.h index c55f738f879..2f17e1e9654 100644 --- a/src/Storages/MergeTree/MergeSelector.h +++ b/src/Storages/MergeTree/MergeSelector.h @@ -11,6 +11,8 @@ namespace DB { +class IMergeTreeDataPart; + /** Interface of algorithm to select data parts to merge * (merge is also known as "compaction"). * Following properties depend on it: @@ -26,6 +28,7 @@ namespace DB */ class IMergeSelector { + using DataPartPtr = std::shared_ptr; public: /// Information about data part relevant to merge selecting strategy. struct Part @@ -50,6 +53,11 @@ public: ASTPtr compression_codec_desc; bool shall_participate_in_merges = true; + + const DataPartPtr & getDataPartPtr() const + { + return *static_cast(data); + } }; /// Parts are belong to partitions. Only parts within same partition could be merged. diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b21f44baeb5..440c91e3082 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5820,11 +5820,10 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ActiveDataPartSet active_parts(format_version); auto detached_parts = getDetachedParts(); - auto new_end_it = std::remove_if(detached_parts.begin(), detached_parts.end(), [&partition_id](const DetachedPartInfo & part_info) + std::erase_if(detached_parts, [&partition_id](const DetachedPartInfo & part_info) { return !part_info.valid_name || !part_info.prefix.empty() || part_info.partition_id != partition_id; }); - detached_parts.resize(std::distance(detached_parts.begin(), new_end_it)); for (const auto & part_info : detached_parts) { diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 8300abd51bd..29a1574b66e 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -136,68 +136,11 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( const AllowedMergingPredicate & can_merge_callback, bool merge_with_ttl_allowed, const MergeTreeTransactionPtr & txn, - String * out_disable_reason) + String * out_disable_reason, + const PartitionIdsHint * partitions_hint) { - MergeTreeData::DataPartsVector data_parts; - if (txn) - { - /// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction. - /// So at the first glance we could just get all active parts. - /// Active parts include uncommitted parts, but it's ok and merge predicate handles it. - /// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0. - /// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them. - /// But it's wrong, because all_2_2_0 may become active again if transaction will roll back. - /// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed. - MergeTreeData::DataPartsVector active_parts; - MergeTreeData::DataPartsVector outdated_parts; + MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn, partitions_hint); - { - auto lock = data.lockParts(); - active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock); - outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock); - } - - ActiveDataPartSet active_parts_set{data.format_version}; - for (const auto & part : active_parts) - active_parts_set.add(part->name); - - for (const auto & part : outdated_parts) - { - /// We don't need rolled back parts. - /// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first - /// and then remove part from working set, so there's no race condition - if (part->version.creation_csn == Tx::RolledBackCSN) - continue; - - /// We don't need parts that are finally removed. - /// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently. - /// But it's not a problem if we will add such part to `data_parts`. - if (part->version.removal_csn != Tx::UnknownCSN) - continue; - - active_parts_set.add(part->name); - } - - /// Restore "active" parts set from selected active and outdated parts - auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool - { - return active_parts_set.getContainingPart(part->info) != part->name; - }; - - std::erase_if(active_parts, remove_pred); - - std::erase_if(outdated_parts, remove_pred); - - std::merge(active_parts.begin(), active_parts.end(), - outdated_parts.begin(), outdated_parts.end(), - std::back_inserter(data_parts), MergeTreeData::LessDataPart()); - } - else - { - /// Simply get all active parts - data_parts = data.getDataPartsVectorForInternalUsage(); - } - const auto data_settings = data.getSettings(); auto metadata_snapshot = data.getInMemoryMetadataPtr(); if (data_parts.empty()) @@ -207,9 +150,193 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( return SelectPartsDecision::CANNOT_SELECT; } - time_t current_time = std::time(nullptr); + MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn, out_disable_reason); - IMergeSelector::PartsRanges parts_ranges; + if (info.parts_selected_precondition == 0) + { + if (out_disable_reason) + *out_disable_reason = "No parts satisfy preconditions for merge"; + return SelectPartsDecision::CANNOT_SELECT; + } + + auto res = selectPartsToMergeFromRanges(future_part, aggressive, max_total_size_to_merge, merge_with_ttl_allowed, + metadata_snapshot, info.parts_ranges, info.current_time, out_disable_reason); + + if (res == SelectPartsDecision::SELECTED) + return res; + + String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info); + if (!best_partition_id_to_optimize.empty()) + { + return selectAllPartsToMergeWithinPartition( + future_part, + can_merge_callback, + best_partition_id_to_optimize, + /*final=*/true, + metadata_snapshot, + txn, + out_disable_reason, + /*optimize_skip_merged_partitions=*/true); + } + + if (out_disable_reason) + *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; + return SelectPartsDecision::CANNOT_SELECT; +} + +MergeTreeDataMergerMutator::PartitionIdsHint MergeTreeDataMergerMutator::getPartitionsThatMayBeMerged( + size_t max_total_size_to_merge, + const AllowedMergingPredicate & can_merge_callback, + bool merge_with_ttl_allowed, + const MergeTreeTransactionPtr & txn) const +{ + PartitionIdsHint res; + MergeTreeData::DataPartsVector data_parts = getDataPartsToSelectMergeFrom(txn); + if (data_parts.empty()) + return res; + + auto metadata_snapshot = data.getInMemoryMetadataPtr(); + + MergeSelectingInfo info = getPossibleMergeRanges(data_parts, can_merge_callback, txn); + + if (info.parts_selected_precondition == 0) + return res; + + Strings all_partition_ids; + std::vector ranges_per_partition; + + String curr_partition; + for (auto & range : info.parts_ranges) + { + if (range.empty()) + continue; + const String & partition_id = range.front().getDataPartPtr()->info.partition_id; + if (partition_id != curr_partition) + { + curr_partition = partition_id; + all_partition_ids.push_back(curr_partition); + ranges_per_partition.emplace_back(); + } + ranges_per_partition.back().emplace_back(std::move(range)); + } + + for (size_t i = 0; i < all_partition_ids.size(); ++i) + { + auto future_part = std::make_shared(); + String out_disable_reason; + /// This method should have been const, but something went wrong... it's const with dry_run = true + auto status = const_cast(this)->selectPartsToMergeFromRanges( + future_part, /*aggressive*/ false, max_total_size_to_merge, merge_with_ttl_allowed, + metadata_snapshot, ranges_per_partition[i], info.current_time, &out_disable_reason, + /* dry_run */ true); + if (status == SelectPartsDecision::SELECTED) + res.insert(all_partition_ids[i]); + else + LOG_TEST(log, "Nothing to merge in partition {}: {}", all_partition_ids[i], out_disable_reason); + } + + String best_partition_id_to_optimize = getBestPartitionToOptimizeEntire(info.partitions_info); + if (!best_partition_id_to_optimize.empty()) + res.emplace(std::move(best_partition_id_to_optimize)); + + LOG_TRACE(log, "Checked {} partitions, found {} partitions with parts that may be merged: {}", + all_partition_ids.size(), res.size(), fmt::join(res, ", ")); + return res; +} + +MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom( + const MergeTreeTransactionPtr & txn, const PartitionIdsHint * partitions_hint) const +{ + auto res = getDataPartsToSelectMergeFrom(txn); + if (!partitions_hint) + return res; + + std::erase_if(res, [partitions_hint](const auto & part) + { + return !partitions_hint->contains(part->info.partition_id); + }); + return res; +} + +MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const +{ + MergeTreeData::DataPartsVector res; + if (!txn) + { + /// Simply get all active parts + return data.getDataPartsVectorForInternalUsage(); + } + + /// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction. + /// So at the first glance we could just get all active parts. + /// Active parts include uncommitted parts, but it's ok and merge predicate handles it. + /// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0. + /// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them. + /// But it's wrong, because all_2_2_0 may become active again if transaction will roll back. + /// That's why we must include some outdated parts into `data_part`, more precisely, such parts that removal is not committed. + MergeTreeData::DataPartsVector active_parts; + MergeTreeData::DataPartsVector outdated_parts; + + { + auto lock = data.lockParts(); + active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock); + outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock); + } + + ActiveDataPartSet active_parts_set{data.format_version}; + for (const auto & part : active_parts) + active_parts_set.add(part->name); + + for (const auto & part : outdated_parts) + { + /// We don't need rolled back parts. + /// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first + /// and then remove part from working set, so there's no race condition + if (part->version.creation_csn == Tx::RolledBackCSN) + continue; + + /// We don't need parts that are finally removed. + /// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently. + /// But it's not a problem if we will add such part to `data_parts`. + if (part->version.removal_csn != Tx::UnknownCSN) + continue; + + active_parts_set.add(part->name); + } + + /// Restore "active" parts set from selected active and outdated parts + auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool + { + return active_parts_set.getContainingPart(part->info) != part->name; + }; + + std::erase_if(active_parts, remove_pred); + + std::erase_if(outdated_parts, remove_pred); + + MergeTreeData::DataPartsVector data_parts; + std::merge( + active_parts.begin(), + active_parts.end(), + outdated_parts.begin(), + outdated_parts.end(), + std::back_inserter(data_parts), + MergeTreeData::LessDataPart()); + + return data_parts; +} + +MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPossibleMergeRanges( + const MergeTreeData::DataPartsVector & data_parts, + const AllowedMergingPredicate & can_merge_callback, + const MergeTreeTransactionPtr & txn, + String * out_disable_reason) const +{ + MergeSelectingInfo res; + + res.current_time = std::time(nullptr); + + IMergeSelector::PartsRanges & parts_ranges = res.parts_ranges; StoragePolicyPtr storage_policy = data.getStoragePolicy(); /// Volumes with stopped merges are extremely rare situation. @@ -221,14 +348,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( const MergeTreeData::DataPartPtr * prev_part = nullptr; /// collect min_age for each partition while iterating parts - struct PartitionInfo - { - time_t min_age{std::numeric_limits::max()}; - }; + PartitionsInfo & partitions_info = res.partitions_info; - std::unordered_map partitions_info; - - size_t parts_selected_precondition = 0; for (const MergeTreeData::DataPartPtr & part : data_parts) { const String & partition_id = part->info.partition_id; @@ -284,7 +405,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( IMergeSelector::Part part_info; part_info.size = part->getBytesOnDisk(); - part_info.age = current_time - part->modification_time; + part_info.age = res.current_time - part->modification_time; part_info.level = part->info.level; part_info.data = ∂ part_info.ttl_infos = &part->ttl_infos; @@ -294,7 +415,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( auto & partition_info = partitions_info[partition_id]; partition_info.min_age = std::min(partition_info.min_age, part_info.age); - ++parts_selected_precondition; + ++res.parts_selected_precondition; parts_ranges.back().emplace_back(part_info); @@ -311,13 +432,21 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( prev_part = ∂ } - if (parts_selected_precondition == 0) - { - if (out_disable_reason) - *out_disable_reason = "No parts satisfy preconditions for merge"; - return SelectPartsDecision::CANNOT_SELECT; - } + return res; +} +SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges( + FutureMergedMutatedPartPtr future_part, + bool aggressive, + size_t max_total_size_to_merge, + bool merge_with_ttl_allowed, + const StorageMetadataPtr & metadata_snapshot, + const IMergeSelector::PartsRanges & parts_ranges, + const time_t & current_time, + String * out_disable_reason, + bool dry_run) +{ + const auto data_settings = data.getSettings(); IMergeSelector::PartsRange parts_to_merge; if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled()) @@ -327,7 +456,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( next_delete_ttl_merge_times_by_partition, current_time, data_settings->merge_with_ttl_timeout, - true); + /*only_drop_parts*/ true, + dry_run); /// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space parts_to_merge = drop_ttl_selector.select(parts_ranges, data_settings->max_bytes_to_merge_at_max_space_in_pool); @@ -341,7 +471,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( next_delete_ttl_merge_times_by_partition, current_time, data_settings->merge_with_ttl_timeout, - false); + /*only_drop_parts*/ false, + dry_run); parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge); if (!parts_to_merge.empty()) @@ -354,7 +485,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( next_recompress_ttl_merge_times_by_partition, current_time, data_settings->merge_with_recompression_ttl_timeout, - metadata_snapshot->getRecompressionTTLs()); + metadata_snapshot->getRecompressionTTLs(), + dry_run); parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge); if (!parts_to_merge.empty()) @@ -382,29 +514,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( if (parts_to_merge.empty()) { - if (data_settings->min_age_to_force_merge_on_partition_only && data_settings->min_age_to_force_merge_seconds) - { - auto best_partition_it = std::max_element( - partitions_info.begin(), - partitions_info.end(), - [](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; }); - - assert(best_partition_it != partitions_info.end()); - - if (static_cast(best_partition_it->second.min_age) >= data_settings->min_age_to_force_merge_seconds) - return selectAllPartsToMergeWithinPartition( - future_part, - can_merge_callback, - best_partition_it->first, - /*final=*/true, - metadata_snapshot, - txn, - out_disable_reason, - /*optimize_skip_merged_partitions=*/true); - } - if (out_disable_reason) - *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; + *out_disable_reason = "Did not find any parts to merge (with usual merge selectors)"; return SelectPartsDecision::CANNOT_SELECT; } } @@ -413,7 +524,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( parts.reserve(parts_to_merge.size()); for (IMergeSelector::Part & part_info : parts_to_merge) { - const MergeTreeData::DataPartPtr & part = *static_cast(part_info.data); + const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr(); parts.push_back(part); } @@ -422,6 +533,28 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( return SelectPartsDecision::SELECTED; } +String MergeTreeDataMergerMutator::getBestPartitionToOptimizeEntire( + const PartitionsInfo & partitions_info) const +{ + const auto data_settings = data.getSettings(); + if (!data_settings->min_age_to_force_merge_on_partition_only) + return {}; + if (!data_settings->min_age_to_force_merge_seconds) + return {}; + + auto best_partition_it = std::max_element( + partitions_info.begin(), + partitions_info.end(), + [](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; }); + + assert(best_partition_it != partitions_info.end()); + + if (static_cast(best_partition_it->second.min_age) < data_settings->min_age_to_force_merge_seconds) + return {}; + + return best_partition_it->first; +} + SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition( FutureMergedMutatedPartPtr future_part, const AllowedMergingPredicate & can_merge, diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 7c96fcfaeb3..428161ea71e 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -62,6 +62,59 @@ public: */ UInt64 getMaxSourcePartSizeForMutation() const; + struct PartitionInfo + { + time_t min_age{std::numeric_limits::max()}; + }; + using PartitionsInfo = std::unordered_map; + + using PartitionIdsHint = std::unordered_set; + + /// The first step of selecting parts to merge: returns a list of all active/visible parts + MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom(const MergeTreeTransactionPtr & txn) const; + + /// Same as above, but filters partitions according to partitions_hint + MergeTreeData::DataPartsVector getDataPartsToSelectMergeFrom( + const MergeTreeTransactionPtr & txn, + const PartitionIdsHint * partitions_hint) const; + + struct MergeSelectingInfo + { + time_t current_time; + PartitionsInfo partitions_info; + IMergeSelector::PartsRanges parts_ranges; + size_t parts_selected_precondition = 0; + }; + + /// The second step of selecting parts to merge: splits parts list into a set of ranges according to can_merge_callback. + /// All parts within a range can be merged without violating some invariants. + MergeSelectingInfo getPossibleMergeRanges( + const MergeTreeData::DataPartsVector & data_parts, + const AllowedMergingPredicate & can_merge_callback, + const MergeTreeTransactionPtr & txn, + String * out_disable_reason = nullptr) const; + + /// The third step of selecting parts to merge: takes ranges that we can merge, and selects parts that we want to merge + SelectPartsDecision selectPartsToMergeFromRanges( + FutureMergedMutatedPartPtr future_part, + bool aggressive, + size_t max_total_size_to_merge, + bool merge_with_ttl_allowed, + const StorageMetadataPtr & metadata_snapshot, + const IMergeSelector::PartsRanges & parts_ranges, + const time_t & current_time, + String * out_disable_reason = nullptr, + bool dry_run = false); + + String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const; + + /// Useful to quickly get a list of partitions that contain parts that we may want to merge + PartitionIdsHint getPartitionsThatMayBeMerged( + size_t max_total_size_to_merge, + const AllowedMergingPredicate & can_merge_callback, + bool merge_with_ttl_allowed, + const MergeTreeTransactionPtr & txn) const; + /** Selects which parts to merge. Uses a lot of heuristics. * * can_merge - a function that determines if it is possible to merge a pair of adjacent parts. @@ -76,7 +129,8 @@ public: const AllowedMergingPredicate & can_merge, bool merge_with_ttl_allowed, const MergeTreeTransactionPtr & txn, - String * out_disable_reason = nullptr); + String * out_disable_reason = nullptr, + const PartitionIdsHint * partitions_hint = nullptr); /** Select all the parts in the specified partition for merge, if possible. * final - choose to merge even a single part - that is, allow to merge one part "with itself", diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h index 09787bd1731..3c7c9097a2d 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h @@ -41,6 +41,9 @@ struct ReplicatedMergeTreeMutationEntry using BlockNumbersType = std::map; BlockNumbersType block_numbers; + /// List of partitions that do not have relevant uncommitted blocks to mutate + mutable std::unordered_set checked_partitions_cache; + /// Mutation commands which will give to MUTATE_PART entries MutationCommands commands; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index e3c9a54023c..5eddd0b5370 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -160,14 +160,13 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) Strings children = zookeeper->getChildren(queue_path); - auto to_remove_it = std::remove_if( - children.begin(), children.end(), [&](const String & path) + size_t removed_entries = std::erase_if(children, + [&](const String & path) { return already_loaded_paths.count(path); }); - LOG_DEBUG(log, "Having {} queue entries to load, {} entries already loaded.", (to_remove_it - children.begin()), (children.end() - to_remove_it)); - children.erase(to_remove_it, children.end()); + LOG_DEBUG(log, "Having {} queue entries to load, {} entries already loaded.", children.size(), removed_entries); ::sort(children.begin(), children.end()); @@ -808,13 +807,15 @@ QueueRepresentation getQueueRepresentation(const std::list && partition_ids_hint) { return ReplicatedMergeTreeMergePredicate(*this, zookeeper, std::move(partition_ids_hint)); } @@ -1874,7 +1876,9 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep alter_sequence.finishDataAlter(mutation.entry->alter_version, lock); if (mutation.parts_to_do.size() != 0) { - LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number.{}", znode, " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas."); + LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number. " + "It's OK, tasks for rest parts will be skipped, but probably a lot of mutations " + "were executed concurrently on different replicas.", znode); mutation.parts_to_do.clear(); } } @@ -1900,14 +1904,15 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep PartitionIdsHint partition_ids_hint; for (const auto & candidate : candidates) for (const auto & partitions : candidate->block_numbers) - partition_ids_hint.insert(partitions.first); + if (!candidate->checked_partitions_cache.contains(partitions.first)) + partition_ids_hint.insert(partitions.first); auto merge_pred = getMergePredicate(zookeeper, std::move(partition_ids_hint)); std::vector finished; for (const auto & candidate : candidates) { - if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers)) + if (merge_pred.isMutationFinished(candidate->znode_name, candidate->block_numbers, candidate->checked_partitions_cache)) finished.push_back(candidate.get()); } @@ -2098,9 +2103,15 @@ ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue() return QueueLocks(state_mutex, pull_logs_to_queue_mutex, update_mutations_mutex); } -ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( - ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_) +LocalMergePredicate::LocalMergePredicate(ReplicatedMergeTreeQueue & queue_) : queue(queue_) +{ +} + +ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( + ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, std::optional && partition_ids_hint_) + : nested_pred(queue_) + , queue(queue_) , partition_ids_hint(std::move(partition_ids_hint_)) , prev_virtual_parts(queue.format_version) { @@ -2117,21 +2128,33 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( /// Dropped (or cleaned up by TTL) partitions are never removed from ZK, /// so without hint it can do a few thousands requests (if not using MultiRead). Strings partitions; - if (partition_ids_hint.empty()) + if (!partition_ids_hint) partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers"); else - std::copy(partition_ids_hint.begin(), partition_ids_hint.end(), std::back_inserter(partitions)); + std::copy(partition_ids_hint->begin(), partition_ids_hint->end(), std::back_inserter(partitions)); std::vector paths; paths.reserve(partitions.size()); for (const String & partition : partitions) paths.push_back(fs::path(queue.zookeeper_path) / "block_numbers" / partition); - auto locks_children = zookeeper->getChildren(paths); + auto locks_children = zookeeper->tryGetChildren(paths); for (size_t i = 0; i < partitions.size(); ++i) { - Strings partition_block_numbers = locks_children[i].names; + auto & response = locks_children[i]; + if (response.error != Coordination::Error::ZOK && !partition_ids_hint) + throw Coordination::Exception(response.error, paths[i]); + + if (response.error != Coordination::Error::ZOK) + { + /// Probably a wrong hint was provided (it's ok if a user passed non-existing partition to OPTIMIZE) + LOG_WARNING(queue.log, "Partition id '{}' was provided as a hint, but there's not such partition in ZooKeeper", partitions[i]); + partition_ids_hint->erase(partitions[i]); + continue; + } + + Strings partition_block_numbers = response.names; for (const String & entry : partition_block_numbers) { if (!startsWith(entry, "block-")) @@ -2166,6 +2189,18 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( inprogress_quorum_part.clear(); } +bool LocalMergePredicate::operator()( + const MergeTreeData::DataPartPtr & left, + const MergeTreeData::DataPartPtr & right, + const MergeTreeTransaction *, + String * out_reason) const +{ + if (left) + return canMergeTwoParts(left, right, out_reason); + else + return canMergeSinglePart(right, out_reason); +} + bool ReplicatedMergeTreeMergePredicate::operator()( const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, @@ -2253,7 +2288,7 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts( if (left_max_block + 1 < right_min_block) { - if (!partition_ids_hint.empty() && !partition_ids_hint.contains(left->info.partition_id)) + if (partition_ids_hint && !partition_ids_hint->contains(left->info.partition_id)) { if (out_reason) *out_reason = fmt::format("Uncommitted block were not loaded for unexpected partition {}", left->info.partition_id); @@ -2277,6 +2312,17 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts( } } + return nested_pred.canMergeTwoParts(left, right, out_reason); +} + +bool LocalMergePredicate::canMergeTwoParts( + const MergeTreeData::DataPartPtr & left, + const MergeTreeData::DataPartPtr & right, + String * out_reason) const +{ + Int64 left_max_block = left->info.max_block; + Int64 right_min_block = right->info.min_block; + std::lock_guard lock(queue.state_mutex); for (const MergeTreeData::DataPartPtr & part : {left, right}) @@ -2354,6 +2400,11 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart( return false; } + return nested_pred.canMergeSinglePart(part, out_reason); +} + +bool LocalMergePredicate::canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const +{ std::lock_guard lock(queue.state_mutex); /// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer @@ -2476,7 +2527,8 @@ std::optional> ReplicatedMergeTreeMergePredicate::getDesir } -bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map & block_numbers) const +bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & znode_name, const std::map & block_numbers, + std::unordered_set & checked_partitions_cache) const { /// Check committing block numbers, maybe some affected inserts /// still not written to disk and committed to ZK. @@ -2485,7 +2537,11 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z const String & partition_id = kv.first; Int64 block_num = kv.second; - if (!partition_ids_hint.empty() && !partition_ids_hint.contains(partition_id)) + /// Maybe we already know that there are no relevant uncommitted blocks + if (checked_partitions_cache.contains(partition_id)) + continue; + + if (partition_ids_hint && !partition_ids_hint->contains(partition_id)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition id {} was not provided as hint, it's a bug", partition_id); auto partition_it = committing_blocks.find(partition_id); @@ -2499,6 +2555,10 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z return false; } } + + /// There are no committing blocks less than block_num in that partition and there's no way they can appear + /// TODO Why not to get committing blocks when pulling a mutation? We could get rid of finalization task or simplify it + checked_partitions_cache.insert(partition_id); } std::lock_guard lock(queue.state_mutex); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 3fefc341bbc..79572e13963 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -32,6 +32,7 @@ class ReplicatedMergeTreeQueue { private: friend class CurrentlyExecuting; + friend class LocalMergePredicate; friend class ReplicatedMergeTreeMergePredicate; friend class MergeFromLogEntryTask; friend class ReplicatedMergeMutateTaskBase; @@ -390,7 +391,8 @@ public: size_t countUnfinishedMutations() const; /// Returns functor which used by MergeTreeMergerMutator to select parts for merge - ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint); + ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper, + std::optional && partition_ids_hint); MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version, Strings & mutation_ids) const; @@ -489,10 +491,33 @@ public: void createLogEntriesToFetchBrokenParts(); }; +/// Lightweight version of ReplicatedMergeTreeMergePredicate that do not make any ZooKeeper requests, +/// but may return false-positive results. Checks only a subset of required conditions. +class LocalMergePredicate +{ +public: + LocalMergePredicate(ReplicatedMergeTreeQueue & queue_); + + bool operator()(const MergeTreeData::DataPartPtr & left, + const MergeTreeData::DataPartPtr & right, + const MergeTreeTransaction * txn, + String * out_reason = nullptr) const; + + bool canMergeTwoParts(const MergeTreeData::DataPartPtr & left, + const MergeTreeData::DataPartPtr & right, + String * out_reason = nullptr) const; + + bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const; + +private: + const ReplicatedMergeTreeQueue & queue; +}; + class ReplicatedMergeTreeMergePredicate { public: - ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, PartitionIdsHint && partition_ids_hint_); + ReplicatedMergeTreeMergePredicate(ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper, + std::optional && partition_ids_hint_); /// Depending on the existence of left part checks a merge predicate for two parts or for single part. bool operator()(const MergeTreeData::DataPartPtr & left, @@ -523,7 +548,8 @@ public: /// don't glue them together. Alter is rare operation, so it shouldn't affect performance. std::optional> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const; - bool isMutationFinished(const std::string & znode_name, const std::map & block_numbers) const; + bool isMutationFinished(const std::string & znode_name, const std::map & block_numbers, + std::unordered_set & checked_partitions_cache) const; /// The version of "log" node that is used to check that no new merges have appeared. int32_t getVersion() const { return merges_version; } @@ -535,9 +561,11 @@ public: String getCoveringVirtualPart(const String & part_name) const; private: + LocalMergePredicate nested_pred; + const ReplicatedMergeTreeQueue & queue; - PartitionIdsHint partition_ids_hint; + std::optional partition_ids_hint; /// A snapshot of active parts that would appear if the replica executes all log entries in its queue. ActiveDataPartSet prev_virtual_parts; diff --git a/src/Storages/MergeTree/TTLMergeSelector.cpp b/src/Storages/MergeTree/TTLMergeSelector.cpp index d5657aa680d..f4c698d76d7 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.cpp +++ b/src/Storages/MergeTree/TTLMergeSelector.cpp @@ -11,7 +11,7 @@ namespace DB const String & getPartitionIdForPart(const ITTLMergeSelector::Part & part_info) { - const MergeTreeData::DataPartPtr & part = *static_cast(part_info.data); + const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr(); return part->info.partition_id; } @@ -90,8 +90,11 @@ IMergeSelector::PartsRange ITTLMergeSelector::select( ++best_end; } - const auto & best_partition_id = getPartitionIdForPart(best_partition.front()); - merge_due_times[best_partition_id] = current_time + merge_cooldown_time; + if (!dry_run) + { + const auto & best_partition_id = getPartitionIdForPart(best_partition.front()); + merge_due_times[best_partition_id] = current_time + merge_cooldown_time; + } return PartsRange(best_begin, best_end); } diff --git a/src/Storages/MergeTree/TTLMergeSelector.h b/src/Storages/MergeTree/TTLMergeSelector.h index 88dc1fffee2..8c82e284a45 100644 --- a/src/Storages/MergeTree/TTLMergeSelector.h +++ b/src/Storages/MergeTree/TTLMergeSelector.h @@ -21,10 +21,11 @@ class ITTLMergeSelector : public IMergeSelector public: using PartitionIdToTTLs = std::map; - ITTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_) + ITTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool dry_run_) : current_time(current_time_) , merge_due_times(merge_due_times_) , merge_cooldown_time(merge_cooldown_time_) + , dry_run(dry_run_) { } @@ -46,6 +47,7 @@ protected: private: PartitionIdToTTLs & merge_due_times; Int64 merge_cooldown_time; + bool dry_run; }; @@ -56,8 +58,9 @@ class TTLDeleteMergeSelector : public ITTLMergeSelector public: using PartitionIdToTTLs = std::map; - TTLDeleteMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool only_drop_parts_) - : ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_) + TTLDeleteMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, + bool only_drop_parts_, bool dry_run_) + : ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_) , only_drop_parts(only_drop_parts_) {} time_t getTTLForPart(const IMergeSelector::Part & part) const override; @@ -75,8 +78,9 @@ private: class TTLRecompressMergeSelector : public ITTLMergeSelector { public: - TTLRecompressMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, const TTLDescriptions & recompression_ttls_) - : ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_) + TTLRecompressMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, + const TTLDescriptions & recompression_ttls_, bool dry_run_) + : ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_) , recompression_ttls(recompression_ttls_) {} diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d9c8f09ccf1..ea9ffee4939 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3243,7 +3243,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() auto zookeeper = getZooKeeperAndAssertNotReadonly(); - ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, getAllPartitionIds()); + std::optional merge_pred; /// If many merges is already queued, then will queue only small enough merges. /// Otherwise merge queue could be filled with only large merges, @@ -3280,8 +3280,22 @@ void StorageReplicatedMergeTree::mergeSelectingTask() if (storage_settings.get()->assign_part_uuids) future_merged_part->uuid = UUIDHelpers::generateV4(); - if (max_source_parts_size_for_merge > 0 && - merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr) == SelectPartsDecision::SELECTED) + bool can_assign_merge = max_source_parts_size_for_merge > 0; + PartitionIdsHint partitions_to_merge_in; + if (can_assign_merge) + { + auto lightweight_merge_pred = LocalMergePredicate(queue); + partitions_to_merge_in = merger_mutator.getPartitionsThatMayBeMerged( + max_source_parts_size_for_merge, lightweight_merge_pred, merge_with_ttl_allowed, NO_TRANSACTION_PTR); + if (partitions_to_merge_in.empty()) + can_assign_merge = false; + else + merge_pred.emplace(queue.getMergePredicate(zookeeper, partitions_to_merge_in)); + } + + if (can_assign_merge && + merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, *merge_pred, + merge_with_ttl_allowed, NO_TRANSACTION_PTR, nullptr, &partitions_to_merge_in) == SelectPartsDecision::SELECTED) { create_result = createLogEntryToMergeParts( zookeeper, @@ -3293,13 +3307,17 @@ void StorageReplicatedMergeTree::mergeSelectingTask() deduplicate_by_columns, cleanup, nullptr, - merge_pred.getVersion(), + merge_pred->getVersion(), future_merged_part->merge_type); } /// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0 && merges_and_mutations_queued.mutations < storage_settings_ptr->max_replicated_mutations_in_queue) { + /// We don't need the list of committing blocks to choose a part to mutate + if (!merge_pred) + merge_pred.emplace(queue.getMergePredicate(zookeeper, PartitionIdsHint{})); + /// Choose a part to mutate. DataPartsVector data_parts = getDataPartsVectorForInternalUsage(); for (const auto & part : data_parts) @@ -3307,7 +3325,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() if (part->getBytesOnDisk() > max_source_part_size_for_mutation) continue; - std::optional> desired_mutation_version = merge_pred.getDesiredMutationVersion(part); + std::optional> desired_mutation_version = merge_pred->getDesiredMutationVersion(part); if (!desired_mutation_version) continue; @@ -3316,7 +3334,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() future_merged_part->uuid, desired_mutation_version->first, desired_mutation_version->second, - merge_pred.getVersion()); + merge_pred->getVersion()); if (create_result == CreateMergeEntryResult::Ok || create_result == CreateMergeEntryResult::LogUpdated) diff --git a/tests/queries/0_stateless/02439_merge_selecting_partitions.reference b/tests/queries/0_stateless/02439_merge_selecting_partitions.reference new file mode 100644 index 00000000000..e836994b3aa --- /dev/null +++ b/tests/queries/0_stateless/02439_merge_selecting_partitions.reference @@ -0,0 +1 @@ +/test/02439/s1/default/block_numbers/123 diff --git a/tests/queries/0_stateless/02439_merge_selecting_partitions.sql b/tests/queries/0_stateless/02439_merge_selecting_partitions.sql new file mode 100644 index 00000000000..88ce2834d6b --- /dev/null +++ b/tests/queries/0_stateless/02439_merge_selecting_partitions.sql @@ -0,0 +1,28 @@ + +drop table if exists rmt; + +create table rmt (n int, m int) engine=ReplicatedMergeTree('/test/02439/{shard}/{database}', '{replica}') partition by n order by n; +insert into rmt select number, number from numbers(50); +insert into rmt values (1, 2); +insert into rmt values (1, 3); +insert into rmt values (1, 4); +insert into rmt values (1, 5); +insert into rmt values (1, 6); +insert into rmt values (1, 7); +insert into rmt values (1, 8); +insert into rmt values (1, 9); +-- there's nothing to merge in all partitions but '1' + +optimize table rmt partition tuple(123); + +set optimize_throw_if_noop=1; +optimize table rmt partition tuple(123); -- { serverError CANNOT_ASSIGN_OPTIMIZE } + +select sleepEachRow(3) as higher_probability_of_reproducing_the_issue format Null; +system flush logs; + +-- it should not list unneeded partitions where we cannot merge anything +select distinct path from system.zookeeper_log where path like '/test/02439/s1/' || currentDatabase() || '/block_numbers/%' + and op_num in ('List', 'SimpleList', 'FilteredList') and path not like '%/block_numbers/1'; + +drop table rmt;