Compress marks in memory

This commit is contained in:
Michael Kolupaev 2023-03-06 21:09:13 -08:00
parent 21ec04bcd1
commit d3a514d221
7 changed files with 275 additions and 41 deletions

View File

@ -166,6 +166,8 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks") \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks") \
M(LoadedMarksCount, "Number of marks loaded (total across columns).") \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.") \
\
M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \

View File

@ -0,0 +1,103 @@
#include <Formats/MarkInCompressedFile.h>
#include <Common/BitHelpers.h>
namespace DB
{
// Write a range of bits in a bit-packed array.
// The array must be overallocated by one element.
// The bit range must be pre-filled with zeros.
void writeBits(UInt64 * dest, size_t bit_offset, UInt64 value)
{
size_t mod = bit_offset % 64;
dest[bit_offset / 64] |= value << mod;
if (mod)
dest[bit_offset / 64 + 1] |= value >> (64 - mod);
}
// The array must be overallocated by one element.
UInt64 readBits(const UInt64 * src, size_t bit_offset, size_t num_bits)
{
size_t mod = bit_offset % 64;
UInt64 value = src[bit_offset / 64] >> mod;
if (mod)
value |= src[bit_offset / 64 + 1] << (64 - mod);
return value & maskLowBits<UInt64>(num_bits);
}
MarksInCompressedFile::MarksInCompressedFile(const PlainArray & marks)
: num_marks(marks.size()), blocks((marks.size() + MARKS_PER_BLOCK - 1) / MARKS_PER_BLOCK, BlockInfo{})
{
if (num_marks == 0)
{
return;
}
// First pass: calculate layout of all blocks and total memory required.
size_t packed_bits = 0;
for (size_t block_idx = 0; block_idx < blocks.size(); ++block_idx)
{
BlockInfo & block = blocks[block_idx];
block.bit_offset_in_packed_array = packed_bits;
size_t max_x = 0;
size_t max_y = 0;
size_t num_marks_in_this_block = std::min(MARKS_PER_BLOCK, num_marks - block_idx * MARKS_PER_BLOCK);
for (size_t i = 0; i < num_marks_in_this_block; ++i)
{
const auto & mark = marks[block_idx * MARKS_PER_BLOCK + i];
block.min_x = std::min(block.min_x, mark.offset_in_compressed_file);
max_x = std::max(max_x, mark.offset_in_compressed_file);
block.min_y = std::min(block.min_y, mark.offset_in_decompressed_block);
max_y = std::max(max_y, mark.offset_in_decompressed_block);
block.trailing_zero_bits_in_y
= std::min(block.trailing_zero_bits_in_y, static_cast<UInt8>(getTrailingZeroBits(mark.offset_in_decompressed_block)));
}
block.bits_for_x = sizeof(size_t) * 8 - getLeadingZeroBits(max_x - block.min_x);
block.bits_for_y = sizeof(size_t) * 8 - getLeadingZeroBits((max_y - block.min_y) >> block.trailing_zero_bits_in_y);
packed_bits += num_marks_in_this_block * (block.bits_for_x + block.bits_for_y);
}
// Overallocate by +1 element to let the bit packing/unpacking do less bounds checking.
size_t packed_length = (packed_bits + 63) / 64 + 1;
packed.reserve_exact(packed_length);
packed.resize_fill(packed_length);
// Second pass: write out the packed marks.
for (size_t idx = 0; idx < num_marks; ++idx)
{
const auto & mark = marks[idx];
auto [block, offset] = lookUpMark(idx);
writeBits(packed.data(), offset, mark.offset_in_compressed_file - block->min_x);
writeBits(
packed.data(),
offset + block->bits_for_x,
(mark.offset_in_decompressed_block - block->min_y) >> block->trailing_zero_bits_in_y);
}
}
MarkInCompressedFile MarksInCompressedFile::get(size_t idx) const
{
auto [block, offset] = lookUpMark(idx);
size_t x = block->min_x + readBits(packed.data(), offset, block->bits_for_x);
size_t y = block->min_y + (readBits(packed.data(), offset + block->bits_for_x, block->bits_for_y) << block->trailing_zero_bits_in_y);
return MarkInCompressedFile{.offset_in_compressed_file = x, .offset_in_decompressed_block = y};
}
std::tuple<const MarksInCompressedFile::BlockInfo *, size_t> MarksInCompressedFile::lookUpMark(size_t idx) const
{
size_t block_idx = idx / MARKS_PER_BLOCK;
const BlockInfo & block = blocks[block_idx];
size_t offset = block.bit_offset_in_packed_array + (idx - block_idx * MARKS_PER_BLOCK) * (block.bits_for_x + block.bits_for_y);
return {&block, offset};
}
size_t MarksInCompressedFile::approximateMemoryUsage() const
{
return sizeof(*this) + blocks.size() * sizeof(blocks[0]) + packed.size() * sizeof(packed[0]);
}
}

View File

@ -2,8 +2,8 @@
#include <tuple>
#include <base/types.h>
#include <IO/WriteHelpers.h>
#include <base/types.h>
#include <Common/PODArray.h>
@ -23,15 +23,9 @@ struct MarkInCompressedFile
return std::tie(offset_in_compressed_file, offset_in_decompressed_block)
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block);
}
bool operator!=(const MarkInCompressedFile & rhs) const
{
return !(*this == rhs);
}
bool operator!=(const MarkInCompressedFile & rhs) const { return !(*this == rhs); }
auto asTuple() const
{
return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block);
}
auto asTuple() const { return std::make_tuple(offset_in_compressed_file, offset_in_decompressed_block); }
String toString() const
{
@ -40,20 +34,87 @@ struct MarkInCompressedFile
String toStringWithRows(size_t rows_num) const
{
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + "," + DB::toString(rows_num) + ")";
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + ","
+ DB::toString(rows_num) + ")";
}
};
class MarksInCompressedFile : public PODArray<MarkInCompressedFile>
/**
* In-memory representation of an array of marks.
*
* Uses an ad-hoc compression scheme that decreases memory usage while allowing
* random access in O(1) time.
* This is independent from the marks *file* format, which may be uncompressed
* or use a different compression method.
*
* Typical memory usage:
* * ~3 bytes/mark for integer columns
* * ~5 bytes/mark for string columns
* * ~0.3 bytes/mark for trivial marks in auxiliary dict files of LowCardinality columns
*/
class MarksInCompressedFile
{
public:
explicit MarksInCompressedFile(size_t n) : PODArray(n) {}
using PlainArray = PODArray<MarkInCompressedFile>;
void read(ReadBuffer & buffer, size_t from, size_t count)
MarksInCompressedFile(const PlainArray & marks);
MarkInCompressedFile get(size_t idx) const;
size_t approximateMemoryUsage() const;
private:
/** Throughout this class:
* * "x" stands for offset_in_compressed_file,
* * "y" stands for offset_in_decompressed_block.
*/
/** We need to store a sequence of marks, each consisting of two 64-bit integers:
* offset_in_compressed_file and offset_in_decompressed_block. We'll call them x and y for
* convenience, since compression doesn't care what they mean. The compression exploits the
* following regularities:
* * y is usually zero.
* * x usually increases steadily.
* * Differences between x values in nearby marks usually fit in much fewer than 64 bits.
*
* We split the sequence of marks into blocks, each containing MARKS_PER_BLOCK marks.
* (Not to be confused with data blocks.)
* For each mark, we store the difference [value] - [min value in the block], for each of the
* two values in the mark. Each block specifies the number of bits to use for these differences
* for all marks in this block.
* The smaller the blocks the fewer bits are required, but the bigger the relative overhead of
* block headers.
*
* Packed marks and block headers all live in one contiguous array.
*/
struct BlockInfo
{
buffer.readStrict(reinterpret_cast<char *>(data() + from), count * sizeof(MarkInCompressedFile));
}
// Min offset_in_compressed_file and offset_in_decompressed_block, correspondingly.
size_t min_x = UINT64_MAX;
size_t min_y = UINT64_MAX;
// Place in `packed` where this block start.
size_t bit_offset_in_packed_array;
// How many bits each mark takes. These numbers are bit-packed in the `packed` array.
// Can be zero. (Especially for y, which is typically all zeroes.)
UInt8 bits_for_x;
UInt8 bits_for_y;
// The `y` values should be <<'ed by this amount.
// Useful for integer columns when marks granularity is a power of 2; in this case all
// offset_in_decompressed_block values are divisible by 2^15 or so.
UInt8 trailing_zero_bits_in_y = 63;
};
static constexpr size_t MARKS_PER_BLOCK = 256;
size_t num_marks;
PODArray<BlockInfo> blocks;
PODArray<UInt64> packed;
// Mark idx -> {block info, bit offset in `packed`}.
std::tuple<const BlockInfo *, size_t> lookUpMark(size_t idx) const;
};
}

View File

@ -0,0 +1,52 @@
#include <random>
#include <gtest/gtest.h>
#include <Formats/MarkInCompressedFile.h>
using namespace DB;
TEST(Marks, Compression)
{
std::random_device dev;
std::mt19937 rng(dev());
auto gen = [&](size_t count, size_t max_x_increment, size_t max_y_increment)
{
size_t x = 0, y = 0;
PODArray<MarkInCompressedFile> plain(count);
for (int i = 0; i < count; ++i)
{
x += rng() % (max_x_increment + 1);
y += rng() % (max_y_increment + 1);
plain[i] = MarkInCompressedFile{.offset_in_compressed_file = x, .offset_in_decompressed_block = y};
}
return plain;
};
auto test = [](const PODArray<MarkInCompressedFile> & plain, size_t max_bits_per_mark)
{
PODArray<MarkInCompressedFile> copy;
copy.assign(plain); // paranoid in case next line mutates it
MarksInCompressedFile marks(copy);
for (size_t i = 0; i < plain.size(); ++i)
ASSERT_EQ(marks.get(i), plain[i]);
EXPECT_LE((marks.approximateMemoryUsage() - sizeof(MarksInCompressedFile)) * 8, plain.size() * max_bits_per_mark);
};
// Typical.
test(gen(10000, 1'000'000, 0), 30);
// Completely random 64-bit values.
test(gen(10000, UINT64_MAX - 1, UINT64_MAX - 1), 130);
// All zeros.
test(gen(10000, 0, 0), 2);
// Short.
test(gen(10, 1000, 1000), 65);
// Empty.
test(gen(0, 0, 0), 0);
}

View File

@ -26,7 +26,7 @@ struct MarksWeightFunction
size_t operator()(const MarksInCompressedFile & marks) const
{
return marks.size() * sizeof(MarkInCompressedFile) + MARK_CACHE_OVERHEAD;
return marks.approximateMemoryUsage() + MARK_CACHE_OVERHEAD;
}
};

View File

@ -1,13 +1,13 @@
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/ThreadPool.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <utility>
@ -15,6 +15,8 @@ namespace ProfileEvents
{
extern const Event WaitMarksLoadMicroseconds;
extern const Event BackgroundLoadingMarksTasks;
extern const Event LoadedMarksCount;
extern const Event LoadedMarksMemoryBytes;
}
namespace DB
@ -62,7 +64,7 @@ MergeTreeMarksLoader::~MergeTreeMarksLoader()
}
const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
MarkInCompressedFile MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
{
if (!marks)
{
@ -87,7 +89,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column index: {} is out of range [0, {})", column_index, columns_in_mark);
#endif
return (*marks)[row_index * columns_in_mark + column_index];
return marks->get(row_index * columns_in_mark + column_index);
}
@ -100,14 +102,17 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
// We first read the marks into a temporary simple array, then compress them into a more compact
// representation.
PODArray<MarkInCompressedFile> plain_marks(marks_count * columns_in_mark); // temporary
if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size)
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Bad size of marks file '{}': {}, must be: {}",
std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
file_size, expected_uncompressed_size);
file_size,
expected_uncompressed_size);
auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
@ -119,12 +124,16 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
if (!index_granularity_info.mark_type.adaptive)
{
/// Read directly to marks.
reader->readStrict(reinterpret_cast<char *>(res->data()), expected_uncompressed_size);
reader->readStrict(reinterpret_cast<char *>(plain_marks.data()), expected_uncompressed_size);
if (!reader->eof())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all marks from file {}, is eof: {}, buffer size: {}, file size: {}",
mrk_path, reader->eof(), reader->buffer().size(), file_size);
mrk_path,
reader->eof(),
reader->buffer().size(),
file_size);
}
else
{
@ -132,7 +141,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t granularity;
while (!reader->eof())
{
res->read(*reader, i * columns_in_mark, columns_in_mark);
reader->readStrict(
reinterpret_cast<char *>(plain_marks.data() + i * columns_in_mark), columns_in_mark * sizeof(MarkInCompressedFile));
readIntBinary(granularity, *reader);
++i;
}
@ -141,7 +151,11 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path);
}
res->protect();
auto res = std::make_shared<MarksInCompressedFile>(plain_marks);
ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * columns_in_mark);
ProfileEvents::increment(ProfileEvents::LoadedMarksMemoryBytes, res->approximateMemoryUsage());
return res;
}
@ -154,7 +168,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path);
if (save_marks_in_cache)
{
auto callback = [this]{ return loadMarksImpl(); };
auto callback = [this] { return loadMarksImpl(); };
loaded_marks = mark_cache->getOrSet(key, callback);
}
else
@ -170,8 +184,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
if (!loaded_marks)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Failed to load marks: {}",
(fs::path(data_part_storage->getFullPath()) / mrk_path).string());
ErrorCodes::LOGICAL_ERROR, "Failed to load marks: {}", (fs::path(data_part_storage->getFullPath()) / mrk_path).string());
}
return loaded_marks;
@ -179,11 +192,14 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
{
return scheduleFromThreadPool<MarkCache::MappedPtr>([this]() -> MarkCache::MappedPtr
{
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarks();
}, *load_marks_threadpool, "LoadMarksThread");
return scheduleFromThreadPool<MarkCache::MappedPtr>(
[this]() -> MarkCache::MappedPtr
{
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarks();
},
*load_marks_threadpool,
"LoadMarksThread");
}
}

View File

@ -30,7 +30,7 @@ public:
~MergeTreeMarksLoader();
const MarkInCompressedFile & getMark(size_t row_index, size_t column_index = 0);
MarkInCompressedFile getMark(size_t row_index, size_t column_index = 0);
private:
DataPartStoragePtr data_part_storage;