Merge pull request #38089 from ClickHouse/fpc-follow-up

Small follow-up for FPC codec
This commit is contained in:
Robert Schulze 2022-06-15 20:33:41 +02:00 committed by GitHub
commit 61709a674d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 64 deletions

View File

@ -248,6 +248,7 @@ Specialized codecs:
- `Delta(delta_bytes)` — Compression approach in which raw values are replaced by the difference of two neighboring values, except for the first value that stays unchanged. Up to `delta_bytes` are used for storing delta values, so `delta_bytes` is the maximum size of raw values. Possible `delta_bytes` values: 1, 2, 4, 8. The default value for `delta_bytes` is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, its 1.
- `DoubleDelta` — Calculates delta of deltas and writes it in compact binary form. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-byte deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
- `Gorilla` — Calculates XOR between current and previous value and writes it in compact binary form. Efficient when storing a series of floating point values that change slowly, because the best compression rate is achieved when neighboring values are binary equal. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. For additional information, see Compressing Values in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
- `FPC` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
- `T64` — Compression approach that crops unused high bits of values in integer data types (including `Enum`, `Date` and `DateTime`). At each step of its algorithm, codec takes a block of 64 values, puts them into 64x64 bit matrix, transposes it, crops the unused bits of values and returns the rest as a sequence. Unused bits are the bits, that do not differ between maximum and minimum values in the whole data part for which the compression is used.
`DoubleDelta` and `Gorilla` codecs are used in Gorilla TSDB as the components of its compressing algorithm. Gorilla approach is effective in scenarios when there is a sequence of slowly changing values with their timestamps. Timestamps are effectively compressed by the `DoubleDelta` codec, and values are effectively compressed by the `Gorilla` codec. For example, to get an effectively stored table, you can create it in the following configuration:

View File

@ -14,6 +14,10 @@
namespace DB
{
/// An implementation of the FPC codec for floating-point values described in the paper
/// M. Burtscher, P. Ratanaworabhan: "FPC: A high-speed compressor for double-precision floating-point data" (2008).
/// Note: The paper only describes compression of 64-bit doubles and leaves 32-bit floats to future work. The code
/// implements them anyways. Your mileage with respect to performance and compression may vary.
class CompressionCodecFPC : public ICompressionCodec
{
public:
@ -23,8 +27,8 @@ public:
void updateHash(SipHash & hash) const override;
static constexpr UInt8 MAX_COMPRESSION_LEVEL{28};
static constexpr UInt8 DEFAULT_COMPRESSION_LEVEL{12};
static constexpr UInt8 MAX_COMPRESSION_LEVEL = 28;
static constexpr UInt8 DEFAULT_COMPRESSION_LEVEL = 12;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
@ -37,10 +41,11 @@ protected:
bool isGenericCompression() const override { return false; }
private:
static constexpr UInt32 HEADER_SIZE{3};
static constexpr UInt32 HEADER_SIZE = 3;
UInt8 float_width; // size of uncompressed float in bytes
UInt8 level; // compression level, 2^level * float_width is the size of predictors table in bytes
// below members are used by compression, decompression ignores them:
const UInt8 float_width; // size of uncompressed float in bytes
const UInt8 level; // compression level, 2^level * float_width is the size of predictors table in bytes
};
@ -96,30 +101,6 @@ UInt8 getFloatBytesSize(const IDataType & column_type)
column_type.getName());
}
std::byte encodeEndianness(std::endian endian)
{
switch (endian)
{
case std::endian::little:
return std::byte{0};
case std::endian::big:
return std::byte{1};
}
throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS);
}
std::endian decodeEndianness(std::byte endian)
{
switch (std::to_integer<unsigned char>(endian))
{
case 0:
return std::endian::little;
case 1:
return std::endian::big;
}
throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS);
}
}
void registerCodecFPC(CompressionCodecFactory & factory)
@ -127,7 +108,7 @@ void registerCodecFPC(CompressionCodecFactory & factory)
auto method_code = static_cast<UInt8>(CompressionMethodByte::FPC);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 float_width{0};
UInt8 float_width = 0;
if (column_type != nullptr)
float_width = getFloatBytesSize(*column_type);
@ -145,10 +126,8 @@ void registerCodecFPC(CompressionCodecFactory & factory)
throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
level = literal->value.safeGet<UInt8>();
if (level == 0)
throw Exception("FPC codec level must be at least 1", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
if (level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL)
throw Exception("FPC codec level must be at most 28", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
if (level < 1 || level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec level must be between {} and {}", 1, static_cast<int>(CompressionCodecFPC::MAX_COMPRESSION_LEVEL));
}
return std::make_shared<CompressionCodecFPC>(float_width, level);
};
@ -159,11 +138,12 @@ namespace
{
template <std::unsigned_integral TUint>
requires (sizeof(TUint) >= 4)
requires (sizeof(TUint) >= 4)
class DfcmPredictor
{
public:
explicit DfcmPredictor(std::size_t table_size): table(table_size, 0), prev_value{0}, hash{0}
explicit DfcmPredictor(std::size_t table_size)
: table(table_size, 0), prev_value{0}, hash{0}
{
}
@ -200,11 +180,12 @@ private:
};
template <std::unsigned_integral TUint>
requires (sizeof(TUint) >= 4)
requires (sizeof(TUint) >= 4)
class FcmPredictor
{
public:
explicit FcmPredictor(std::size_t table_size): table(table_size, 0), hash{0}
explicit FcmPredictor(std::size_t table_size)
: table(table_size, 0), hash{0}
{
}
@ -238,18 +219,17 @@ private:
std::size_t hash;
};
template <std::unsigned_integral TUint, std::endian Endian = std::endian::native>
requires (Endian == std::endian::little || Endian == std::endian::big)
template <std::unsigned_integral TUint>
class FPCOperation
{
static constexpr std::size_t CHUNK_SIZE{64};
static constexpr auto VALUE_SIZE = sizeof(TUint);
static constexpr std::byte FCM_BIT{0};
static constexpr std::byte DFCM_BIT{1u << 3};
static constexpr auto DFCM_BIT_1 = DFCM_BIT << 4;
static constexpr auto DFCM_BIT_2 = DFCM_BIT;
static constexpr unsigned MAX_ZERO_BYTE_COUNT{0b111u};
static constexpr unsigned MAX_ZERO_BYTE_COUNT = 0b111u;
static constexpr std::endian ENDIAN = std::endian::little;
static constexpr std::size_t CHUNK_SIZE = 64;
public:
FPCOperation(std::span<std::byte> destination, UInt8 compression_level)
@ -264,8 +244,8 @@ public:
std::span chunk_view(chunk);
for (std::size_t i = 0; i < data.size(); i += chunk_view.size_bytes())
{
auto written_values = importChunk(data.subspan(i), chunk_view);
encodeChunk(chunk_view.subspan(0, written_values));
auto written_values_count = importChunk(data.subspan(i), chunk_view);
encodeChunk(chunk_view.subspan(0, written_values_count));
}
return initial_size - result.size();
@ -273,7 +253,7 @@ public:
void decode(std::span<const std::byte> values, std::size_t decoded_size) &&
{
std::size_t read_bytes{0};
std::size_t read_bytes = 0;
std::span<TUint> chunk_view(chunk);
for (std::size_t i = 0; i < decoded_size; i += chunk_view.size_bytes())
@ -329,14 +309,14 @@ private:
std::byte predictor;
};
unsigned encodeCompressedZeroByteCount(int compressed)
unsigned encodeCompressedZeroByteCount(unsigned compressed)
{
if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1)
{
if (compressed >= 4)
--compressed;
}
return std::min(static_cast<unsigned>(compressed), MAX_ZERO_BYTE_COUNT);
return std::min(compressed, MAX_ZERO_BYTE_COUNT);
}
unsigned decodeCompressedZeroByteCount(unsigned encoded_size)
@ -360,14 +340,14 @@ private:
auto zeroes_dfcm = std::countl_zero(compressed_dfcm);
auto zeroes_fcm = std::countl_zero(compressed_fcm);
if (zeroes_dfcm > zeroes_fcm)
return {compressed_dfcm, encodeCompressedZeroByteCount(zeroes_dfcm / BITS_PER_BYTE), DFCM_BIT};
return {compressed_fcm, encodeCompressedZeroByteCount(zeroes_fcm / BITS_PER_BYTE), FCM_BIT};
return {compressed_dfcm, encodeCompressedZeroByteCount(static_cast<unsigned>(zeroes_dfcm) / BITS_PER_BYTE), DFCM_BIT};
return {compressed_fcm, encodeCompressedZeroByteCount(static_cast<unsigned>(zeroes_fcm) / BITS_PER_BYTE), FCM_BIT};
}
void encodePair(TUint first, TUint second)
{
auto [value1, zero_byte_count1, predictor1] = compressValue(first);
auto [value2, zero_byte_count2, predictor2] = compressValue(second);
auto [compressed_value1, zero_byte_count1, predictor1] = compressValue(first);
auto [compressed_value2, zero_byte_count2, predictor2] = compressValue(second);
std::byte header{0x0};
header |= (predictor1 << 4) | predictor2;
header |= static_cast<std::byte>((zero_byte_count1 << 4) | zero_byte_count2);
@ -378,14 +358,14 @@ private:
auto tail_size1 = VALUE_SIZE - zero_byte_count1;
auto tail_size2 = VALUE_SIZE - zero_byte_count2;
std::memcpy(result.data() + 1, valueTail(value1, zero_byte_count1), tail_size1);
std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, zero_byte_count2), tail_size2);
std::memcpy(result.data() + 1, valueTail(compressed_value1, zero_byte_count1), tail_size1);
std::memcpy(result.data() + 1 + tail_size1, valueTail(compressed_value2, zero_byte_count2), tail_size2);
result = result.subspan(1 + tail_size1 + tail_size2);
}
std::size_t decodeChunk(std::span<const std::byte> values, std::span<TUint> seq)
{
std::size_t read_bytes{0};
std::size_t read_bytes = 0;
for (std::size_t i = 0; i < seq.size(); i += 2)
{
read_bytes += decodePair(values.subspan(read_bytes), seq[i], seq[i + 1]);
@ -411,7 +391,7 @@ private:
std::size_t decodePair(std::span<const std::byte> bytes, TUint& first, TUint& second)
{
if (bytes.empty())
if (bytes.empty()) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence");
auto zero_byte_count1 = decodeCompressedZeroByteCount(
@ -422,11 +402,11 @@ private:
auto tail_size1 = VALUE_SIZE - zero_byte_count1;
auto tail_size2 = VALUE_SIZE - zero_byte_count2;
if (bytes.size() < 1 + tail_size1 + tail_size2)
if (bytes.size() < 1 + tail_size1 + tail_size2) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence");
TUint value1{0};
TUint value2{0};
TUint value1 = 0;
TUint value2 = 0;
std::memcpy(valueTail(value1, zero_byte_count1), bytes.data() + 1, tail_size1);
std::memcpy(valueTail(value2, zero_byte_count2), bytes.data() + 1 + tail_size1, tail_size2);
@ -441,7 +421,7 @@ private:
static void* valueTail(TUint& value, unsigned compressed_size)
{
if constexpr (Endian == std::endian::little)
if constexpr (ENDIAN == std::endian::little)
{
return &value;
}
@ -453,7 +433,10 @@ private:
DfcmPredictor<TUint> dfcm_predictor;
FcmPredictor<TUint> fcm_predictor;
// memcpy the input into this buffer to align reads, this improves performance compared to unaligned reads (bit_cast) by ~10%
std::array<TUint, CHUNK_SIZE> chunk{};
std::span<std::byte> result{};
};
@ -463,7 +446,6 @@ UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_si
{
dest[0] = static_cast<char>(float_width);
dest[1] = static_cast<char>(level);
dest[2] = std::to_integer<char>(encodeEndianness(std::endian::native));
auto dest_size = getMaxCompressedDataSize(source_size);
auto destination = std::as_writable_bytes(std::span(dest, dest_size).subspan(HEADER_SIZE));
@ -490,8 +472,6 @@ void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_si
auto compressed_level = std::to_integer<UInt8>(compressed_data[1]);
if (compressed_level == 0 || compressed_level > MAX_COMPRESSION_LEVEL)
throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS);
if (decodeEndianness(compressed_data[2]) != std::endian::native)
throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS);
auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size));
auto src = compressed_data.subspan(HEADER_SIZE);

View File

@ -37,7 +37,11 @@
</substitution>
</substitutions>
<create_query>CREATE TABLE IF NOT EXISTS codec_{seq_type}_{type}_{codec} (n {type} CODEC({codec})) ENGINE = MergeTree PARTITION BY tuple() ORDER BY tuple();</create_query>
<create_query>
CREATE TABLE IF NOT EXISTS codec_{seq_type}_{type}_{codec} (n {type} CODEC({codec}))
ENGINE = MergeTree PARTITION BY tuple()
ORDER BY tuple();
</create_query>
<fill_query>INSERT INTO codec_seq_{type}_{codec} (n) SELECT number/pi() FROM system.numbers LIMIT {num_rows} SETTINGS max_threads=1</fill_query>
<fill_query>INSERT INTO codec_mon_{type}_{codec} (n) SELECT number+sin(number) FROM system.numbers LIMIT {num_rows} SETTINGS max_threads=1</fill_query>