From 8f2a830d8310388e205124f3e4564611c8ef8616 Mon Sep 17 00:00:00 2001 From: ygrek Date: Wed, 13 Jan 2021 08:22:59 -0500 Subject: [PATCH] add zstd long range option (#17184) * add zstd long compression option * tests: add zstd long read-write test Co-authored-by: Joris Giovannangeli Co-authored-by: ip --- src/Compression/CompressionCodecZSTD.cpp | 55 ++++++++++++++++--- src/Compression/CompressionCodecZSTD.h | 5 ++ .../MergeTreeDataPartWriterOnDisk.cpp | 4 +- .../01622_codec_zstd_long.reference | 4 ++ .../0_stateless/01622_codec_zstd_long.sql | 29 ++++++++++ 5 files changed, 87 insertions(+), 10 deletions(-) create mode 100644 tests/queries/0_stateless/01622_codec_zstd_long.reference create mode 100644 tests/queries/0_stateless/01622_codec_zstd_long.sql diff --git a/src/Compression/CompressionCodecZSTD.cpp b/src/Compression/CompressionCodecZSTD.cpp index f236c4bf460..378f74ce7cf 100644 --- a/src/Compression/CompressionCodecZSTD.cpp +++ b/src/Compression/CompressionCodecZSTD.cpp @@ -38,7 +38,15 @@ UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size) UInt32 CompressionCodecZSTD::doCompressData(const char * source, UInt32 source_size, char * dest) const { - size_t compressed_size = ZSTD_compress(dest, ZSTD_compressBound(source_size), source, source_size, level); + ZSTD_CCtx * cctx = ZSTD_createCCtx(); + ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level); + if (enable_long_range) + { + ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1); + ZSTD_CCtx_setParameter(cctx, ZSTD_c_windowLog, window_log); // NB zero window_log means "use default" for libzstd + } + size_t compressed_size = ZSTD_compress2(cctx, dest, ZSTD_compressBound(source_size), source, source_size); + ZSTD_freeCCtx(cctx); if (ZSTD_isError(compressed_size)) throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(compressed_size)), ErrorCodes::CANNOT_COMPRESS); @@ -55,8 +63,13 @@ void CompressionCodecZSTD::doDecompressData(const char * source, UInt32 source_s throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS); } -CompressionCodecZSTD::CompressionCodecZSTD(int level_) - : level(level_) +CompressionCodecZSTD::CompressionCodecZSTD(int level_, int window_log_) : level(level_), enable_long_range(true), window_log(window_log_) +{ + setCodecDescription( + "ZSTD", {std::make_shared(static_cast(level)), std::make_shared(static_cast(window_log))}); +} + +CompressionCodecZSTD::CompressionCodecZSTD(int level_) : level(level_), enable_long_range(false), window_log(0) { setCodecDescription("ZSTD", {std::make_shared(static_cast(level))}); } @@ -64,13 +77,14 @@ CompressionCodecZSTD::CompressionCodecZSTD(int level_) void registerCodecZSTD(CompressionCodecFactory & factory) { UInt8 method_code = UInt8(CompressionMethodByte::ZSTD); - factory.registerCompressionCodec("ZSTD", method_code, [&](const ASTPtr & arguments) -> CompressionCodecPtr - { + factory.registerCompressionCodec("ZSTD", method_code, [&](const ASTPtr & arguments) -> CompressionCodecPtr { int level = CompressionCodecZSTD::ZSTD_DEFAULT_LEVEL; if (arguments && !arguments->children.empty()) { - if (arguments->children.size() > 1) - throw Exception("ZSTD codec must have 1 parameter, given " + std::to_string(arguments->children.size()), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE); + if (arguments->children.size() > 2) + throw Exception( + "ZSTD codec must have 1 or 2 parameters, given " + std::to_string(arguments->children.size()), + ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE); const auto children = arguments->children; const auto * literal = children[0]->as(); @@ -79,9 +93,32 @@ void registerCodecZSTD(CompressionCodecFactory & factory) level = literal->value.safeGet(); if (level > ZSTD_maxCLevel()) - throw Exception("ZSTD codec can't have level more that " + toString(ZSTD_maxCLevel()) + ", given " + toString(level), ErrorCodes::ILLEGAL_CODEC_PARAMETER); - } + throw Exception( + "ZSTD codec can't have level more than " + toString(ZSTD_maxCLevel()) + ", given " + toString(level), + ErrorCodes::ILLEGAL_CODEC_PARAMETER); + if (arguments->children.size() > 1) + { + const auto * window_literal = children[1]->as(); + if (!window_literal) + throw Exception("ZSTD codec second argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER); + const int window_log = window_literal->value.safeGet(); + + ZSTD_bounds window_log_bounds = ZSTD_cParam_getBounds(ZSTD_c_windowLog); + if (ZSTD_isError(window_log_bounds.error)) + throw Exception( + "ZSTD windowLog parameter is not supported " + std::string(ZSTD_getErrorName(window_log_bounds.error)), + ErrorCodes::ILLEGAL_CODEC_PARAMETER); + // 0 means "use default" for libzstd + if (window_log != 0 && (window_log > window_log_bounds.upperBound || window_log < window_log_bounds.lowerBound)) + throw Exception( + "ZSTD codec can't have window log more than " + toString(window_log_bounds.upperBound) + " and lower than " + + toString(window_log_bounds.lowerBound) + ", given " + toString(window_log), + ErrorCodes::ILLEGAL_CODEC_PARAMETER); + + return std::make_shared(level, window_log); + } + } return std::make_shared(level); }); } diff --git a/src/Compression/CompressionCodecZSTD.h b/src/Compression/CompressionCodecZSTD.h index 903af6d6c1b..2d67ff9cecb 100644 --- a/src/Compression/CompressionCodecZSTD.h +++ b/src/Compression/CompressionCodecZSTD.h @@ -12,9 +12,12 @@ class CompressionCodecZSTD : public ICompressionCodec { public: static constexpr auto ZSTD_DEFAULT_LEVEL = 1; + static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24; CompressionCodecZSTD(int level_); + CompressionCodecZSTD(int level_, int window_log); + uint8_t getMethodByte() const override; UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; @@ -32,6 +35,8 @@ protected: private: const int level; + const bool enable_long_range; + const int window_log; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index fd3338c8a70..697f8c809b2 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -44,7 +44,9 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( data_file_extension{data_file_extension_}, marks_file_extension{marks_file_extension_}, plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite)), - plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec_), compressed(compressed_buf), + plain_hashing(*plain_file), + compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_), + compressed(compressed_buf), marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite)), marks(*marks_file) { } diff --git a/tests/queries/0_stateless/01622_codec_zstd_long.reference b/tests/queries/0_stateless/01622_codec_zstd_long.reference new file mode 100644 index 00000000000..53bbf843449 --- /dev/null +++ b/tests/queries/0_stateless/01622_codec_zstd_long.reference @@ -0,0 +1,4 @@ +10000 +10000 +10000 +10000 diff --git a/tests/queries/0_stateless/01622_codec_zstd_long.sql b/tests/queries/0_stateless/01622_codec_zstd_long.sql new file mode 100644 index 00000000000..7045699d73f --- /dev/null +++ b/tests/queries/0_stateless/01622_codec_zstd_long.sql @@ -0,0 +1,29 @@ +DROP TABLE IF EXISTS zstd_1_00; +DROP TABLE IF EXISTS zstd_1_24; +DROP TABLE IF EXISTS zstd_9_00; +DROP TABLE IF EXISTS zstd_9_24; +DROP TABLE IF EXISTS words; + +CREATE TABLE words(i Int, word String) ENGINE = Memory; +INSERT INTO words SELECT * FROM generateRandom('i Int, word String',1,10) LIMIT 10000; + +CREATE TABLE zstd_1_00(n Int, b String CODEC(ZSTD(1))) ENGINE = MergeTree ORDER BY n; +CREATE TABLE zstd_1_24(n Int, b String CODEC(ZSTD(1,24))) ENGINE = MergeTree ORDER BY n; +CREATE TABLE zstd_9_00(n Int, b String CODEC(ZSTD(9))) ENGINE = MergeTree ORDER BY n; +CREATE TABLE zstd_9_24(n Int, b String CODEC(ZSTD(9,24))) ENGINE = MergeTree ORDER BY n; + +INSERT INTO zstd_1_00 SELECT * FROM words; +INSERT INTO zstd_1_24 SELECT * FROM words; +INSERT INTO zstd_9_00 SELECT * FROM words; +INSERT INTO zstd_9_24 SELECT * FROM words; + +SELECT COUNT(n) FROM zstd_1_00 LEFT JOIN words ON i == n WHERE b == word; +SELECT COUNT(n) FROM zstd_1_24 LEFT JOIN words ON i == n WHERE b == word; +SELECT COUNT(n) FROM zstd_9_00 LEFT JOIN words ON i == n WHERE b == word; +SELECT COUNT(n) FROM zstd_9_24 LEFT JOIN words ON i == n WHERE b == word; + +DROP TABLE zstd_1_00; +DROP TABLE zstd_1_24; +DROP TABLE zstd_9_00; +DROP TABLE zstd_9_24; +DROP TABLE words;