From 227b8676cd89881c52520041d05c411fbea6f083 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Thu, 26 Jan 2023 11:09:56 +0000 Subject: [PATCH] Tiny improvements around the Gorilla/Delta codecs --- src/Compression/CompressionCodecDelta.cpp | 19 +++---- src/Compression/CompressionCodecGorilla.cpp | 53 +++++++++--------- src/Compression/ICompressionCodec.h | 12 ++--- src/IO/BitHelpers.h | 60 ++++++++++----------- 4 files changed, 68 insertions(+), 76 deletions(-) diff --git a/src/Compression/CompressionCodecDelta.cpp b/src/Compression/CompressionCodecDelta.cpp index 655ab92b5ac..1d27a0784c6 100644 --- a/src/Compression/CompressionCodecDelta.cpp +++ b/src/Compression/CompressionCodecDelta.cpp @@ -30,7 +30,7 @@ protected: bool isGenericCompression() const override { return false; } private: - UInt8 delta_bytes_size; + const UInt8 delta_bytes_size; }; @@ -68,8 +68,8 @@ void compressDataForType(const char * source, UInt32 source_size, char * dest) if (source_size % sizeof(T) != 0) throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot delta compress, data size {} is not aligned to {}", source_size, sizeof(T)); - T prev_src{}; - const char * source_end = source + source_size; + T prev_src = 0; + const char * const source_end = source + source_size; while (source < source_end) { T curr_src = unalignedLoad(source); @@ -84,17 +84,17 @@ void compressDataForType(const char * source, UInt32 source_size, char * dest) template void decompressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 output_size) { - const char * output_end = dest + output_size; + const char * const output_end = dest + output_size; if (source_size % sizeof(T) != 0) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot delta decompress, data size {} is not aligned to {}", source_size, sizeof(T)); T accumulator{}; - const char * source_end = source + source_size; + const char * const source_end = source + source_size; while (source < source_end) { accumulator += unalignedLoad(source); - if (dest + sizeof(accumulator) > output_end) + if (dest + sizeof(accumulator) > output_end) [[unlikely]] throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress the data"); unalignedStore(dest, accumulator); @@ -140,7 +140,7 @@ void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_ UInt8 bytes_size = source[0]; - if (bytes_size == 0) + if (!(bytes_size == 1 || bytes_size == 2 || bytes_size == 4 || bytes_size == 8)) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress. File has wrong header"); UInt8 bytes_to_skip = uncompressed_size % bytes_size; @@ -190,7 +190,7 @@ UInt8 getDeltaBytesSize(const IDataType * column_type) void registerCodecDelta(CompressionCodecFactory & factory) { UInt8 method_code = static_cast(CompressionMethodByte::Delta); - factory.registerCompressionCodecWithType("Delta", method_code, [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr + auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr { UInt8 delta_bytes_size = 0; @@ -215,7 +215,8 @@ void registerCodecDelta(CompressionCodecFactory & factory) } return std::make_shared(delta_bytes_size); - }); + }; + factory.registerCompressionCodecWithType("Delta", method_code, codec_builder); } CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size) diff --git a/src/Compression/CompressionCodecGorilla.cpp b/src/Compression/CompressionCodecGorilla.cpp index 6c7c7138776..50ef94cd625 100644 --- a/src/Compression/CompressionCodecGorilla.cpp +++ b/src/Compression/CompressionCodecGorilla.cpp @@ -11,19 +11,18 @@ #include #include +#include #include #include #include -#include - namespace DB { /** Gorilla column codec implementation. * - * Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf + * Based on Gorilla paper: https://dl.acm.org/doi/10.14778/2824032.2824078 * * This codec is best used against monotonic floating sequences, like CPU usage percentage * or any other gauge. @@ -125,7 +124,7 @@ protected: bool isGenericCompression() const override { return false; } private: - UInt8 data_bytes_size; + const UInt8 data_bytes_size; }; @@ -139,7 +138,7 @@ namespace ErrorCodes namespace { -constexpr inline UInt8 getBitLengthOfLength(UInt8 data_bytes_size) +constexpr UInt8 getBitLengthOfLength(UInt8 data_bytes_size) { // 1-byte value is 8 bits, and we need 4 bits to represent 8 : 1000, // 2-byte 16 bits => 5 @@ -147,21 +146,20 @@ constexpr inline UInt8 getBitLengthOfLength(UInt8 data_bytes_size) // 8-byte 64 bits => 7 const UInt8 bit_lengths[] = {0, 4, 5, 0, 6, 0, 0, 0, 7}; assert(data_bytes_size >= 1 && data_bytes_size < sizeof(bit_lengths) && bit_lengths[data_bytes_size] != 0); - return bit_lengths[data_bytes_size]; } UInt32 getCompressedHeaderSize(UInt8 data_bytes_size) { - const UInt8 items_count_size = 4; - + constexpr UInt8 items_count_size = 4; return items_count_size + data_bytes_size; } UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size) { const UInt32 items_count = uncompressed_size / data_bytes_size; + static const auto DATA_BIT_LENGTH = getBitLengthOfLength(data_bytes_size); // -1 since there must be at least 1 non-zero bit. static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1; @@ -182,7 +180,7 @@ struct BinaryValueInfo }; template -BinaryValueInfo getLeadingAndTrailingBits(const T & value) +BinaryValueInfo getBinaryValueInfo(const T & value) { constexpr UInt8 bit_size = sizeof(T) * 8; @@ -190,28 +188,25 @@ BinaryValueInfo getLeadingAndTrailingBits(const T & value) const UInt8 tz = getTrailingZeroBits(value); const UInt8 data_size = value == 0 ? 0 : static_cast(bit_size - lz - tz); - return BinaryValueInfo{lz, data_size, tz}; + return {lz, data_size, tz}; } template UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 dest_size) { - static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T)); - // -1 since there must be at least 1 non-zero bit. - static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1; - if (source_size % sizeof(T) != 0) throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress, data size {} is not aligned to {}", source_size, sizeof(T)); - const char * source_end = source + source_size; - const char * dest_start = dest; - const char * dest_end = dest + dest_size; + + const char * const source_end = source + source_size; + const char * const dest_start = dest; + const char * const dest_end = dest + dest_size; const UInt32 items_count = source_size / sizeof(T); unalignedStoreLE(dest, items_count); dest += sizeof(items_count); - T prev_value{}; + T prev_value = 0; // That would cause first XORed value to be written in-full. BinaryValueInfo prev_xored_info{0, 0, 0}; @@ -226,13 +221,17 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest, BitWriter writer(dest, dest_end - dest); + static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T)); + // -1 since there must be at least 1 non-zero bit. + static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1; + while (source < source_end) { const T curr_value = unalignedLoadLE(source); source += sizeof(curr_value); const auto xored_data = curr_value ^ prev_value; - const BinaryValueInfo curr_xored_info = getLeadingAndTrailingBits(xored_data); + const BinaryValueInfo curr_xored_info = getBinaryValueInfo(xored_data); if (xored_data == 0) { @@ -265,11 +264,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest, template void decompressDataForType(const char * source, UInt32 source_size, char * dest) { - static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T)); - // -1 since there must be at least 1 non-zero bit. - static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1; - - const char * source_end = source + source_size; + const char * const source_end = source + source_size; if (source + sizeof(UInt32) > source_end) return; @@ -277,7 +272,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest) const UInt32 items_count = unalignedLoadLE(source); source += sizeof(items_count); - T prev_value{}; + T prev_value = 0; // decoding first item if (source + sizeof(T) > source_end || items_count < 1) @@ -293,13 +288,17 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest) BinaryValueInfo prev_xored_info{0, 0, 0}; + static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T)); + // -1 since there must be at least 1 non-zero bit. + static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1; + // since data is tightly packed, up to 1 bit per value, and last byte is padded with zeroes, // we have to keep track of items to avoid reading more that there is. for (UInt32 items_read = 1; items_read < items_count && !reader.eof(); ++items_read) { T curr_value = prev_value; BinaryValueInfo curr_xored_info = prev_xored_info; - T xored_data{}; + T xored_data = 0; if (reader.readBit() == 1) { @@ -314,7 +313,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest) if (curr_xored_info.leading_zero_bits == 0 && curr_xored_info.data_bits == 0 - && curr_xored_info.trailing_zero_bits == 0) + && curr_xored_info.trailing_zero_bits == 0) [[unlikely]] { throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress gorilla-encoded data: corrupted input data."); } diff --git a/src/Compression/ICompressionCodec.h b/src/Compression/ICompressionCodec.h index f40404a84f3..46695f80adb 100644 --- a/src/Compression/ICompressionCodec.h +++ b/src/Compression/ICompressionCodec.h @@ -11,13 +11,6 @@ namespace DB { -class ICompressionCodec; - -using CompressionCodecPtr = std::shared_ptr; -using Codecs = std::vector; - -class IDataType; - extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size); /** @@ -120,7 +113,7 @@ protected: /// Return size of compressed data without header virtual UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const { return uncompressed_size; } - /// Actually compress data, without header + /// Actually compress data without header virtual UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const = 0; /// Actually decompress data without header @@ -134,4 +127,7 @@ private: CodecMode decompressMode{CodecMode::Synchronous}; }; +using CompressionCodecPtr = std::shared_ptr; +using Codecs = std::vector; + } diff --git a/src/IO/BitHelpers.h b/src/IO/BitHelpers.h index 471d1a3b805..34173ccd8f9 100644 --- a/src/IO/BitHelpers.h +++ b/src/IO/BitHelpers.h @@ -35,28 +35,26 @@ extern const int ATTEMPT_TO_READ_AFTER_EOF; class BitReader { - using BufferType = unsigned __int128; - - const char * source_begin; + const char * const source_begin; + const char * const source_end; const char * source_current; - const char * source_end; - BufferType bits_buffer; - UInt8 bits_count; + using BufferType = unsigned __int128; + BufferType bits_buffer = 0; + + UInt8 bits_count = 0; public: BitReader(const char * begin, size_t size) - : source_begin(begin), - source_current(begin), - source_end(begin + size), - bits_buffer(0), - bits_count(0) + : source_begin(begin) + , source_end(begin + size) + , source_current(begin) {} ~BitReader() = default; // reads bits_to_read high-bits from bits_buffer - ALWAYS_INLINE inline UInt64 readBits(UInt8 bits_to_read) + ALWAYS_INLINE UInt64 readBits(UInt8 bits_to_read) { if (bits_to_read > bits_count) fillBitBuffer(); @@ -64,7 +62,7 @@ public: return getBitsFromBitBuffer(bits_to_read); } - inline UInt8 peekByte() + UInt8 peekByte() { if (bits_count < 8) fillBitBuffer(); @@ -72,31 +70,31 @@ public: return getBitsFromBitBuffer(8); } - ALWAYS_INLINE inline UInt8 readBit() + ALWAYS_INLINE UInt8 readBit() { return static_cast(readBits(1)); } // skip bits from bits_buffer - inline void skipBufferedBits(UInt8 bits) + void skipBufferedBits(UInt8 bits) { bits_buffer <<= bits; bits_count -= bits; } - inline bool eof() const + bool eof() const { return bits_count == 0 && source_current >= source_end; } // number of bits that was already read by clients with readBits() - inline UInt64 count() const + UInt64 count() const { return (source_current - source_begin) * 8 - bits_count; } - inline UInt64 remaining() const + UInt64 remaining() const { return (source_end - source_current) * 8 + bits_count; } @@ -105,7 +103,7 @@ private: enum GetBitsMode {CONSUME, PEEK}; // read data from internal buffer, if it has not enough bits, result is undefined. template - inline UInt64 getBitsFromBitBuffer(UInt8 bits_to_read) + UInt64 getBitsFromBitBuffer(UInt8 bits_to_read) { assert(bits_to_read > 0); @@ -152,24 +150,22 @@ private: class BitWriter { - using BufferType = unsigned __int128; - char * dest_begin; - char * dest_current; char * dest_end; + char * dest_current; - BufferType bits_buffer; - UInt8 bits_count; + using BufferType = unsigned __int128; + BufferType bits_buffer = 0; + + UInt8 bits_count = 0; static constexpr UInt8 BIT_BUFFER_SIZE = sizeof(bits_buffer) * 8; public: BitWriter(char * begin, size_t size) - : dest_begin(begin), - dest_current(begin), - dest_end(begin + size), - bits_buffer(0), - bits_count(0) + : dest_begin(begin) + , dest_end(begin + size) + , dest_current(begin) {} ~BitWriter() @@ -178,7 +174,7 @@ public: } // write `bits_to_write` low-bits of `value` to the buffer - inline void writeBits(UInt8 bits_to_write, UInt64 value) + void writeBits(UInt8 bits_to_write, UInt64 value) { assert(bits_to_write > 0); @@ -199,14 +195,14 @@ public: } // flush contents of bits_buffer to the dest_current, partial bytes are completed with zeroes. - inline void flush() + void flush() { bits_count = (bits_count + 8 - 1) & ~(8 - 1); // align up to 8-bytes, so doFlush will write all data from bits_buffer while (bits_count != 0) doFlush(); } - inline UInt64 count() const + UInt64 count() const { return (dest_current - dest_begin) * 8 + bits_count; }