MergeSelector: development [#METR-21841].

This commit is contained in:
Alexey Milovidov 2016-10-31 22:45:22 +03:00
parent 49602c44a9
commit 573a84c89a
3 changed files with 15 additions and 10 deletions

View File

@ -21,7 +21,7 @@ struct MergeTreeSettings
size_t max_bytes_to_merge_at_min_space_in_pool = 1024 * 1024;
/// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.
size_t max_replicated_merges_in_queue = 20;
size_t max_replicated_merges_in_queue = 6;
/// How many seconds to keep obsolete parts.
time_t old_parts_lifetime = 8 * 60;

View File

@ -121,19 +121,19 @@ void selectWithinPartition(
in_range = false;
range_end = i;
size_t range_size = range_end - range_begin;
size_t range_length = range_end - range_begin;
/// Size of range is enough.
if (range_size >= actual_base)
/// Length of range is enough.
if (range_length >= actual_base)
{
/// If size of range is larger than 'max_parts_to_merge' - split it to subranges of almost equal sizes.
/// For example, if 'max_parts_to_merge' == 100 and 'range_size' = 101, split it to subranges of sizes 50 and 51.
size_t num_subranges = (range_size + settings.max_parts_to_merge - 1) / settings.max_parts_to_merge;
/// If length of range is larger than 'max_parts_to_merge' - split it to subranges of almost equal lengths.
/// For example, if 'max_parts_to_merge' == 100 and 'range_length' = 101, split it to subranges of lengths 50 and 51.
size_t num_subranges = (range_length + settings.max_parts_to_merge - 1) / settings.max_parts_to_merge;
for (size_t subrange_index = 0; subrange_index < num_subranges; ++subrange_index)
{
size_t subrange_begin = range_begin + subrange_index * range_size / num_subranges;
size_t subrange_end = range_begin + (subrange_index + 1) * range_size / num_subranges;
size_t subrange_begin = range_begin + subrange_index * range_length / num_subranges;
size_t subrange_end = range_begin + (subrange_index + 1) * range_length / num_subranges;
size_t size_of_subrange = prefix_sums[subrange_end] - prefix_sums[subrange_begin];

View File

@ -527,8 +527,13 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
/** Execute merge only if there are enough free threads in background pool to do merges of that size.
* But if all threads are free (maximal size of merge is allowed) then execute any merge,
* (because it may be ordered by OPTIMIZE or early with differrent settings).
*/
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge();
if (sum_parts_size_in_bytes > max_parts_size_for_merge)
if (max_parts_size_for_merge != data.settings.max_bytes_to_merge_at_max_space_in_pool
&& sum_parts_size_in_bytes > max_parts_size_for_merge)
{
String reason = "Not executing log entry for part " + entry.new_part_name
+ " because its size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes)