Merge pull request #42869 from zhongyuankai/optimze_ttl_merge

This commit is contained in:
Vladimir C 2022-11-09 11:26:46 +01:00 committed by GitHub
commit 82bf992099
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 38 additions and 11 deletions

View File

@ -31,6 +31,8 @@ PartLogElement::MergeReasonType PartLogElement::getMergeReasonType(MergeType mer
return TTL_DELETE_MERGE;
case MergeType::TTLRecompress:
return TTL_RECOMPRESS_MERGE;
case MergeType::TTLDrop:
return TTL_DROP_MERGE;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeType {}", static_cast<UInt64>(merge_type));
@ -72,6 +74,7 @@ NamesAndTypesList PartLogElement::getNamesAndTypes()
{"RegularMerge", static_cast<Int8>(REGULAR_MERGE)},
{"TTLDeleteMerge", static_cast<Int8>(TTL_DELETE_MERGE)},
{"TTLRecompressMerge", static_cast<Int8>(TTL_RECOMPRESS_MERGE)},
{"TTLDropMerge", static_cast<Int8>(TTL_DROP_MERGE)},
}
);

View File

@ -41,6 +41,8 @@ struct PartLogElement
TTL_DELETE_MERGE = 3,
/// Merge with recompression
TTL_RECOMPRESS_MERGE = 4,
/// Merge assigned to drop parts (with TTLMergeSelector)
TTL_DROP_MERGE = 5,
};
String query_id;

View File

@ -152,7 +152,9 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
}
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
size_t need_total_size = 0;
if (entry.merge_type != MergeType::TTLDrop)
need_total_size = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
/// Can throw an exception while reserving space.
IMergeTreeDataPart::TTLInfos ttl_infos;
@ -180,7 +182,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
std::optional<CurrentlySubmergingEmergingTagger> tagger;
ReservationSharedPtr reserved_space = storage.balancedReservation(
metadata_snapshot,
estimated_space_for_merge,
need_total_size,
max_volume_index,
future_merged_part->name,
future_merged_part->part_info,
@ -190,7 +192,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
if (!reserved_space)
reserved_space = storage.reserveSpacePreferringTTLRules(
metadata_snapshot, estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index);
metadata_snapshot, need_total_size, ttl_infos, time(nullptr), max_volume_index);
future_merged_part->uuid = entry.new_part_uuid;
future_merged_part->updatePath(storage, reserved_space.get());

View File

@ -314,18 +314,31 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
/// TTL delete is preferred to recompression
TTLDeleteMergeSelector delete_ttl_selector(
TTLDeleteMergeSelector drop_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
data_settings->ttl_only_drop_parts);
true);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
parts_to_merge = drop_ttl_selector.select(parts_ranges,data_settings->max_bytes_to_merge_at_max_space_in_pool);
if (!parts_to_merge.empty())
{
future_part->merge_type = MergeType::TTLDelete;
future_part->merge_type = MergeType::TTLDrop;
}
else if (metadata_snapshot->hasAnyRecompressionTTL())
else if (!data_settings->ttl_only_drop_parts)
{
TTLDeleteMergeSelector delete_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
false);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
future_part->merge_type = MergeType::TTLDelete;
}
if (parts_to_merge.empty() && metadata_snapshot->hasAnyRecompressionTTL())
{
TTLRecompressMergeSelector recompress_ttl_selector(
next_recompress_ttl_merge_times_by_partition,

View File

@ -20,7 +20,7 @@ MergeType checkAndGetMergeType(UInt32 merge_type)
bool isTTLMergeType(MergeType merge_type)
{
return merge_type == MergeType::TTLDelete || merge_type == MergeType::TTLRecompress;
return merge_type == MergeType::TTLDelete || merge_type == MergeType::TTLRecompress || merge_type == MergeType::TTLDrop;
}
}

View File

@ -19,6 +19,8 @@ enum class MergeType
TTLDelete = 2,
/// Merge with recompression
TTLRecompress = 3,
/// Merge assigned to drop parts (with TTLMergeSelector)
TTLDrop = 4,
};
/// Check parsed merge_type from raw int and get enum value.

View File

@ -929,7 +929,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (isTTLMergeType(future_part->merge_type))
getContext()->getMergeList().bookMergeWithTTL();
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false);
/// If merge_type is TTLDrop, no need to reserve disk space
size_t need_total_size = 0;
if (future_part->merge_type != MergeType::TTLDrop)
need_total_size = MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts);
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, need_total_size, *this, metadata_snapshot, false);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
}

View File

@ -1 +1 @@
MergeParts TTLDeleteMerge
MergeParts TTLDropMerge