Fix excessive memory usage for FINAL (due to too much streams usage) (#50429)

Previously it could create a MergeTreeInOrder for each mark; however, this
could be very suboptimal, because each MergeTreeInOrder has some memory
overhead.

Now, by collapsing all marks for one part together, it is more memory
efficient.

I've tried the query from the Altinity wiki [1] and it halves memory
usage:

    SELECT * FROM repl_tbl FINAL WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 50000 = 0) FORMAT Null

- upstream: MemoryTracker: Peak memory usage (for query): 520.27 MiB.
- patched:  MemoryTracker: Peak memory usage (for query): 260.95 MiB.

  [1]: https://kb.altinity.com/engines/mergetree-table-engine-family/replacingmergetree/#multiple-keys

The improvement is not necessarily 2x; it can be more or less, depending on
the gaps in the marks to read (for example, in my setup the memory usage
increased a lot, from ~16GiB of RAM to >64GiB, due to lots of marks and gaps).

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-06-07 13:48:08 +02:00 committed by GitHub
parent d25ea9b0cf
commit 036ddcd47b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 11 deletions

View File

@ -126,7 +126,9 @@ std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDat
return marks_in_current_layer < intersected_parts * 2;
};
result_layers.emplace_back();
auto & current_layer = result_layers.emplace_back();
/// Map part_idx into index inside layer, used to merge marks from the same part into one reader
std::unordered_map<size_t, size_t> part_idx_in_layer;
while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers)
{
@ -140,11 +142,16 @@ std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDat
if (current.event == PartsRangesIterator::EventType::RangeEnd)
{
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].alter_conversions,
parts[part_idx].part_index_in_query,
MarkRanges{{current_part_range_begin[part_idx], current.range.end}});
const auto & mark = MarkRange{current_part_range_begin[part_idx], current.range.end};
auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size()));
if (it.second)
current_layer.emplace_back(
parts[part_idx].data_part,
parts[part_idx].alter_conversions,
parts[part_idx].part_index_in_query,
MarkRanges{mark});
else
current_layer[it.first->second].ranges.push_back(mark);
current_part_range_begin.erase(part_idx);
current_part_range_end.erase(part_idx);
@ -170,11 +177,17 @@ std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDat
}
for (const auto & [part_idx, last_mark] : current_part_range_end)
{
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].alter_conversions,
parts[part_idx].part_index_in_query,
MarkRanges{{current_part_range_begin[part_idx], last_mark + 1}});
const auto & mark = MarkRange{current_part_range_begin[part_idx], last_mark + 1};
auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size()));
if (it.second)
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].alter_conversions,
parts[part_idx].part_index_in_query,
MarkRanges{mark});
else
current_layer[it.first->second].ranges.push_back(mark);
current_part_range_begin[part_idx] = current_part_range_end[part_idx];
}

View File

@ -0,0 +1,43 @@
-- { echoOn }
EXPLAIN PIPELINE SELECT * FROM data FINAL WHERE v1 >= now() - INTERVAL 180 DAY
SETTINGS max_threads=2, max_final_threads=2, force_data_skipping_indices='v1_index', use_skip_indexes_if_final=1
FORMAT LineAsString;
(Expression)
ExpressionTransform × 2
(Filter)
FilterTransform × 2
(ReadFromMergeTree)
ExpressionTransform × 2
AggregatingSortedTransform 2 → 1
ExpressionTransform × 2
FilterSortedStreamByRange × 2
Description: filter values in [(999424), +inf)
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
AggregatingSortedTransform
ExpressionTransform
FilterSortedStreamByRange
Description: filter values in [-inf, (999424))
ExpressionTransform
MergeTreeInOrder 0 → 1
EXPLAIN PIPELINE SELECT * FROM data FINAL WHERE v1 >= now() - INTERVAL 180 DAY
SETTINGS max_threads=2, max_final_threads=2, force_data_skipping_indices='v1_index', use_skip_indexes_if_final=0
FORMAT LineAsString;
(Expression)
ExpressionTransform × 2
(Filter)
FilterTransform × 2
(ReadFromMergeTree)
ExpressionTransform × 2
AggregatingSortedTransform 2 → 1
ExpressionTransform × 2
FilterSortedStreamByRange × 2
Description: filter values in [(999424), +inf)
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
AggregatingSortedTransform
ExpressionTransform
FilterSortedStreamByRange
Description: filter values in [-inf, (999424))
ExpressionTransform
MergeTreeInOrder 0 → 1

View File

@ -0,0 +1,28 @@
-- Tags: no-random-merge-tree-settings, no-random-settings
-- Prints EXPLAIN PIPELINE for a SELECT ... FINAL over a table whose v1 values
-- alternate between "recent" and "old", once with use_skip_indexes_if_final=1
-- and once with use_skip_indexes_if_final=0.
DROP TABLE IF EXISTS data;
-- The table is ordered by key and has a minmax skip index (v1_index) on v1;
-- the queries below require that index via force_data_skipping_indices.
CREATE TABLE data
(
key Int,
v1 DateTime,
INDEX v1_index v1 TYPE minmax GRANULARITY 1
) ENGINE=AggregatingMergeTree()
ORDER BY key
SETTINGS index_granularity=8192, min_bytes_for_wide_part=0, min_rows_for_wide_part=0;
-- NOTE(review): presumably merges are stopped so that the two INSERTs below
-- remain separate parts - confirm.
SYSTEM STOP MERGES data;
-- generate 50% of marks that cannot be skipped with v1_index
-- this will create a gap in marks
-- (v1 is either now() or now() - 200 days depending on number/8192 % 2,
-- while the queries below keep only v1 >= now() - 180 days)
INSERT INTO data SELECT number, if(number/8192 % 2 == 0, now(), now() - INTERVAL 200 DAY) FROM numbers(1e6);
-- Second insert uses the same pattern with keys shifted by 1e6.
INSERT INTO data SELECT number+1e6, if(number/8192 % 2 == 0, now(), now() - INTERVAL 200 DAY) FROM numbers(1e6);
-- The two queries below differ only in use_skip_indexes_if_final (1 vs 0).
-- { echoOn }
EXPLAIN PIPELINE SELECT * FROM data FINAL WHERE v1 >= now() - INTERVAL 180 DAY
SETTINGS max_threads=2, max_final_threads=2, force_data_skipping_indices='v1_index', use_skip_indexes_if_final=1
FORMAT LineAsString;
EXPLAIN PIPELINE SELECT * FROM data FINAL WHERE v1 >= now() - INTERVAL 180 DAY
SETTINGS max_threads=2, max_final_threads=2, force_data_skipping_indices='v1_index', use_skip_indexes_if_final=0
FORMAT LineAsString;