Simplier interface

This commit is contained in:
alesapin 2020-09-02 11:28:46 +03:00
parent b08056fa8c
commit fbb37c37df
5 changed files with 13 additions and 13 deletions

View File

@ -158,15 +158,15 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si
} }
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(MergeType merge_type) UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(bool with_ttl) const
{ {
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1, merge_type); /// 1 is current thread return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1, with_ttl); /// 1 is current thread
} }
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type) UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, bool with_ttl) const
{ {
if (pool_used > pool_size) if (pool_used > pool_size)
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
@ -180,7 +180,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
size_t lowering_setting; size_t lowering_setting;
if (merge_type == MergeType::TTL_DELETE) if (with_ttl)
lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge_with_ttl; lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge_with_ttl;
else else
lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge; lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge;
@ -198,7 +198,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
} }
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const
{ {
const auto data_settings = data.getSettings(); const auto data_settings = data.getSettings();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);

View File

@ -59,17 +59,17 @@ public:
/** Get maximum total size of parts to do merge, at current moment of time. /** Get maximum total size of parts to do merge, at current moment of time.
* It depends on number of free threads in background_pool and amount of free space in disk. * It depends on number of free threads in background_pool and amount of free space in disk.
*/ */
UInt64 getMaxSourcePartsSizeForMerge(MergeType merge_type); UInt64 getMaxSourcePartsSizeForMerge(bool with_ttl) const;
/** For explicitly passed size of pool and number of used tasks. /** For explicitly passed size of pool and number of used tasks.
* This method could be used to calculate threshold depending on number of tasks in replication queue. * This method could be used to calculate threshold depending on number of tasks in replication queue.
*/ */
UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type); UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, bool with_ttl) const;
/** Get maximum total size of parts to do mutation, at current moment of time. /** Get maximum total size of parts to do mutation, at current moment of time.
* It depends only on amount of free space in disk. * It depends only on amount of free space in disk.
*/ */
UInt64 getMaxSourcePartSizeForMutation(); UInt64 getMaxSourcePartSizeForMutation() const;
/** Selects which parts to merge. Uses a lot of heuristics. /** Selects which parts to merge. Uses a lot of heuristics.
* *

View File

@ -1061,7 +1061,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false; return false;
} }
UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge(entry.merge_type) UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge(entry.merge_type == MergeType::TTL_DELETE)
: merger_mutator.getMaxSourcePartSizeForMutation(); : merger_mutator.getMaxSourcePartSizeForMutation();
/** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed), /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),
* then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size, * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size,

View File

@ -650,11 +650,11 @@ bool StorageMergeTree::merge(
if (partition_id.empty()) if (partition_id.empty())
{ {
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::NORMAL); UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(false);
UInt64 max_source_parts_size_with_ttl = 0; UInt64 max_source_parts_size_with_ttl = 0;
if (!aggressive) if (!aggressive)
max_source_parts_size_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::TTL_DELETE); max_source_parts_size_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(true);
if (max_source_parts_size > 0) if (max_source_parts_size > 0)
selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, max_source_parts_size_with_ttl, out_disable_reason); selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, max_source_parts_size_with_ttl, out_disable_reason);

View File

@ -2526,12 +2526,12 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
else else
{ {
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge( UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::NORMAL); storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, false);
UInt64 max_source_parts_size_for_merge_with_ttl = 0; UInt64 max_source_parts_size_for_merge_with_ttl = 0;
if (merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue) if (merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue)
max_source_parts_size_for_merge_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge( max_source_parts_size_for_merge_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::TTL_DELETE); storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, true);
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation(); UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();