From fbb37c37df6c428579130772151492209742008e Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 2 Sep 2020 11:28:46 +0300 Subject: [PATCH] Simplier interface --- src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp | 10 +++++----- src/Storages/MergeTree/MergeTreeDataMergerMutator.h | 6 +++--- src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp | 2 +- src/Storages/StorageMergeTree.cpp | 4 ++-- src/Storages/StorageReplicatedMergeTree.cpp | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index a0ab7866402..31d566c4e0e 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -158,15 +158,15 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si } -UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(MergeType merge_type) +UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(bool with_ttl) const { size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); - return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1, merge_type); /// 1 is current thread + return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1, with_ttl); /// 1 is current thread } -UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type) +UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, bool with_ttl) const { if (pool_used > pool_size) throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); @@ -180,7 +180,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz size_t lowering_setting; - if (merge_type == MergeType::TTL_DELETE) + if (with_ttl) lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge_with_ttl; else lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge; @@ -198,7 +198,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz } -UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() +UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const { const auto data_settings = data.getSettings(); size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 086a2a9cae2..6b0e2e9be22 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -59,17 +59,17 @@ public: /** Get maximum total size of parts to do merge, at current moment of time. * It depends on number of free threads in background_pool and amount of free space in disk. */ - UInt64 getMaxSourcePartsSizeForMerge(MergeType merge_type); + UInt64 getMaxSourcePartsSizeForMerge(bool with_ttl) const; /** For explicitly passed size of pool and number of used tasks. * This method could be used to calculate threshold depending on number of tasks in replication queue. */ - UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type); + UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, bool with_ttl) const; /** Get maximum total size of parts to do mutation, at current moment of time. * It depends only on amount of free space in disk. */ - UInt64 getMaxSourcePartSizeForMutation(); + UInt64 getMaxSourcePartSizeForMutation() const; /** Selects which parts to merge. Uses a lot of heuristics. * diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index c9b366a9ec8..d1b4217401c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1061,7 +1061,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( return false; } - UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge(entry.merge_type) + UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge(entry.merge_type == MergeType::TTL_DELETE) : merger_mutator.getMaxSourcePartSizeForMutation(); /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed), * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size, diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 05f2f5254f0..07e373ac93c 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -650,11 +650,11 @@ bool StorageMergeTree::merge( if (partition_id.empty()) { - UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::NORMAL); + UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(false); UInt64 max_source_parts_size_with_ttl = 0; if (!aggressive) - max_source_parts_size_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::TTL_DELETE); + max_source_parts_size_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(true); if (max_source_parts_size > 0) selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, max_source_parts_size_with_ttl, out_disable_reason); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1c880c8c790..e01926d39d1 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2526,12 +2526,12 @@ void StorageReplicatedMergeTree::mergeSelectingTask() else { UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge( - storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::NORMAL); + storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, false); UInt64 max_source_parts_size_for_merge_with_ttl = 0; if (merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue) max_source_parts_size_for_merge_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge( - storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::TTL_DELETE); + storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, true); UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();