From 0697b9ffcffa4203531adbd9cb27aa4513a2b797 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Wed, 25 May 2022 22:04:39 +0300 Subject: [PATCH 1/8] FPC codec --- src/Compression/CompressionCodecFPC.cpp | 498 ++++++++++++++++++++++++ src/Compression/CompressionFactory.cpp | 2 + src/Compression/CompressionInfo.h | 3 +- 3 files changed, 502 insertions(+), 1 deletion(-) create mode 100644 src/Compression/CompressionCodecFPC.cpp diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp new file mode 100644 index 00000000000..3c7aac794c0 --- /dev/null +++ b/src/Compression/CompressionCodecFPC.cpp @@ -0,0 +1,498 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +namespace DB +{ + +class CompressionCodecFPC : public ICompressionCodec +{ +public: + explicit CompressionCodecFPC(UInt8 float_size, UInt8 compression_level); + + uint8_t getMethodByte() const override; + + void updateHash(SipHash & hash) const override; + +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; + + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; + + UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; + + bool isCompression() const override { return true; } + bool isGenericCompression() const override { return false; } + + static constexpr UInt32 HEADER_SIZE{3}; + +private: + UInt8 float_width; + UInt8 level; +}; + + +namespace ErrorCodes +{ + extern const int CANNOT_COMPRESS; + extern const int CANNOT_DECOMPRESS; + extern const int ILLEGAL_CODEC_PARAMETER; + extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; + extern const int BAD_ARGUMENTS; +} + +uint8_t CompressionCodecFPC::getMethodByte() const +{ + return static_cast(CompressionMethodByte::FPC); +} + +void CompressionCodecFPC::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); +} + +CompressionCodecFPC::CompressionCodecFPC(UInt8 float_size, UInt8 compression_level) + : float_width{float_size}, level{compression_level} +{ + setCodecDescription("FPC", {std::make_shared(static_cast(level))}); +} + +UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + auto float_count = (uncompressed_size + float_width - 1) / float_width; + if (float_count % 2 != 0) { + ++float_count; + } + return HEADER_SIZE + (float_count + float_count / 2) * float_width; +} + +namespace +{ + +UInt8 getFloatBytesSize(const IDataType & column_type) +{ + if (!WhichDataType(column_type).isFloat()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for {} because the data type is not float", + column_type.getName()); + } + + if (auto float_size = column_type.getSizeOfValueInMemory(); float_size >= 4) + { + return static_cast(float_size); + } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "FPC codec is not applicable for floats of size less than 4 bytes. Given type {}", + column_type.getName()); +} + +UInt8 encodeEndianness(std::endian endian) +{ + switch (endian) + { + case std::endian::little: + return 0; + case std::endian::big: + return 1; + } + throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); +} + +std::endian decodeEndianness(UInt8 endian) { + switch (endian) + { + case 0: + return std::endian::little; + case 1: + return std::endian::big; + } + throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); +} + +} + +void registerCodecFPC(CompressionCodecFactory & factory) +{ + auto method_code = static_cast(CompressionMethodByte::FPC); + auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr + { + if (!column_type) + { + throw Exception("FPC codec must have associated column", ErrorCodes::BAD_ARGUMENTS); + } + UInt8 level{0}; + if (arguments && !arguments->children.empty()) + { + if (arguments->children.size() > 1) + { + throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, + "FPC codec must have 1 parameter, given {}", arguments->children.size()); + } + + const auto * literal = arguments->children.front()->as(); + if (!literal) + { + throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + } + + level = literal->value.safeGet(); + } + return std::make_shared(getFloatBytesSize(*column_type), level); + }; + factory.registerCompressionCodecWithType("FPC", method_code, codec_builder); +} + +namespace +{ + +template requires (sizeof(TUint) >= 4) +class DfcmPredictor { +public: + explicit DfcmPredictor(std::size_t table_size): table(table_size, 0), prev_value{0}, hash{0} + { + } + + [[nodiscard]] + TUint predict() const noexcept + { + return table[hash] + prev_value; + } + + void add(TUint value) noexcept + { + table[hash] = value - prev_value; + recalculateHash(); + prev_value = value; + } + +private: + void recalculateHash() noexcept + { + auto value = table[hash]; + if constexpr (sizeof(TUint) >= 8) + { + hash = ((hash << 2) ^ static_cast(value >> 40)) & (table.size() - 1); + } + else + { + hash = ((hash << 4) ^ static_cast(value >> 23)) & (table.size() - 1); + } + } + + std::vector table; + TUint prev_value{0}; + std::size_t hash{0}; +}; + +template requires (sizeof(TUint) >= 4) +class FcmPredictor { +public: + explicit FcmPredictor(std::size_t table_size): table(table_size, 0), hash{0} + { + } + + [[nodiscard]] + TUint predict() const noexcept + { + return table[hash]; + } + + void add(TUint value) noexcept + { + table[hash] = value; + recalculateHash(); + } + +private: + void recalculateHash() noexcept + { + auto value = table[hash]; + if constexpr (sizeof(TUint) >= 8) + { + hash = ((hash << 6) ^ static_cast(value >> 48)) & (table.size() - 1); + } + else + { + hash = ((hash << 1) ^ static_cast(value >> 22)) & (table.size() - 1); + } + } + + std::vector table; + std::size_t hash{0}; +}; + +template + requires (Endian == std::endian::little || Endian == std::endian::big) +class FPCOperation +{ + static constexpr std::size_t CHUNK_SIZE{64}; + + static constexpr auto VALUE_SIZE = sizeof(TUint); + static constexpr std::byte DFCM_BIT_1{1u << 7}; + static constexpr std::byte DFCM_BIT_2{1u << 3}; + static constexpr unsigned MAX_COMPRESSED_SIZE{0b111u}; + +public: + explicit FPCOperation(std::span destination, UInt8 compression_level) + : dfcm_predictor(1 << compression_level), fcm_predictor(1 << compression_level), chunk{}, result{destination} + { + } + + std::size_t encode(std::span data) && + { + auto initial_size = result.size(); + + std::span chunk_view(chunk); + for (std::size_t i = 0; i < data.size(); i += chunk_view.size_bytes()) + { + auto written_values = importChunk(data.subspan(i), chunk_view); + encodeChunk(chunk_view.subspan(0, written_values)); + } + + return initial_size - result.size(); + } + + void decode(std::span values, std::size_t decoded_size) && + { + std::size_t read_bytes{0}; + + std::span chunk_view(chunk); + for (std::size_t i = 0; i < decoded_size; i += chunk_view.size_bytes()) + { + if (i + chunk_view.size_bytes() > decoded_size) + chunk_view = chunk_view.first(ceilBytesToEvenValues(decoded_size - i)); + read_bytes += decodeChunk(values.subspan(read_bytes), chunk_view); + exportChunk(chunk_view); + } + } + +private: + static std::size_t ceilBytesToEvenValues(std::size_t bytes_count) + { + auto values_count = (bytes_count + VALUE_SIZE - 1) / VALUE_SIZE; + return values_count % 2 == 0 ? values_count : values_count + 1; + } + + std::size_t importChunk(std::span values, std::span chnk) + { + if (auto chunk_view = std::as_writable_bytes(chnk); chunk_view.size() <= values.size()) + { + std::memcpy(chunk_view.data(), values.data(), chunk_view.size()); + return chunk_view.size() / VALUE_SIZE; + } + else + { + std::memset(chunk_view.data(), 0, chunk_view.size()); + std::memcpy(chunk_view.data(), values.data(), values.size()); + return ceilBytesToEvenValues(values.size()); + } + } + + void exportChunk(std::span chnk) + { + auto chunk_view = std::as_bytes(chnk).first(std::min(result.size(), chnk.size_bytes())); + std::memcpy(result.data(), chunk_view.data(), chunk_view.size()); + result = result.subspan(chunk_view.size()); + } + + void encodeChunk(std::span seq) + { + for (std::size_t i = 0; i < seq.size(); i += 2) + { + encodePair(seq[i], seq[i + 1]); + } + } + + struct CompressedValue + { + TUint value; + unsigned compressed_size; + bool is_dfcm_predictor; + }; + + unsigned encodeCompressedSize(int compressed) + { + if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + { + if (compressed >= 4) + --compressed; + } + return std::min(static_cast(compressed), MAX_COMPRESSED_SIZE); + } + + unsigned decodeCompressedSize(unsigned encoded_size) + { + if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + { + if (encoded_size > 3) + ++encoded_size; + } + return encoded_size; + } + + CompressedValue compressValue(TUint value) noexcept + { + TUint compressed_dfcm = dfcm_predictor.predict() ^ value; + TUint compressed_fcm = fcm_predictor.predict() ^ value; + dfcm_predictor.add(value); + fcm_predictor.add(value); + auto zeroes_dfcm = std::countl_zero(compressed_dfcm); + auto zeroes_fcm = std::countl_zero(compressed_fcm); + if (zeroes_dfcm > zeroes_fcm) + return {compressed_dfcm, encodeCompressedSize(zeroes_dfcm / CHAR_BIT), true}; + return {compressed_fcm, encodeCompressedSize(zeroes_fcm / CHAR_BIT), false}; + } + + void encodePair(TUint first, TUint second) + { + auto [value1, compressed_size1, is_dfcm_predictor1] = compressValue(first); + auto [value2, compressed_size2, is_dfcm_predictor2] = compressValue(second); + std::byte header{0x0}; + if (is_dfcm_predictor1) + header |= DFCM_BIT_1; + if (is_dfcm_predictor2) + header |= DFCM_BIT_2; + header |= static_cast((compressed_size1 << 4) | compressed_size2); + result.front() = header; + + compressed_size1 = decodeCompressedSize(compressed_size1); + compressed_size2 = decodeCompressedSize(compressed_size2); + auto tail_size1 = VALUE_SIZE - compressed_size1; + auto tail_size2 = VALUE_SIZE - compressed_size2; + + std::memcpy(result.data() + 1, valueTail(value1, compressed_size1), tail_size1); + std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, compressed_size2), tail_size2); + result = result.subspan(1 + tail_size1 + tail_size2); + } + + std::size_t decodeChunk(std::span values, std::span seq) + { + std::size_t read_bytes{0}; + for (std::size_t i = 0; i < seq.size(); i += 2) + { + read_bytes += decodePair(values.subspan(read_bytes), seq[i], seq[i + 1]); + } + return read_bytes; + } + + TUint decompressValue(TUint value, bool isDfcmPredictor) + { + TUint decompressed; + if (isDfcmPredictor) + { + decompressed = dfcm_predictor.predict() ^ value; + } + else + { + decompressed = fcm_predictor.predict() ^ value; + } + dfcm_predictor.add(decompressed); + fcm_predictor.add(decompressed); + return decompressed; + } + + std::size_t decodePair(std::span bytes, TUint& first, TUint& second) + { + if (bytes.empty()) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); + + auto compressed_size1 = decodeCompressedSize(static_cast(bytes.front() >> 4) & MAX_COMPRESSED_SIZE); + auto compressed_size2 = decodeCompressedSize(static_cast(bytes.front()) & MAX_COMPRESSED_SIZE); + + auto tail_size1 = VALUE_SIZE - compressed_size1; + auto tail_size2 = VALUE_SIZE - compressed_size2; + + if (bytes.size() < 1 + tail_size1 + tail_size2) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); + + TUint value1{0}; + TUint value2{0}; + + std::memcpy(valueTail(value1, compressed_size1), bytes.data() + 1, tail_size1); + std::memcpy(valueTail(value2, compressed_size2), bytes.data() + 1 + tail_size1, tail_size2); + + auto is_dfcm_predictor1 = static_cast(bytes.front() & DFCM_BIT_1); + auto is_dfcm_predictor2 = static_cast(bytes.front() & DFCM_BIT_2); + first = decompressValue(value1, is_dfcm_predictor1 != 0); + second = decompressValue(value2, is_dfcm_predictor2 != 0); + + return 1 + tail_size1 + tail_size2; + } + + static void* valueTail(TUint& value, unsigned compressed_size) + { + if constexpr (Endian == std::endian::little) + { + return &value; + } + else + { + return reinterpret_cast(&value) + compressed_size; + } + } + + DfcmPredictor dfcm_predictor; + FcmPredictor fcm_predictor; + std::array chunk{}; + std::span result{}; +}; + +} + +UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + dest[0] = static_cast(float_width); + dest[1] = static_cast(level); + dest[2] = static_cast(encodeEndianness(std::endian::native)); + + auto destination = std::as_writable_bytes(std::span(dest, source_size).subspan(HEADER_SIZE)); + auto src = std::as_bytes(std::span(source, source_size)); + switch (float_width) + { + case sizeof(Float64): + return HEADER_SIZE + FPCOperation(destination, level).encode(src); + case sizeof(Float32): + return HEADER_SIZE + FPCOperation(destination, level).encode(src); + default: + break; + } + throw Exception("Cannot compress. File has incorrect float width", ErrorCodes::CANNOT_COMPRESS); +} + +void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const +{ + if (source_size < HEADER_SIZE) + throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS); + + auto compressed_data = std::span(source, source_size); + if (static_cast(compressed_data[0]) != float_width) + throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); + if (static_cast(compressed_data[1]) != level) + throw Exception("Cannot decompress. File has incorrect compression level", ErrorCodes::CANNOT_DECOMPRESS); + if (decodeEndianness(static_cast(compressed_data[2])) != std::endian::native) + throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS); + + auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size)); + auto src = std::as_bytes(compressed_data.subspan(HEADER_SIZE)); + switch (float_width) + { + case sizeof(Float64): + FPCOperation(destination, level).decode(src, uncompressed_size); + break; + case sizeof(Float32): + FPCOperation(destination, level).decode(src, uncompressed_size); + break; + default: + break; + } +} + +} diff --git a/src/Compression/CompressionFactory.cpp b/src/Compression/CompressionFactory.cpp index abf5e38a8c3..b8a1c5877a4 100644 --- a/src/Compression/CompressionFactory.cpp +++ b/src/Compression/CompressionFactory.cpp @@ -177,6 +177,7 @@ void registerCodecT64(CompressionCodecFactory & factory); void registerCodecDoubleDelta(CompressionCodecFactory & factory); void registerCodecGorilla(CompressionCodecFactory & factory); void registerCodecEncrypted(CompressionCodecFactory & factory); +void registerCodecFPC(CompressionCodecFactory & factory); #endif @@ -194,6 +195,7 @@ CompressionCodecFactory::CompressionCodecFactory() registerCodecDoubleDelta(*this); registerCodecGorilla(*this); registerCodecEncrypted(*this); + registerCodecFPC(*this); #endif default_codec = get("LZ4", {}); diff --git a/src/Compression/CompressionInfo.h b/src/Compression/CompressionInfo.h index bbe8315f3ea..839fb68e8c3 100644 --- a/src/Compression/CompressionInfo.h +++ b/src/Compression/CompressionInfo.h @@ -44,7 +44,8 @@ enum class CompressionMethodByte : uint8_t DoubleDelta = 0x94, Gorilla = 0x95, AES_128_GCM_SIV = 0x96, - AES_256_GCM_SIV = 0x97 + AES_256_GCM_SIV = 0x97, + FPC = 0x98 }; } From adf888811cf036612ceb33ac245a47eb6a24e689 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 04:53:07 +0300 Subject: [PATCH 2/8] fixed max size computation --- src/Compression/CompressionCodecFPC.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 3c7aac794c0..0e50d62893c 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -72,7 +72,7 @@ UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) c if (float_count % 2 != 0) { ++float_count; } - return HEADER_SIZE + (float_count + float_count / 2) * float_width; + return HEADER_SIZE + float_count * float_width + float_count / 2; } namespace @@ -453,7 +453,8 @@ UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_si dest[1] = static_cast(level); dest[2] = static_cast(encodeEndianness(std::endian::native)); - auto destination = std::as_writable_bytes(std::span(dest, source_size).subspan(HEADER_SIZE)); + auto dest_size = getMaxCompressedDataSize(source_size); + auto destination = std::as_writable_bytes(std::span(dest, dest_size).subspan(HEADER_SIZE)); auto src = std::as_bytes(std::span(source, source_size)); switch (float_width) { From cd3e28e3b1e4c8881c642c29d3fcf59bbf25e621 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 06:53:48 +0300 Subject: [PATCH 3/8] fixed decoding parameters --- src/Compression/CompressionCodecFPC.cpp | 30 ++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 0e50d62893c..d6b96f8a2f2 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -124,11 +124,11 @@ void registerCodecFPC(CompressionCodecFactory & factory) auto method_code = static_cast(CompressionMethodByte::FPC); auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr { - if (!column_type) - { - throw Exception("FPC codec must have associated column", ErrorCodes::BAD_ARGUMENTS); - } - UInt8 level{0}; + UInt8 float_width{0}; + if (column_type != nullptr) + float_width = getFloatBytesSize(*column_type); + + UInt8 level{12}; if (arguments && !arguments->children.empty()) { if (arguments->children.size() > 1) @@ -139,13 +139,11 @@ void registerCodecFPC(CompressionCodecFactory & factory) const auto * literal = arguments->children.front()->as(); if (!literal) - { throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); - } level = literal->value.safeGet(); } - return std::make_shared(getFloatBytesSize(*column_type), level); + return std::make_shared(float_width, level); }; factory.registerCompressionCodecWithType("FPC", method_code, codec_builder); } @@ -474,26 +472,28 @@ void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_si throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS); auto compressed_data = std::span(source, source_size); - if (static_cast(compressed_data[0]) != float_width) - throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); - if (static_cast(compressed_data[1]) != level) - throw Exception("Cannot decompress. File has incorrect compression level", ErrorCodes::CANNOT_DECOMPRESS); if (decodeEndianness(static_cast(compressed_data[2])) != std::endian::native) throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS); + auto compressed_float_width = static_cast(compressed_data[0]); + auto compressed_level = static_cast(compressed_data[1]); + if (compressed_level == 0) + throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS); + auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size)); auto src = std::as_bytes(compressed_data.subspan(HEADER_SIZE)); - switch (float_width) + switch (compressed_float_width) { case sizeof(Float64): - FPCOperation(destination, level).decode(src, uncompressed_size); + FPCOperation(destination, compressed_level).decode(src, uncompressed_size); break; case sizeof(Float32): - FPCOperation(destination, level).decode(src, uncompressed_size); + FPCOperation(destination, compressed_level).decode(src, uncompressed_size); break; default: break; } + throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); } } From 1bfeb982af9bffea3c42ff2f87daa99ea016f83e Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 06:58:34 +0300 Subject: [PATCH 4/8] fixed decoding parameters --- src/Compression/CompressionCodecFPC.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index d6b96f8a2f2..269a935da4b 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -491,9 +491,8 @@ void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_si FPCOperation(destination, compressed_level).decode(src, uncompressed_size); break; default: - break; + throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); } - throw Exception("Cannot decompress. File has incorrect float width", ErrorCodes::CANNOT_DECOMPRESS); } } From 4d41121c096918493b4f0ac05dbab4546b8c9331 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 10:58:00 +0300 Subject: [PATCH 5/8] added test query --- .../02313_test_fpc_codec.reference | 2 + .../0_stateless/02313_test_fpc_codec.sql | 61 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 tests/queries/0_stateless/02313_test_fpc_codec.reference create mode 100644 tests/queries/0_stateless/02313_test_fpc_codec.sql diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.reference b/tests/queries/0_stateless/02313_test_fpc_codec.reference new file mode 100644 index 00000000000..5e871ea0329 --- /dev/null +++ b/tests/queries/0_stateless/02313_test_fpc_codec.reference @@ -0,0 +1,2 @@ +F64 +F32 diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.sql b/tests/queries/0_stateless/02313_test_fpc_codec.sql new file mode 100644 index 00000000000..e077e59b07b --- /dev/null +++ b/tests/queries/0_stateless/02313_test_fpc_codec.sql @@ -0,0 +1,61 @@ +DROP TABLE IF EXISTS codecTest; + +CREATE TABLE codecTest ( + key UInt64, + name String, + ref_valueF64 Float64, + ref_valueF32 Float32, + valueF64 Float64 CODEC(FPC), + valueF32 Float32 CODEC(FPC) +) Engine = MergeTree ORDER BY key; + +-- best case - same value +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'e()', e() AS v, v, v, v FROM system.numbers LIMIT 1, 100; + +-- good case - values that grow insignificantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'log2(n)', log2(n) AS v, v, v, v FROM system.numbers LIMIT 101, 100; + +-- bad case - values differ significantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'n*sqrt(n)', n*sqrt(n) AS v, v, v, v FROM system.numbers LIMIT 201, 100; + +-- worst case - almost like a random values +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'sin(n*n*n)*n', sin(n * n * n * n* n) AS v, v, v, v FROM system.numbers LIMIT 301, 100; + + +-- These floating-point values are expected to be BINARY equal, so comparing by-value is Ok here. + +-- referencing previous row key, value, and case name to simplify debugging. +SELECT 'F64'; +SELECT + c1.key, c1.name, + c1.ref_valueF64, c1.valueF64, c1.ref_valueF64 - c1.valueF64 AS dF64, + 'prev:', + c2.key, c2.ref_valueF64 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF64 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + + +SELECT 'F32'; +SELECT + c1.key, c1.name, + c1.ref_valueF32, c1.valueF32, c1.ref_valueF32 - c1.valueF32 AS dF32, + 'prev:', + c2.key, c2.ref_valueF32 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF32 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + +DROP TABLE IF EXISTS codecTest; From 821100f145cf75c8ef3a3c30d16c4bafcc05378e Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 11:09:36 +0300 Subject: [PATCH 6/8] code style fixes --- src/Compression/CompressionCodecFPC.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 269a935da4b..0f68f512ad8 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -69,9 +69,8 @@ CompressionCodecFPC::CompressionCodecFPC(UInt8 float_size, UInt8 compression_lev UInt32 CompressionCodecFPC::getMaxCompressedDataSize(UInt32 uncompressed_size) const { auto float_count = (uncompressed_size + float_width - 1) / float_width; - if (float_count % 2 != 0) { + if (float_count % 2 != 0) ++float_count; - } return HEADER_SIZE + float_count * float_width + float_count / 2; } @@ -106,7 +105,8 @@ UInt8 encodeEndianness(std::endian endian) throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); } -std::endian decodeEndianness(UInt8 endian) { +std::endian decodeEndianness(UInt8 endian) +{ switch (endian) { case 0: @@ -152,7 +152,8 @@ namespace { template requires (sizeof(TUint) >= 4) -class DfcmPredictor { +class DfcmPredictor +{ public: explicit DfcmPredictor(std::size_t table_size): table(table_size, 0), prev_value{0}, hash{0} { @@ -191,7 +192,8 @@ private: }; template requires (sizeof(TUint) >= 4) -class FcmPredictor { +class FcmPredictor +{ public: explicit FcmPredictor(std::size_t table_size): table(table_size, 0), hash{0} { From 7e69779575b712aae530c71f82f9e33f10e48d76 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Thu, 26 May 2022 22:32:56 +0300 Subject: [PATCH 7/8] added fpc codec to float perftest --- src/Compression/CompressionCodecFPC.cpp | 2 + tests/performance/codecs_float_insert.xml | 1 + tests/performance/codecs_float_select.xml | 1 + .../02313_test_fpc_codec.reference | 2 + .../0_stateless/02313_test_fpc_codec.sql | 60 +++++++++++++++++++ 5 files changed, 66 insertions(+) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index 0f68f512ad8..f3106204e01 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -142,6 +142,8 @@ void registerCodecFPC(CompressionCodecFactory & factory) throw Exception("FPC codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); level = literal->value.safeGet(); + if (level == 0) + throw Exception("FPC codec level must be at least 1", ErrorCodes::ILLEGAL_CODEC_PARAMETER); } return std::make_shared(float_width, level); }; diff --git a/tests/performance/codecs_float_insert.xml b/tests/performance/codecs_float_insert.xml index b31e0eafdd7..64325d30189 100644 --- a/tests/performance/codecs_float_insert.xml +++ b/tests/performance/codecs_float_insert.xml @@ -12,6 +12,7 @@ ZSTD DoubleDelta Gorilla + FPC diff --git a/tests/performance/codecs_float_select.xml b/tests/performance/codecs_float_select.xml index 82489daf524..4743a756ac3 100644 --- a/tests/performance/codecs_float_select.xml +++ b/tests/performance/codecs_float_select.xml @@ -12,6 +12,7 @@ ZSTD DoubleDelta Gorilla + FPC diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.reference b/tests/queries/0_stateless/02313_test_fpc_codec.reference index 5e871ea0329..23c75ed1ac0 100644 --- a/tests/queries/0_stateless/02313_test_fpc_codec.reference +++ b/tests/queries/0_stateless/02313_test_fpc_codec.reference @@ -1,2 +1,4 @@ F64 F32 +F64 +F32 diff --git a/tests/queries/0_stateless/02313_test_fpc_codec.sql b/tests/queries/0_stateless/02313_test_fpc_codec.sql index e077e59b07b..3b1127350f0 100644 --- a/tests/queries/0_stateless/02313_test_fpc_codec.sql +++ b/tests/queries/0_stateless/02313_test_fpc_codec.sql @@ -44,6 +44,66 @@ AND LIMIT 10; +SELECT 'F32'; +SELECT + c1.key, c1.name, + c1.ref_valueF32, c1.valueF32, c1.ref_valueF32 - c1.valueF32 AS dF32, + 'prev:', + c2.key, c2.ref_valueF32 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF32 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + +DROP TABLE IF EXISTS codecTest; + +CREATE TABLE codecTest ( + key UInt64, + name String, + ref_valueF64 Float64, + ref_valueF32 Float32, + valueF64 Float64 CODEC(FPC(4)), + valueF32 Float32 CODEC(FPC(4)) +) Engine = MergeTree ORDER BY key; + +-- best case - same value +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'e()', e() AS v, v, v, v FROM system.numbers LIMIT 1, 100; + +-- good case - values that grow insignificantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'log2(n)', log2(n) AS v, v, v, v FROM system.numbers LIMIT 101, 100; + +-- bad case - values differ significantly +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'n*sqrt(n)', n*sqrt(n) AS v, v, v, v FROM system.numbers LIMIT 201, 100; + +-- worst case - almost like a random values +INSERT INTO codecTest (key, name, ref_valueF64, valueF64, ref_valueF32, valueF32) + SELECT number AS n, 'sin(n*n*n)*n', sin(n * n * n * n* n) AS v, v, v, v FROM system.numbers LIMIT 301, 100; + + +-- These floating-point values are expected to be BINARY equal, so comparing by-value is Ok here. + +-- referencing previous row key, value, and case name to simplify debugging. +SELECT 'F64'; +SELECT + c1.key, c1.name, + c1.ref_valueF64, c1.valueF64, c1.ref_valueF64 - c1.valueF64 AS dF64, + 'prev:', + c2.key, c2.ref_valueF64 +FROM + codecTest as c1, codecTest as c2 +WHERE + dF64 != 0 +AND + c2.key = c1.key - 1 +LIMIT 10; + + SELECT 'F32'; SELECT c1.key, c1.name, From d5064dd5b5480f93966f847e4752d620eebc6656 Mon Sep 17 00:00:00 2001 From: koloshmet Date: Wed, 8 Jun 2022 02:17:34 +0300 Subject: [PATCH 8/8] so many improvements --- src/Compression/CompressionCodecFPC.cpp | 129 +++++++++++++----------- 1 file changed, 69 insertions(+), 60 deletions(-) diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp index f3106204e01..3b66060b6fc 100644 --- a/src/Compression/CompressionCodecFPC.cpp +++ b/src/Compression/CompressionCodecFPC.cpp @@ -17,12 +17,15 @@ namespace DB class CompressionCodecFPC : public ICompressionCodec { public: - explicit CompressionCodecFPC(UInt8 float_size, UInt8 compression_level); + CompressionCodecFPC(UInt8 float_size, UInt8 compression_level); uint8_t getMethodByte() const override; void updateHash(SipHash & hash) const override; + static constexpr UInt8 MAX_COMPRESSION_LEVEL{28}; + static constexpr UInt8 DEFAULT_COMPRESSION_LEVEL{12}; + protected: UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; @@ -33,11 +36,11 @@ protected: bool isCompression() const override { return true; } bool isGenericCompression() const override { return false; } +private: static constexpr UInt32 HEADER_SIZE{3}; -private: - UInt8 float_width; - UInt8 level; + UInt8 float_width; // size of uncompressed float in bytes + UInt8 level; // compression level, 2^level * float_width is the size of predictors table in bytes }; @@ -93,21 +96,21 @@ UInt8 getFloatBytesSize(const IDataType & column_type) column_type.getName()); } -UInt8 encodeEndianness(std::endian endian) +std::byte encodeEndianness(std::endian endian) { switch (endian) { case std::endian::little: - return 0; + return std::byte{0}; case std::endian::big: - return 1; + return std::byte{1}; } throw Exception("Unsupported endianness", ErrorCodes::BAD_ARGUMENTS); } -std::endian decodeEndianness(UInt8 endian) +std::endian decodeEndianness(std::byte endian) { - switch (endian) + switch (std::to_integer(endian)) { case 0: return std::endian::little; @@ -128,7 +131,7 @@ void registerCodecFPC(CompressionCodecFactory & factory) if (column_type != nullptr) float_width = getFloatBytesSize(*column_type); - UInt8 level{12}; + UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL; if (arguments && !arguments->children.empty()) { if (arguments->children.size() > 1) @@ -144,6 +147,8 @@ void registerCodecFPC(CompressionCodecFactory & factory) level = literal->value.safeGet(); if (level == 0) throw Exception("FPC codec level must be at least 1", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + if (level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL) + throw Exception("FPC codec level must be at most 28", ErrorCodes::ILLEGAL_CODEC_PARAMETER); } return std::make_shared(float_width, level); }; @@ -153,7 +158,8 @@ void registerCodecFPC(CompressionCodecFactory & factory) namespace { -template requires (sizeof(TUint) >= 4) +template + requires (sizeof(TUint) >= 4) class DfcmPredictor { public: @@ -189,11 +195,12 @@ private: } std::vector table; - TUint prev_value{0}; - std::size_t hash{0}; + TUint prev_value; + std::size_t hash; }; -template requires (sizeof(TUint) >= 4) +template + requires (sizeof(TUint) >= 4) class FcmPredictor { public: @@ -228,7 +235,7 @@ private: } std::vector table; - std::size_t hash{0}; + std::size_t hash; }; template @@ -238,13 +245,15 @@ class FPCOperation static constexpr std::size_t CHUNK_SIZE{64}; static constexpr auto VALUE_SIZE = sizeof(TUint); - static constexpr std::byte DFCM_BIT_1{1u << 7}; - static constexpr std::byte DFCM_BIT_2{1u << 3}; - static constexpr unsigned MAX_COMPRESSED_SIZE{0b111u}; + static constexpr std::byte FCM_BIT{0}; + static constexpr std::byte DFCM_BIT{1u << 3}; + static constexpr auto DFCM_BIT_1 = DFCM_BIT << 4; + static constexpr auto DFCM_BIT_2 = DFCM_BIT; + static constexpr unsigned MAX_ZERO_BYTE_COUNT{0b111u}; public: - explicit FPCOperation(std::span destination, UInt8 compression_level) - : dfcm_predictor(1 << compression_level), fcm_predictor(1 << compression_level), chunk{}, result{destination} + FPCOperation(std::span destination, UInt8 compression_level) + : dfcm_predictor(1u << compression_level), fcm_predictor(1u << compression_level), chunk{}, result{destination} { } @@ -317,22 +326,22 @@ private: { TUint value; unsigned compressed_size; - bool is_dfcm_predictor; + std::byte predictor; }; - unsigned encodeCompressedSize(int compressed) + unsigned encodeCompressedZeroByteCount(int compressed) { - if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1) { if (compressed >= 4) --compressed; } - return std::min(static_cast(compressed), MAX_COMPRESSED_SIZE); + return std::min(static_cast(compressed), MAX_ZERO_BYTE_COUNT); } - unsigned decodeCompressedSize(unsigned encoded_size) + unsigned decodeCompressedZeroByteCount(unsigned encoded_size) { - if constexpr (VALUE_SIZE > MAX_COMPRESSED_SIZE) + if constexpr (VALUE_SIZE == MAX_ZERO_BYTE_COUNT + 1) { if (encoded_size > 3) ++encoded_size; @@ -342,6 +351,8 @@ private: CompressedValue compressValue(TUint value) noexcept { + static constexpr auto BITS_PER_BYTE = std::numeric_limits::digits; + TUint compressed_dfcm = dfcm_predictor.predict() ^ value; TUint compressed_fcm = fcm_predictor.predict() ^ value; dfcm_predictor.add(value); @@ -349,29 +360,26 @@ private: auto zeroes_dfcm = std::countl_zero(compressed_dfcm); auto zeroes_fcm = std::countl_zero(compressed_fcm); if (zeroes_dfcm > zeroes_fcm) - return {compressed_dfcm, encodeCompressedSize(zeroes_dfcm / CHAR_BIT), true}; - return {compressed_fcm, encodeCompressedSize(zeroes_fcm / CHAR_BIT), false}; + return {compressed_dfcm, encodeCompressedZeroByteCount(zeroes_dfcm / BITS_PER_BYTE), DFCM_BIT}; + return {compressed_fcm, encodeCompressedZeroByteCount(zeroes_fcm / BITS_PER_BYTE), FCM_BIT}; } void encodePair(TUint first, TUint second) { - auto [value1, compressed_size1, is_dfcm_predictor1] = compressValue(first); - auto [value2, compressed_size2, is_dfcm_predictor2] = compressValue(second); + auto [value1, zero_byte_count1, predictor1] = compressValue(first); + auto [value2, zero_byte_count2, predictor2] = compressValue(second); std::byte header{0x0}; - if (is_dfcm_predictor1) - header |= DFCM_BIT_1; - if (is_dfcm_predictor2) - header |= DFCM_BIT_2; - header |= static_cast((compressed_size1 << 4) | compressed_size2); + header |= (predictor1 << 4) | predictor2; + header |= static_cast((zero_byte_count1 << 4) | zero_byte_count2); result.front() = header; - compressed_size1 = decodeCompressedSize(compressed_size1); - compressed_size2 = decodeCompressedSize(compressed_size2); - auto tail_size1 = VALUE_SIZE - compressed_size1; - auto tail_size2 = VALUE_SIZE - compressed_size2; + zero_byte_count1 = decodeCompressedZeroByteCount(zero_byte_count1); + zero_byte_count2 = decodeCompressedZeroByteCount(zero_byte_count2); + auto tail_size1 = VALUE_SIZE - zero_byte_count1; + auto tail_size2 = VALUE_SIZE - zero_byte_count2; - std::memcpy(result.data() + 1, valueTail(value1, compressed_size1), tail_size1); - std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, compressed_size2), tail_size2); + std::memcpy(result.data() + 1, valueTail(value1, zero_byte_count1), tail_size1); + std::memcpy(result.data() + 1 + tail_size1, valueTail(value2, zero_byte_count2), tail_size2); result = result.subspan(1 + tail_size1 + tail_size2); } @@ -406,11 +414,13 @@ private: if (bytes.empty()) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); - auto compressed_size1 = decodeCompressedSize(static_cast(bytes.front() >> 4) & MAX_COMPRESSED_SIZE); - auto compressed_size2 = decodeCompressedSize(static_cast(bytes.front()) & MAX_COMPRESSED_SIZE); + auto zero_byte_count1 = decodeCompressedZeroByteCount( + std::to_integer(bytes.front() >> 4) & MAX_ZERO_BYTE_COUNT); + auto zero_byte_count2 = decodeCompressedZeroByteCount( + std::to_integer(bytes.front()) & MAX_ZERO_BYTE_COUNT); - auto tail_size1 = VALUE_SIZE - compressed_size1; - auto tail_size2 = VALUE_SIZE - compressed_size2; + auto tail_size1 = VALUE_SIZE - zero_byte_count1; + auto tail_size2 = VALUE_SIZE - zero_byte_count2; if (bytes.size() < 1 + tail_size1 + tail_size2) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence"); @@ -418,13 +428,13 @@ private: TUint value1{0}; TUint value2{0}; - std::memcpy(valueTail(value1, compressed_size1), bytes.data() + 1, tail_size1); - std::memcpy(valueTail(value2, compressed_size2), bytes.data() + 1 + tail_size1, tail_size2); + std::memcpy(valueTail(value1, zero_byte_count1), bytes.data() + 1, tail_size1); + std::memcpy(valueTail(value2, zero_byte_count2), bytes.data() + 1 + tail_size1, tail_size2); - auto is_dfcm_predictor1 = static_cast(bytes.front() & DFCM_BIT_1); - auto is_dfcm_predictor2 = static_cast(bytes.front() & DFCM_BIT_2); - first = decompressValue(value1, is_dfcm_predictor1 != 0); - second = decompressValue(value2, is_dfcm_predictor2 != 0); + auto is_dfcm_predictor1 = std::to_integer(bytes.front() & DFCM_BIT_1) != 0; + auto is_dfcm_predictor2 = std::to_integer(bytes.front() & DFCM_BIT_2) != 0; + first = decompressValue(value1, is_dfcm_predictor1); + second = decompressValue(value2, is_dfcm_predictor2); return 1 + tail_size1 + tail_size2; } @@ -453,7 +463,7 @@ UInt32 CompressionCodecFPC::doCompressData(const char * source, UInt32 source_si { dest[0] = static_cast(float_width); dest[1] = static_cast(level); - dest[2] = static_cast(encodeEndianness(std::endian::native)); + dest[2] = std::to_integer(encodeEndianness(std::endian::native)); auto dest_size = getMaxCompressedDataSize(source_size); auto destination = std::as_writable_bytes(std::span(dest, dest_size).subspan(HEADER_SIZE)); @@ -475,17 +485,16 @@ void CompressionCodecFPC::doDecompressData(const char * source, UInt32 source_si if (source_size < HEADER_SIZE) throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS); - auto compressed_data = std::span(source, source_size); - if (decodeEndianness(static_cast(compressed_data[2])) != std::endian::native) + auto compressed_data = std::as_bytes(std::span(source, source_size)); + auto compressed_float_width = std::to_integer(compressed_data[0]); + auto compressed_level = std::to_integer(compressed_data[1]); + if (compressed_level == 0 || compressed_level > MAX_COMPRESSION_LEVEL) + throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS); + if (decodeEndianness(compressed_data[2]) != std::endian::native) throw Exception("Cannot decompress. File has incorrect endianness", ErrorCodes::CANNOT_DECOMPRESS); - auto compressed_float_width = static_cast(compressed_data[0]); - auto compressed_level = static_cast(compressed_data[1]); - if (compressed_level == 0) - throw Exception("Cannot decompress. File has incorrect level", ErrorCodes::CANNOT_DECOMPRESS); - auto destination = std::as_writable_bytes(std::span(dest, uncompressed_size)); - auto src = std::as_bytes(compressed_data.subspan(HEADER_SIZE)); + auto src = compressed_data.subspan(HEADER_SIZE); switch (compressed_float_width) { case sizeof(Float64):