add zstd long range option (#17184)

* add zstd long compression option

* tests: add zstd long read-write test

Co-authored-by: Joris Giovannangeli <joris.giovannangeli@ahrefs.com>
Co-authored-by: ip <igor@ahrefs.com>
This commit is contained in:
ygrek 2021-01-13 08:22:59 -05:00 committed by GitHub
parent 038f52d08b
commit 8f2a830d83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 87 additions and 10 deletions

View File

@ -38,7 +38,15 @@ UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size)
UInt32 CompressionCodecZSTD::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
size_t compressed_size = ZSTD_compress(dest, ZSTD_compressBound(source_size), source, source_size, level);
ZSTD_CCtx * cctx = ZSTD_createCCtx();
ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level);
if (enable_long_range)
{
ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1);
ZSTD_CCtx_setParameter(cctx, ZSTD_c_windowLog, window_log); // NB zero window_log means "use default" for libzstd
}
size_t compressed_size = ZSTD_compress2(cctx, dest, ZSTD_compressBound(source_size), source, source_size);
ZSTD_freeCCtx(cctx);
if (ZSTD_isError(compressed_size))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(compressed_size)), ErrorCodes::CANNOT_COMPRESS);
@ -55,8 +63,13 @@ void CompressionCodecZSTD::doDecompressData(const char * source, UInt32 source_s
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
}
CompressionCodecZSTD::CompressionCodecZSTD(int level_)
: level(level_)
/// Builds a ZSTD codec with long-range mode enabled.
/// `level_` is stored as the compression level and `window_log_` as the window log;
/// both are recorded, in that order, as the arguments of the "ZSTD" codec description.
CompressionCodecZSTD::CompressionCodecZSTD(int level_, int window_log_)
    : level(level_)
    , enable_long_range(true)
    , window_log(window_log_)
{
    const auto level_arg = std::make_shared<ASTLiteral>(static_cast<UInt64>(level));
    const auto window_log_arg = std::make_shared<ASTLiteral>(static_cast<UInt64>(window_log));
    setCodecDescription("ZSTD", {level_arg, window_log_arg});
}
/// Builds a ZSTD codec configured by compression level only:
/// long-range mode stays disabled and window_log is stored as 0.
/// The "ZSTD" codec description carries the level as its single argument.
CompressionCodecZSTD::CompressionCodecZSTD(int level_)
    : level(level_)
    , enable_long_range(false)
    , window_log(0)
{
    const auto level_arg = std::make_shared<ASTLiteral>(static_cast<UInt64>(level));
    setCodecDescription("ZSTD", {level_arg});
}
@ -64,13 +77,14 @@ CompressionCodecZSTD::CompressionCodecZSTD(int level_)
void registerCodecZSTD(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::ZSTD);
factory.registerCompressionCodec("ZSTD", method_code, [&](const ASTPtr & arguments) -> CompressionCodecPtr
{
factory.registerCompressionCodec("ZSTD", method_code, [&](const ASTPtr & arguments) -> CompressionCodecPtr {
int level = CompressionCodecZSTD::ZSTD_DEFAULT_LEVEL;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception("ZSTD codec must have 1 parameter, given " + std::to_string(arguments->children.size()), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
if (arguments->children.size() > 2)
throw Exception(
"ZSTD codec must have 1 or 2 parameters, given " + std::to_string(arguments->children.size()),
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
@ -79,9 +93,32 @@ void registerCodecZSTD(CompressionCodecFactory & factory)
level = literal->value.safeGet<UInt64>();
if (level > ZSTD_maxCLevel())
throw Exception("ZSTD codec can't have level more that " + toString(ZSTD_maxCLevel()) + ", given " + toString(level), ErrorCodes::ILLEGAL_CODEC_PARAMETER);
}
throw Exception(
"ZSTD codec can't have level more than " + toString(ZSTD_maxCLevel()) + ", given " + toString(level),
ErrorCodes::ILLEGAL_CODEC_PARAMETER);
if (arguments->children.size() > 1)
{
const auto * window_literal = children[1]->as<ASTLiteral>();
if (!window_literal)
throw Exception("ZSTD codec second argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
const int window_log = window_literal->value.safeGet<UInt64>();
ZSTD_bounds window_log_bounds = ZSTD_cParam_getBounds(ZSTD_c_windowLog);
if (ZSTD_isError(window_log_bounds.error))
throw Exception(
"ZSTD windowLog parameter is not supported " + std::string(ZSTD_getErrorName(window_log_bounds.error)),
ErrorCodes::ILLEGAL_CODEC_PARAMETER);
// 0 means "use default" for libzstd
if (window_log != 0 && (window_log > window_log_bounds.upperBound || window_log < window_log_bounds.lowerBound))
throw Exception(
"ZSTD codec can't have window log more than " + toString(window_log_bounds.upperBound) + " and lower than "
+ toString(window_log_bounds.lowerBound) + ", given " + toString(window_log),
ErrorCodes::ILLEGAL_CODEC_PARAMETER);
return std::make_shared<CompressionCodecZSTD>(level, window_log);
}
}
return std::make_shared<CompressionCodecZSTD>(level);
});
}

View File

@ -12,9 +12,12 @@ class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;
CompressionCodecZSTD(int level_);
CompressionCodecZSTD(int level_, int window_log);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
@ -32,6 +35,8 @@ protected:
private:
const int level;
const bool enable_long_range;
const int window_log;
};
}

View File

@ -44,7 +44,9 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream(
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec_), compressed(compressed_buf),
plain_hashing(*plain_file),
compressed_buf(plain_hashing, compression_codec_, max_compress_block_size_),
compressed(compressed_buf),
marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite)), marks(*marks_file)
{
}

View File

@ -0,0 +1,4 @@
10000
10000
10000
10000

View File

@ -0,0 +1,29 @@
-- Read/write round-trip test for the ZSTD codec, with and without its second argument.

-- Start from a clean state in case an earlier run left tables behind.
DROP TABLE IF EXISTS zstd_1_00;
DROP TABLE IF EXISTS zstd_1_24;
DROP TABLE IF EXISTS zstd_9_00;
DROP TABLE IF EXISTS zstd_9_24;
DROP TABLE IF EXISTS words;
-- Reference data: 10000 rows produced by generateRandom, kept in a Memory table.
CREATE TABLE words(i Int, word String) ENGINE = Memory;
INSERT INTO words SELECT * FROM generateRandom('i Int, word String',1,10) LIMIT 10000;
-- One MergeTree table per codec variant: levels 1 and 9, each declared with the
-- level alone (suffix _00) and with a second argument of 24 (suffix _24).
CREATE TABLE zstd_1_00(n Int, b String CODEC(ZSTD(1))) ENGINE = MergeTree ORDER BY n;
CREATE TABLE zstd_1_24(n Int, b String CODEC(ZSTD(1,24))) ENGINE = MergeTree ORDER BY n;
CREATE TABLE zstd_9_00(n Int, b String CODEC(ZSTD(9))) ENGINE = MergeTree ORDER BY n;
CREATE TABLE zstd_9_24(n Int, b String CODEC(ZSTD(9,24))) ENGINE = MergeTree ORDER BY n;
-- Write the same reference rows through every codec variant.
INSERT INTO zstd_1_00 SELECT * FROM words;
INSERT INTO zstd_1_24 SELECT * FROM words;
INSERT INTO zstd_9_00 SELECT * FROM words;
INSERT INTO zstd_9_24 SELECT * FROM words;
-- Read each table back, join it to the reference rows on the integer column, and
-- count the rows whose string column matches the reference string.
-- NOTE(review): presumably each query should print 10000 (all rows survive the round trip) - confirm against the expected-output file.
SELECT COUNT(n) FROM zstd_1_00 LEFT JOIN words ON i == n WHERE b == word;
SELECT COUNT(n) FROM zstd_1_24 LEFT JOIN words ON i == n WHERE b == word;
SELECT COUNT(n) FROM zstd_9_00 LEFT JOIN words ON i == n WHERE b == word;
SELECT COUNT(n) FROM zstd_9_24 LEFT JOIN words ON i == n WHERE b == word;
-- Remove everything the test created.
DROP TABLE zstd_1_00;
DROP TABLE zstd_1_24;
DROP TABLE zstd_9_00;
DROP TABLE zstd_9_24;
DROP TABLE words;