Fix the right offset for reading a LowCardinality dictionary from remote fs in the case when the right mark is in the middle of a compressed block.

Nikolai Kochetov 2023-01-03 18:19:51 +00:00
parent 2892d447d4
commit da26f62a9b
4 changed files with 89 additions and 50 deletions


@@ -28,6 +28,11 @@ struct MarkInCompressedFile
         return !(*this == rhs);
     }
 
+    auto asTuple() const
+    {
+        return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block);
+    }
+
     String toString() const
     {
         return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + ")";

@@ -135,7 +135,7 @@ void MergeTreeReaderStream::init()
 }
 
-size_t MergeTreeReaderStream::getRightOffset(size_t right_mark_non_included)
+size_t MergeTreeReaderStream::getRightOffset(size_t right_mark)
 {
     /// NOTE: if we are reading the whole file, then right_mark == marks_count
     /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
 
@@ -144,70 +144,83 @@ size_t MergeTreeReaderStream::getRightOffset(size_t right_mark_non_included)
     if (marks_count == 0)
         return 0;
 
-    assert(right_mark_non_included <= marks_count);
+    assert(right_mark <= marks_count);
 
-    size_t result_right_offset;
-    if (0 < right_mark_non_included && right_mark_non_included < marks_count)
+    if (0 < right_mark && right_mark < marks_count)
     {
         /// Find the right border of the last mark we need to read.
         /// To do that let's find the upper bound of the offset of the last
         /// included mark.
 
-        /// In LowCardinality dictionary and in values of Sparse columns
-        /// several consecutive marks can point to the same offset.
-        ///
-        /// Example:
-        ///  Mark 186, points to [2003111, 0]
-        ///  Mark 187, points to [2003111, 0]
-        ///  Mark 188, points to [2003111, 0] <--- for example need to read until 188
-        ///  Mark 189, points to [2003111, 0] <--- not suitable, because have same offset
-        ///  Mark 190, points to [2003111, 0]
-        ///  Mark 191, points to [2003111, 0]
-        ///  Mark 192, points to [2081424, 0] <--- what we are looking for
-        ///  Mark 193, points to [2081424, 0]
-        ///  Mark 194, points to [2081424, 0]
-
-        /// Also, in some cases, when one granule is not-atomically written (which is possible at merges)
-        /// one granule may require reading of two dictionaries which starts from different marks.
-        /// The only correct way is to take offset from at least next different granule from the right one.
-        /// So, that's why we have to read one extra granule to the right,
-        /// while reading dictionary of LowCardinality.
-
-        /// If right_mark_non_included has non-zero offset in decompressed block, we have to
-        /// read its compressed block in a whole, because it may consist data from previous granule.
+        if (is_low_cardinality_dictionary)
+        {
+            /// In LowCardinality dictionary several consecutive marks can point to the same offset.
+            ///
+            /// Also, in some cases, when one granule is not-atomically written (which is possible at merges)
+            /// one granule may require reading of two dictionaries which starts from different marks.
+            /// The only correct way is to take offset from at least next different granule from the right one.
+            /// So, that's why we have to read one extra granule to the right,
+            /// while reading dictionary of LowCardinality.
+            ///
+            /// Example:
+            /// Mark 0, points to [0, 8]
+            /// Mark 1, points to [0, 8]
+            /// Mark 2, points to [0, 8]
+            /// Mark 3, points to [0, 8]
+            /// Mark 4, points to [42336, 2255]
+            /// Mark 5, points to [42336, 2255]  <--- for example need to read until 5
+            /// Mark 6, points to [42336, 2255]  <--- not suitable, because have same offset
+            /// Mark 7, points to [84995, 7738]  <--- next different mark
+            /// Mark 8, points to [84995, 7738]
+            /// Mark 9, points to [126531, 8637] <--- what we are looking for
+
+            auto indices = collections::range(right_mark, marks_count);
+            auto next_different_mark = [&](auto lhs, auto rhs)
+            {
+                return marks_loader.getMark(lhs).asTuple() < marks_loader.getMark(rhs).asTuple();
+            };
+            auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, std::move(next_different_mark));
+
+            if (it == indices.end())
+                return file_size;
+
+            right_mark = *it;
+        }
+
+        /// This is a good scenario. The compressed block is finished within the right mark,
+        /// and previous mark was different.
+        if (marks_loader.getMark(right_mark).offset_in_decompressed_block == 0
+            && marks_loader.getMark(right_mark) != marks_loader.getMark(right_mark - 1))
+            return marks_loader.getMark(right_mark).offset_in_compressed_file;
+
+        /// If right_mark has non-zero offset in decompressed block, we have to
+        /// read its compressed block in a whole, because it may consist of data from previous granule.
         ///
         /// For example:
-        ///  Mark 10: (758287, 0)      <--- right_mark_included
-        ///  Mark 11: (908457, 53477)  <--- right_mark_non_included
-        ///  Mark 12: (1064746, 20742) <--- what we are looking for
-        ///  Mark 13: (2009333, 40123)
+        /// Mark 6, points to [42336, 2255]
+        /// Mark 7, points to [84995, 7738]  <--- right_mark
+        /// Mark 8, points to [84995, 7738]
+        /// Mark 9, points to [126531, 8637] <--- what we are looking for
         ///
-        /// Since mark 11 starts from offset in decompressed block 53477,
-        /// it has some data from mark 10 and we have to read
-        /// compressed block [908457; 1064746) in a whole.
+        /// Since mark 7 starts from offset in decompressed block 7738,
+        /// it has some data from mark 6 and we have to read
+        /// compressed block [84995; 126531) in a whole.
 
-        size_t right_mark_included = right_mark_non_included - 1;
-        if (is_low_cardinality_dictionary || marks_loader.getMark(right_mark_non_included).offset_in_decompressed_block != 0)
-            ++right_mark_included;
-
-        auto indices = collections::range(right_mark_included, marks_count);
-        auto it = std::upper_bound(indices.begin(), indices.end(), right_mark_included,
-            [&](auto lhs, auto rhs)
-            {
-                return marks_loader.getMark(lhs).offset_in_compressed_file < marks_loader.getMark(rhs).offset_in_compressed_file;
-            });
+        auto indices = collections::range(right_mark, marks_count);
+        auto next_different_compressed_offset = [&](auto lhs, auto rhs)
+        {
+            return marks_loader.getMark(lhs).offset_in_compressed_file < marks_loader.getMark(rhs).offset_in_compressed_file;
+        };
+        auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, std::move(next_different_compressed_offset));
 
         if (it != indices.end())
-            result_right_offset = marks_loader.getMark(*it).offset_in_compressed_file;
-        else
-            result_right_offset = file_size;
+            return marks_loader.getMark(*it).offset_in_compressed_file;
     }
-    else if (right_mark_non_included == 0)
-        result_right_offset = marks_loader.getMark(right_mark_non_included).offset_in_compressed_file;
-    else
-        result_right_offset = file_size;
+    else if (right_mark == 0)
+        return marks_loader.getMark(right_mark).offset_in_compressed_file;
 
-    return result_right_offset;
+    return file_size;
 }
 
 void MergeTreeReaderStream::seekToMark(size_t index)
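
To see the whole patched algorithm in one piece: the sketch below models the new getRightOffset() against a plain std::vector of marks. Mark, getRightOffset, and the vector are hypothetical stand-ins for DB::MarkInCompressedFile, MergeTreeMarksLoader, and collections::range; only the search logic mirrors the commit. It uses the mark layout from the Example comment above and prints 126531.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <tuple>
#include <vector>

// Hypothetical stand-in for DB::MarkInCompressedFile.
struct Mark
{
    size_t compressed;   // offset_in_compressed_file
    size_t decompressed; // offset_in_decompressed_block
    auto asTuple() const { return std::make_tuple(compressed, decompressed); }
};

size_t getRightOffset(const std::vector<Mark> & marks, size_t right_mark,
                      size_t file_size, bool is_low_cardinality_dictionary)
{
    size_t marks_count = marks.size();
    if (marks_count == 0)
        return 0;

    if (0 < right_mark && right_mark < marks_count)
    {
        if (is_low_cardinality_dictionary)
        {
            // Read one extra granule to the right: jump to the next mark whose
            // (compressed, decompressed) pair differs from right_mark's.
            auto it = std::upper_bound(marks.begin() + right_mark, marks.end(), marks[right_mark],
                [](const Mark & lhs, const Mark & rhs) { return lhs.asTuple() < rhs.asTuple(); });
            if (it == marks.end())
                return file_size;
            right_mark = size_t(it - marks.begin());
        }

        // Good scenario: right_mark starts a fresh compressed block
        // and the previous mark was different.
        if (marks[right_mark].decompressed == 0 && marks[right_mark].asTuple() != marks[right_mark - 1].asTuple())
            return marks[right_mark].compressed;

        // Otherwise right_mark sits in the middle of a compressed block:
        // read that block in a whole, up to the next different compressed offset.
        auto it = std::upper_bound(marks.begin() + right_mark, marks.end(), marks[right_mark],
            [](const Mark & lhs, const Mark & rhs) { return lhs.compressed < rhs.compressed; });
        if (it != marks.end())
            return it->compressed;
    }
    else if (right_mark == 0)
        return marks[0].compressed;

    return file_size;
}

int main()
{
    // The mark table from the Example comment above (and the test below).
    std::vector<Mark> marks = {{0, 8}, {0, 8}, {0, 8}, {0, 8},
                               {42336, 2255}, {42336, 2255}, {42336, 2255},
                               {84995, 7738}, {84995, 7738}, {126531, 8637}};

    // right_mark = 5: the LowCardinality branch lands on mark 7 [84995, 7738],
    // which sits mid-block, so the answer is mark 9's offset, 126531.
    std::printf("%zu\n", getRightOffset(marks, 5, 150000, true));
}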


@@ -49,7 +49,7 @@ public:
 private:
     void init();
 
-    size_t getRightOffset(size_t right_mark_non_included);
+    size_t getRightOffset(size_t right_mark);
 
     const MergeTreeReaderSettings settings;
     const ReadBufferFromFileBase::ProfileCallback profile_callback;


@@ -132,3 +132,24 @@ def test_s3_right_border_2(started_cluster):
     node1.query("optimize table s3_low_cardinality final")
     res = node1.query("select * from s3_low_cardinality where key = 9000")
     assert res == "9000\t9000\n"
+
+
+def test_s3_right_border_3(started_cluster):
+    node1.query("drop table if exists s3_low_cardinality")
+    node1.query("create table s3_low_cardinality (x LowCardinality(String)) engine = MergeTree order by tuple() settings min_bytes_for_wide_part=0, storage_policy = 's3', max_compress_block_size=10000")
+    node1.query("insert into s3_low_cardinality select toString(number % 8000) || if(number < 8192 * 3, 'aaaaaaaaaaaaaaaa', if(number < 8192 * 6, 'bbbbbbbbbbbbbbbbbbbbbbbb', 'ccccccccccccccccccc')) from numbers(8192 * 9)")
+
+    # Marks are:
+    # Mark 0, points to 0, 8, has rows after 8192, decompressed size 0.
+    # Mark 1, points to 0, 8, has rows after 8192, decompressed size 0.
+    # Mark 2, points to 0, 8, has rows after 8192, decompressed size 0.
+    # Mark 3, points to 0, 8, has rows after 8192, decompressed size 0.
+    # Mark 4, points to 42336, 2255, has rows after 8192, decompressed size 0.
+    # Mark 5, points to 42336, 2255, has rows after 8192, decompressed size 0.
+    # Mark 6, points to 42336, 2255, has rows after 8192, decompressed size 0.
+    # Mark 7, points to 84995, 7738, has rows after 8192, decompressed size 0.
+    # Mark 8, points to 84995, 7738, has rows after 8192, decompressed size 0.
+    # Mark 9, points to 126531, 8637, has rows after 0, decompressed size 0.
+    res = node1.query("select uniq(x) from s3_low_cardinality settings max_threads=2, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem=1, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem=1")
+    # Reading ranges [0, 5) and [5, 9)
+    assert res == "23999\n"
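
Tracing the fixed getRightOffset() against these marks shows why the test exercises the bug: the two concurrent read ranges are [0, 5) and [5, 9), so the dictionary stream for the first range ends at right_mark = 5. Marks 4-6 all point to [42336, 2255], so the LowCardinality branch advances to the next different mark, mark 7 at [84995, 7738]. Because mark 7 starts at a non-zero decompressed offset, its granule lies in the middle of a compressed block, and the second search returns mark 9's offset, 126531. The old code stopped at 84995 (the first different compressed offset after mark 5), truncating the compressed block that mark 7's dictionary needs; max_compress_block_size=10000 is what forces marks into the middle of blocks here, which is exactly the failure this test pins down.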