Tiny improvements around the Gorilla/Delta codecs

Robert Schulze 2023-01-26 11:09:56 +00:00
parent 9c48ac79c8
commit 227b8676cd
4 changed files with 68 additions and 76 deletions


@ -30,7 +30,7 @@ protected:
bool isGenericCompression() const override { return false; }
private:
UInt8 delta_bytes_size;
const UInt8 delta_bytes_size;
};
@ -68,8 +68,8 @@ void compressDataForType(const char * source, UInt32 source_size, char * dest)
if (source_size % sizeof(T) != 0)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot delta compress, data size {} is not aligned to {}", source_size, sizeof(T));
T prev_src{};
const char * source_end = source + source_size;
T prev_src = 0;
const char * const source_end = source + source_size;
while (source < source_end)
{
T curr_src = unalignedLoad<T>(source);
@ -84,17 +84,17 @@ void compressDataForType(const char * source, UInt32 source_size, char * dest)
template <typename T>
void decompressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const char * output_end = dest + output_size;
const char * const output_end = dest + output_size;
if (source_size % sizeof(T) != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot delta decompress, data size {} is not aligned to {}", source_size, sizeof(T));
T accumulator{};
const char * source_end = source + source_size;
const char * const source_end = source + source_size;
while (source < source_end)
{
accumulator += unalignedLoad<T>(source);
if (dest + sizeof(accumulator) > output_end)
if (dest + sizeof(accumulator) > output_end) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress the data");
unalignedStore<T>(dest, accumulator);
@ -140,7 +140,7 @@ void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_
UInt8 bytes_size = source[0];
if (bytes_size == 0)
if (!(bytes_size == 1 || bytes_size == 2 || bytes_size == 4 || bytes_size == 8))
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress. File has wrong header");
UInt8 bytes_to_skip = uncompressed_size % bytes_size;
@ -190,7 +190,7 @@ UInt8 getDeltaBytesSize(const IDataType * column_type)
void registerCodecDelta(CompressionCodecFactory & factory)
{
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Delta);
factory.registerCompressionCodecWithType("Delta", method_code, [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 delta_bytes_size = 0;
@ -215,7 +215,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
}
return std::make_shared<CompressionCodecDelta>(delta_bytes_size);
});
};
factory.registerCompressionCodecWithType("Delta", method_code, codec_builder);
}
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size)
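
For context, the delta transform implemented by compressDataForType/decompressDataForType boils down to storing each value's difference to its predecessor and re-accumulating those differences on decompression. A minimal standalone sketch, assuming 64-bit values and illustrative names (this is not the codec's real interface):

#include <cstdint>
#include <vector>

// Illustrative sketch of the delta transform: compression stores the
// difference to the previous value, decompression re-accumulates it.
// Unsigned wrap-around makes the round trip lossless.
std::vector<uint64_t> deltaEncode(const std::vector<uint64_t> & src)
{
    std::vector<uint64_t> dst;
    dst.reserve(src.size());
    uint64_t prev = 0;
    for (uint64_t value : src)
    {
        dst.push_back(value - prev); // the stored delta
        prev = value;
    }
    return dst;
}

std::vector<uint64_t> deltaDecode(const std::vector<uint64_t> & deltas)
{
    std::vector<uint64_t> dst;
    dst.reserve(deltas.size());
    uint64_t accumulator = 0;
    for (uint64_t delta : deltas)
    {
        accumulator += delta; // mirrors the accumulator in decompressDataForType
        dst.push_back(accumulator);
    }
    return dst;
}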


@ -11,19 +11,18 @@
#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
#include <bitset>
#include <cstring>
#include <algorithm>
#include <type_traits>
#include <bitset>
namespace DB
{
/** Gorilla column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
* Based on Gorilla paper: https://dl.acm.org/doi/10.14778/2824032.2824078
*
* This codec is best used against monotonic floating sequences, like CPU usage percentage
* or any other gauge.
@ -125,7 +124,7 @@ protected:
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
const UInt8 data_bytes_size;
};
@ -139,7 +138,7 @@ namespace ErrorCodes
namespace
{
constexpr inline UInt8 getBitLengthOfLength(UInt8 data_bytes_size)
constexpr UInt8 getBitLengthOfLength(UInt8 data_bytes_size)
{
// 1-byte value is 8 bits, and we need 4 bits to represent 8 : 1000,
// 2-byte 16 bits => 5
@ -147,21 +146,20 @@ constexpr inline UInt8 getBitLengthOfLength(UInt8 data_bytes_size)
// 8-byte 64 bits => 7
const UInt8 bit_lengths[] = {0, 4, 5, 0, 6, 0, 0, 0, 7};
assert(data_bytes_size >= 1 && data_bytes_size < sizeof(bit_lengths) && bit_lengths[data_bytes_size] != 0);
return bit_lengths[data_bytes_size];
}
UInt32 getCompressedHeaderSize(UInt8 data_bytes_size)
{
const UInt8 items_count_size = 4;
constexpr UInt8 items_count_size = 4;
return items_count_size + data_bytes_size;
}
UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size)
{
const UInt32 items_count = uncompressed_size / data_bytes_size;
static const auto DATA_BIT_LENGTH = getBitLengthOfLength(data_bytes_size);
// -1 since there must be at least 1 non-zero bit.
static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
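
As a worked example of these constants (the numbers illustrate the reasoning and are not quoted from the source): for 64-bit values the meaningful-bit length ranges over 0..64, and 64 == 0b1000000 needs 7 bits, so DATA_BIT_LENGTH is 7; the leading-zero count never exceeds 63 because at least one bit is set whenever data bits are written, so DATA_BIT_LENGTH - 1 == 6 bits suffice. The per-item worst case is then 2 flag bits plus the leading-zero, length and data fields:

// Hypothetical compile-time check, not part of the codec:
constexpr unsigned DATA_BIT_LENGTH_64 = 7;                                // getBitLengthOfLength(8)
constexpr unsigned LEADING_ZEROES_BIT_LENGTH_64 = DATA_BIT_LENGTH_64 - 1; // at most 63 leading zeroes
constexpr unsigned MAX_ITEM_SIZE_BITS_64 = 2 + LEADING_ZEROES_BIT_LENGTH_64 + DATA_BIT_LENGTH_64 + 64;
static_assert(MAX_ITEM_SIZE_BITS_64 == 79, "worst case per 64-bit value");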
@ -182,7 +180,7 @@ struct BinaryValueInfo
};
template <typename T>
BinaryValueInfo getLeadingAndTrailingBits(const T & value)
BinaryValueInfo getBinaryValueInfo(const T & value)
{
constexpr UInt8 bit_size = sizeof(T) * 8;
@ -190,28 +188,25 @@ BinaryValueInfo getLeadingAndTrailingBits(const T & value)
const UInt8 tz = getTrailingZeroBits(value);
const UInt8 data_size = value == 0 ? 0 : static_cast<UInt8>(bit_size - lz - tz);
return BinaryValueInfo{lz, data_size, tz};
return {lz, data_size, tz};
}
template <typename T>
UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 dest_size)
{
static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T));
// -1 since there must be at least 1 non-zero bit.
static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
if (source_size % sizeof(T) != 0)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress, data size {} is not aligned to {}", source_size, sizeof(T));
const char * source_end = source + source_size;
const char * dest_start = dest;
const char * dest_end = dest + dest_size;
const char * const source_end = source + source_size;
const char * const dest_start = dest;
const char * const dest_end = dest + dest_size;
const UInt32 items_count = source_size / sizeof(T);
unalignedStoreLE<UInt32>(dest, items_count);
dest += sizeof(items_count);
T prev_value{};
T prev_value = 0;
// That would cause first XORed value to be written in-full.
BinaryValueInfo prev_xored_info{0, 0, 0};
@ -226,13 +221,17 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest,
BitWriter writer(dest, dest_end - dest);
static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T));
// -1 since there must be at least 1 non-zero bit.
static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
while (source < source_end)
{
const T curr_value = unalignedLoadLE<T>(source);
source += sizeof(curr_value);
const auto xored_data = curr_value ^ prev_value;
const BinaryValueInfo curr_xored_info = getLeadingAndTrailingBits(xored_data);
const BinaryValueInfo curr_xored_info = getBinaryValueInfo(xored_data);
if (xored_data == 0)
{
@ -265,11 +264,7 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest,
template <typename T>
void decompressDataForType(const char * source, UInt32 source_size, char * dest)
{
static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T));
// -1 since there must be at least 1 non-zero bit.
static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
const char * source_end = source + source_size;
const char * const source_end = source + source_size;
if (source + sizeof(UInt32) > source_end)
return;
@ -277,7 +272,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
const UInt32 items_count = unalignedLoadLE<UInt32>(source);
source += sizeof(items_count);
T prev_value{};
T prev_value = 0;
// decoding first item
if (source + sizeof(T) > source_end || items_count < 1)
@ -293,13 +288,17 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
BinaryValueInfo prev_xored_info{0, 0, 0};
static const auto DATA_BIT_LENGTH = getBitLengthOfLength(sizeof(T));
// -1 since there must be at least 1 non-zero bit.
static const auto LEADING_ZEROES_BIT_LENGTH = DATA_BIT_LENGTH - 1;
// since data is tightly packed, as little as 1 bit per value, and the last byte is padded with zeroes,
// we have to keep track of items to avoid reading more than there is.
for (UInt32 items_read = 1; items_read < items_count && !reader.eof(); ++items_read)
{
T curr_value = prev_value;
BinaryValueInfo curr_xored_info = prev_xored_info;
T xored_data{};
T xored_data = 0;
if (reader.readBit() == 1)
{
@ -314,7 +313,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
if (curr_xored_info.leading_zero_bits == 0
&& curr_xored_info.data_bits == 0
&& curr_xored_info.trailing_zero_bits == 0)
&& curr_xored_info.trailing_zero_bits == 0) [[unlikely]]
{
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress gorilla-encoded data: corrupted input data.");
}
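
To make the encoding above easier to follow: the codec XORs each value with its predecessor and stores only the window of meaningful bits between the leading and trailing zeroes, reusing the previous window when the new bits still fit inside it. A minimal sketch of the classification step, analogous to getBinaryValueInfo() (it assumes 64-bit values and GCC/Clang builtins; the codec itself uses the getTrailingZeroBits-style helpers shown above):

#include <cstdint>

struct XorInfo
{
    uint8_t leading_zero_bits;
    uint8_t data_bits;
    uint8_t trailing_zero_bits;
};

// Classify the XOR of two consecutive values; identical values yield {0, 0, 0}
// and are encoded as a single '0' bit.
XorInfo classifyXor(uint64_t prev, uint64_t curr)
{
    const uint64_t xored = prev ^ curr;
    if (xored == 0)
        return {0, 0, 0};
    const auto lz = static_cast<uint8_t>(__builtin_clzll(xored));
    const auto tz = static_cast<uint8_t>(__builtin_ctzll(xored));
    return {lz, static_cast<uint8_t>(64 - lz - tz), tz};
}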


@ -11,13 +11,6 @@
namespace DB
{
class ICompressionCodec;
using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;
using Codecs = std::vector<CompressionCodecPtr>;
class IDataType;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size);
/**
@ -120,7 +113,7 @@ protected:
/// Return size of compressed data without header
virtual UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const { return uncompressed_size; }
/// Actually compress data, without header
/// Actually compress data without header
virtual UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const = 0;
/// Actually decompress data without header
@ -134,4 +127,7 @@ private:
CodecMode decompressMode{CodecMode::Synchronous};
};
using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;
using Codecs = std::vector<CompressionCodecPtr>;
}


@ -35,28 +35,26 @@ extern const int ATTEMPT_TO_READ_AFTER_EOF;
class BitReader
{
using BufferType = unsigned __int128;
const char * source_begin;
const char * const source_begin;
const char * const source_end;
const char * source_current;
const char * source_end;
BufferType bits_buffer;
UInt8 bits_count;
using BufferType = unsigned __int128;
BufferType bits_buffer = 0;
UInt8 bits_count = 0;
public:
BitReader(const char * begin, size_t size)
: source_begin(begin),
source_current(begin),
source_end(begin + size),
bits_buffer(0),
bits_count(0)
: source_begin(begin)
, source_end(begin + size)
, source_current(begin)
{}
~BitReader() = default;
// reads bits_to_read high-bits from bits_buffer
ALWAYS_INLINE inline UInt64 readBits(UInt8 bits_to_read)
ALWAYS_INLINE UInt64 readBits(UInt8 bits_to_read)
{
if (bits_to_read > bits_count)
fillBitBuffer();
@ -64,7 +62,7 @@ public:
return getBitsFromBitBuffer<CONSUME>(bits_to_read);
}
inline UInt8 peekByte()
UInt8 peekByte()
{
if (bits_count < 8)
fillBitBuffer();
@ -72,31 +70,31 @@ public:
return getBitsFromBitBuffer<PEEK>(8);
}
ALWAYS_INLINE inline UInt8 readBit()
ALWAYS_INLINE UInt8 readBit()
{
return static_cast<UInt8>(readBits(1));
}
// skip bits from bits_buffer
inline void skipBufferedBits(UInt8 bits)
void skipBufferedBits(UInt8 bits)
{
bits_buffer <<= bits;
bits_count -= bits;
}
inline bool eof() const
bool eof() const
{
return bits_count == 0 && source_current >= source_end;
}
// number of bits that was already read by clients with readBits()
inline UInt64 count() const
UInt64 count() const
{
return (source_current - source_begin) * 8 - bits_count;
}
inline UInt64 remaining() const
UInt64 remaining() const
{
return (source_end - source_current) * 8 + bits_count;
}
@ -105,7 +103,7 @@ private:
enum GetBitsMode {CONSUME, PEEK};
// read data from internal buffer, if it has not enough bits, result is undefined.
template <GetBitsMode mode>
inline UInt64 getBitsFromBitBuffer(UInt8 bits_to_read)
UInt64 getBitsFromBitBuffer(UInt8 bits_to_read)
{
assert(bits_to_read > 0);
@ -152,24 +150,22 @@ private:
class BitWriter
{
using BufferType = unsigned __int128;
char * dest_begin;
char * dest_current;
char * dest_end;
char * dest_current;
BufferType bits_buffer;
UInt8 bits_count;
using BufferType = unsigned __int128;
BufferType bits_buffer = 0;
UInt8 bits_count = 0;
static constexpr UInt8 BIT_BUFFER_SIZE = sizeof(bits_buffer) * 8;
public:
BitWriter(char * begin, size_t size)
: dest_begin(begin),
dest_current(begin),
dest_end(begin + size),
bits_buffer(0),
bits_count(0)
: dest_begin(begin)
, dest_end(begin + size)
, dest_current(begin)
{}
~BitWriter()
@ -178,7 +174,7 @@ public:
}
// write `bits_to_write` low-bits of `value` to the buffer
inline void writeBits(UInt8 bits_to_write, UInt64 value)
void writeBits(UInt8 bits_to_write, UInt64 value)
{
assert(bits_to_write > 0);
@ -199,14 +195,14 @@ public:
}
// flush contents of bits_buffer to the dest_current, partial bytes are completed with zeroes.
inline void flush()
void flush()
{
bits_count = (bits_count + 8 - 1) & ~(8 - 1); // align up to a full byte, so doFlush will write all data from bits_buffer
while (bits_count != 0)
doFlush();
}
inline UInt64 count() const
UInt64 count() const
{
return (dest_current - dest_begin) * 8 + bits_count;
}
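
Finally, a round-trip usage sketch of these helpers, based only on the constructors and methods visible in this diff (buffer size and values are illustrative; the DB namespace is assumed as in the header):

#include <IO/BitHelpers.h>
#include <cassert>

void bitHelpersRoundTrip()
{
    char buf[16] = {};

    DB::BitWriter writer(buf, sizeof(buf));
    writer.writeBits(3, 0b101); // write the 3 low bits of the value
    writer.writeBits(7, 42);    // then 7 more bits
    writer.flush();             // pad the last partial byte with zeroes

    DB::BitReader reader(buf, sizeof(buf));
    const auto first = reader.readBits(3);
    const auto second = reader.readBits(7);
    assert(first == 0b101);
    assert(second == 42);
    assert(reader.count() == 10); // bits consumed by the two reads
}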