ignore max part size for merge setting while executing mutations

Author: CurtizJ, 2019-06-17 22:41:48 +03:00
parent 6d9cbc6f28
commit e6dd92f9e1
7 changed files with 47 additions and 31 deletions
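
The effect of the change: the size limit derived from background-pool occupancy and the max_bytes_to_merge_* settings now caps only merge selection, while a part stays eligible for mutation as long as the rewritten copy fits into unreserved disk space. A minimal sketch of the mutation-side cap (hypothetical helper and parameter names; the committed getMaxSourcePartSizeForMutation divides DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) by DISK_USAGE_COEFFICIENT_TO_RESERVE):

```cpp
#include <cstdint>

/// Sketch of the idea behind the new mutation-side limit (hypothetical helper, not the
/// ClickHouse API): a mutation rewrites a single part, so the only hard cap is that the
/// rewritten copy must fit into unreserved disk space, reduced by a safety coefficient.
uint64_t maxSourcePartSizeForMutation(uint64_t unreserved_free_disk_bytes, double reserve_coefficient)
{
    return static_cast<uint64_t>(unreserved_free_disk_bytes / reserve_coefficient);
}
```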

View File

@@ -125,16 +125,16 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, co
 }
 
-UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize()
+UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge()
 {
     size_t total_threads_in_pool = pool.getNumberOfThreads();
     size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
 
-    return getMaxSourcePartsSize(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
+    return getMaxSourcePartsSizeForMerge(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
 }
 
-UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_t pool_used)
+UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used)
 {
     if (pool_used > pool_size)
         throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
 
@@ -154,6 +154,12 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_
 }
 
+UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation()
+{
+    return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
+}
+
 bool MergeTreeDataMergerMutator::selectPartsToMerge(
     FutureMergedMutatedPart & future_part,
     bool aggressive,

View File

@@ -45,12 +45,17 @@ public:
     /** Get maximum total size of parts to do merge, at current moment of time.
       * It depends on number of free threads in background_pool and amount of free space in disk.
       */
-    UInt64 getMaxSourcePartsSize();
+    UInt64 getMaxSourcePartsSizeForMerge();
 
     /** For explicitly passed size of pool and number of used tasks.
      * This method could be used to calculate threshold depending on number of tasks in replication queue.
      */
-    UInt64 getMaxSourcePartsSize(size_t pool_size, size_t pool_used);
+    UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used);
+
+    /** Get maximum total size of parts to do mutation, at current moment of time.
+      * It depends only on amount of free space in disk.
+      */
+    UInt64 getMaxSourcePartSizeForMutation();
 
     /** Selects which parts to merge. Uses a lot of heuristics.
       *
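
The two ForMerge overloads declared above differ only in where the pool budget comes from: the no-argument one inspects the shared background pool itself, while the (pool_size, pool_used) one lets replication code pass its own numbers, for example max_replicated_merges_in_queue and the current queue occupancy, as StorageReplicatedMergeTree does below. A rough, illustrative sketch of such a pool-dependent limit, with assumed interpolation math that is not the actual ClickHouse formula:

```cpp
#include <cstdint>
#include <cstddef>

/// Illustrative only: the limit vanishes when the pool is exhausted, is maximal when the
/// pool is idle, and scales with the share of free entries in between. The real
/// interpolation in MergeTreeDataMergerMutator is different.
uint64_t maxSourcePartsSizeForMergeSketch(size_t pool_size, size_t pool_used,
                                          uint64_t max_at_min_space, uint64_t max_at_max_space)
{
    if (pool_used >= pool_size)
        return 0;                           /// no free entries: no merges at all
    size_t free_entries = pool_size - pool_used;
    if (free_entries == pool_size)
        return max_at_max_space;            /// idle pool: the largest merges are allowed
    return max_at_min_space + (max_at_max_space - max_at_min_space) * free_entries / pool_size;
}
```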

View File

@@ -960,7 +960,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
           * But if all threads are free (maximal size of merge is allowed) then execute any merge,
           * (because it may be ordered by OPTIMIZE or early with different settings).
           */
-        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
+        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
         if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool
             && sum_parts_size_in_bytes > max_source_parts_size)
         {
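
The gate above can be read as a small predicate; the following is just a restatement of the two comparisons in the hunk, under a hypothetical helper name:

```cpp
#include <cstdint>

/// A queued merge is postponed when it exceeds the current pool-dependent limit,
/// unless that limit already equals max_bytes_to_merge_at_max_space_in_pool,
/// i.e. the pool is idle and any ordered merge (e.g. from OPTIMIZE) may run.
bool shouldPostponeMerge(uint64_t sum_parts_size_in_bytes,
                         uint64_t max_source_parts_size,
                         uint64_t max_bytes_to_merge_at_max_space_in_pool)
{
    return max_source_parts_size != max_bytes_to_merge_at_max_space_in_pool
        && sum_parts_size_in_bytes > max_source_parts_size;
}
```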

View File

@@ -513,7 +513,7 @@ bool StorageMergeTree::merge(
     if (partition_id.empty())
     {
-        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
+        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
         if (max_source_parts_size > 0)
             selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
         else if (out_disable_reason)

View File

@@ -2181,35 +2181,33 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
         }
         else
         {
-            UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize(
+            UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
                 settings.max_replicated_merges_in_queue, merges_and_mutations_queued);
+            UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
 
-            if (max_source_parts_size > 0)
-            {
-                FutureMergedMutatedPart future_merged_part;
-                if (merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size, merge_pred))
-                {
-                    success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
-                }
-                else if (queue.countMutations() > 0)
-                {
-                    /// Choose a part to mutate.
-                    DataPartsVector data_parts = getDataPartsVector();
-                    for (const auto & part : data_parts)
-                    {
-                        if (part->bytes_on_disk > max_source_parts_size)
-                            continue;
-                        std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
-                        if (!desired_mutation_version)
-                            continue;
-                        if (createLogEntryToMutatePart(*part, *desired_mutation_version))
-                        {
-                            success = true;
-                            break;
-                        }
-                    }
-                }
-            }
+            FutureMergedMutatedPart future_merged_part;
+            if (max_source_parts_size_for_merge > 0 &&
+                merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred))
+            {
+                success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
+            }
+            else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0)
+            {
+                /// Choose a part to mutate.
+                DataPartsVector data_parts = getDataPartsVector();
+                for (const auto & part : data_parts)
+                {
+                    if (part->bytes_on_disk > max_source_part_size_for_mutation)
+                        continue;
+                    std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
+                    if (!desired_mutation_version)
+                        continue;
+                    if (createLogEntryToMutatePart(*part, *desired_mutation_version))
+                    {
+                        success = true;
+                        break;
+                    }
+                }
+            }
         }
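
For readability, the resulting control flow of the hunk above, condensed into a standalone sketch with hypothetical types (merge_selected stands in for the selectPartsToMerge call): prefer a merge while the pool-dependent limit allows one; otherwise mutate the first part that fits under the disk-space-only limit.

```cpp
#include <cstdint>
#include <cstddef>
#include <optional>
#include <vector>

/// Hypothetical stand-in for a data part; only the fields used by the sketch.
struct PartSketch
{
    uint64_t bytes_on_disk;
    std::optional<int64_t> desired_mutation_version;
};

enum class Action { None, Merge, Mutate };

Action chooseBackgroundAction(bool merge_selected,
                              uint64_t max_source_parts_size_for_merge,
                              uint64_t max_source_part_size_for_mutation,
                              size_t queued_mutations,
                              const std::vector<PartSketch> & parts)
{
    /// Merges are still gated by the pool-dependent limit.
    if (max_source_parts_size_for_merge > 0 && merge_selected)
        return Action::Merge;

    /// Mutations only require the part to fit into unreserved disk space.
    if (max_source_part_size_for_mutation > 0 && queued_mutations > 0)
    {
        for (const auto & part : parts)
            if (part.bytes_on_disk <= max_source_part_size_for_mutation && part.desired_mutation_version)
                return Action::Mutate;
    }

    return Action::None;
}
```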

View File

@@ -0,0 +1,6 @@
+<yandex>
+    <merge_tree>
+        <max_bytes_to_merge_at_min_space_in_pool>1</max_bytes_to_merge_at_min_space_in_pool>
+        <max_bytes_to_merge_at_max_space_in_pool>2</max_bytes_to_merge_at_max_space_in_pool>
+    </merge_tree>
+</yandex>

View File

@@ -11,7 +11,8 @@ from helpers.cluster import ClickHouseCluster
 cluster = ClickHouseCluster(__file__)
 
 node1 = cluster.add_instance('node1', with_zookeeper=True)
-node2 = cluster.add_instance('node2', with_zookeeper=True)
+# Check that limits on max part size for merges don't affect mutations
+node2 = cluster.add_instance('node2', main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
 nodes = [node1, node2]
 
 @pytest.fixture(scope="module")