add experimental codecs flag, add integration test for experimental codecs

This commit is contained in:
fibersel 2021-05-06 14:57:22 +03:00
parent 26dc6517c0
commit cb53bbb7b0
24 changed files with 781 additions and 21 deletions

View File

@ -4,6 +4,7 @@
#include <boost/algorithm/string/join.hpp>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFile.h>
@ -19,6 +20,8 @@
#include <Common/TerminalSize.h>
#include <Core/Defines.h>
#include <sys/times.h>
#include <sys/time.h>
namespace DB
{
@ -77,6 +80,12 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("block-size,b", po::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("lizard", "use Lizard instread of LZ4")
("lzsse2", "use lzsse2 instread of LZ4")
("lzsse4", "use lzsse4 instread of LZ4")
("lzsse8", "use lzsse8 instread of LZ4")
("density", "use Density instread of LZ4")
("param", po::value<std::string>(), "extra params for compresion algorithm")
("codec", po::value<std::vector<std::string>>()->multitoken(), "use codecs combination instead of LZ4")
("level", po::value<int>(), "compression level for codecs specified via flags")
("none", "use no compression instead of LZ4")
@ -103,6 +112,11 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool decompress = options.count("decompress");
bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool use_lizard = options.count("lizard");
bool use_lzsse2 = options.count("lzsse2");
bool use_lzsse4 = options.count("lzsse4");
bool use_lzsse8 = options.count("lzsse8");
bool use_density = options.count("density");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
unsigned block_size = options["block-size"].as<unsigned>();
@ -110,6 +124,10 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
if (options.count("codec"))
codecs = options["codec"].as<std::vector<std::string>>();
std::optional<std::string> param;
if (options.count("param"))
param = options["param"].as<std::string>();
if ((use_lz4hc || use_zstd || use_none) && !codecs.empty())
throw Exception("Wrong options, codec flags like --zstd and --codec options are mutually exclusive", ErrorCodes::BAD_ARGUMENTS);
@ -122,6 +140,16 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
method_family = "LZ4HC";
else if (use_zstd)
method_family = "ZSTD";
else if (use_lizard)
method_family = "Lizard";
else if (use_lzsse2)
method_family = "LZSSE2";
else if (use_lzsse4)
method_family = "LZSSE4";
else if (use_lzsse8)
method_family = "LZSSE8";
else if (use_density)
method_family = "Density";
else if (use_none)
method_family = "NONE";
@ -137,6 +165,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
std::string codecs_line = boost::algorithm::join(codecs, ",");
auto ast = parseQuery(codec_parser, "(" + codecs_line + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
codec = CompressionCodecFactory::instance().get(ast, nullptr);
} else if (param.has_value()) {
codec = CompressionCodecFactory::instance().get(method_family, level, param);
}
else
codec = CompressionCodecFactory::instance().get(method_family, level);
@ -155,6 +185,9 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else
wb = std::make_unique<WriteBufferFromFileDescriptor>(STDOUT_FILENO);
struct tms tv1, tv2;
times(&tv1);
if (stat_mode)
{
/// Output statistic for compressed file.
@ -185,6 +218,10 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
CompressedWriteBuffer to(*wb, codec, block_size);
copyData(*rb, to);
}
times(&tv2);
int tics_per_second = sysconf(_SC_CLK_TCK);
std::cerr << static_cast<double>(tv2.tms_utime - tv1.tms_utime) / tics_per_second << std::endl;
}
catch (...)
{

View File

@ -377,6 +377,30 @@ if (XZ_LIBRARY)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${XZ_INCLUDE_DIR})
endif()
set (LIZARD_LIBRARY lizard)
set (LIZARD_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/lizard/lib/)
if (LIZARD_LIBRARY)
dbms_target_link_libraries(PRIVATE ${LIZARD_LIBRARY})
#target_link_libraries (clickhouse_compression PUBLIC ${LZMA_LIBRARY})
#target_include_directories (clickhouse_compression SYSTEM BEFORE PUBLIC ${})
endif()
set (DENSITY_LIBRARY density)
set (DENSITY_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/density/src)
if (DENSITY_LIBRARY)
dbms_target_link_libraries(PRIVATE ${DENSITY_LIBRARY})
#target_link_libraries (clickhouse_compression PUBLIC ${LZMA_LIBRARY})
#target_include_directories (clickhouse_compression SYSTEM BEFORE PUBLIC ${})
endif()
set (LZSSE_LIBRARY lzsse)
set (LZSSE_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/lzsse/)
if (LZSSE_LIBRARY)
dbms_target_link_libraries(PRIVATE ${LZSSE_LIBRARY})
#target_link_libraries (clickhouse_compression PUBLIC ${LZMA_LIBRARY})
#target_include_directories (clickhouse_compression SYSTEM BEFORE PUBLIC ${})
endif()
if (USE_ICU)
dbms_target_link_libraries (PRIVATE ${ICU_LIBRARIES})
dbms_target_include_directories (SYSTEM PRIVATE ${ICU_INCLUDE_DIRS})

View File

@ -424,7 +424,7 @@ void Connection::sendQuery(
if (method == "ZSTD")
level = settings->network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs);
compression_codec = CompressionCodecFactory::instance().get(method, level);
}
else

View File

@ -0,0 +1,106 @@
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionCodecDensity.h>
#include <Parsers/ASTLiteral.h>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
CompressionCodecDensity::CompressionCodecDensity(DENSITY_ALGORITHM algo_) : algo(algo_)
{
setCodecDescription("Density", {std::make_shared<ASTLiteral>(static_cast<UInt64>(algo))});
}
uint8_t CompressionCodecDensity::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::Density);
}
void CompressionCodecDensity::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecDensity::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return density_compress_safe_size(uncompressed_size);
}
UInt32 CompressionCodecDensity::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
density_processing_result res = density_compress(reinterpret_cast<const uint8_t *>(source), source_size, reinterpret_cast<uint8_t *>(dest), density_compress_safe_size(source_size), algo);
if (res.state != DENSITY_STATE_OK)
throw Exception("Cannot compress block with Density; ", ErrorCodes::CANNOT_COMPRESS);
return res.bytesWritten;
}
void CompressionCodecDensity::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
density_processing_result res = density_decompress(reinterpret_cast<const uint8_t *>(source), source_size, reinterpret_cast<uint8_t *>(dest), density_decompress_safe_size(uncompressed_size));
if (res.state != DENSITY_STATE_OK)
throw Exception("Cannot decompress block with Density; ", ErrorCodes::CANNOT_DECOMPRESS);
}
void registerCodecDensity(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::Density);
factory.registerCompressionCodec(
"Density",
method_code,
[&](const ASTPtr & arguments) -> CompressionCodecPtr
{
DENSITY_ALGORITHM algo = CompressionCodecDensity::DENSITY_DEFAULT_ALGO;
//std::cerr << arguments << std::endl;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() != 1)
throw Exception(
"Deisnty codec must have 1 parameter, given " + std::to_string(arguments->children.size()),
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const auto * algo_literal = children[0]->as<ASTLiteral>();
if (!algo_literal)
throw Exception("Density codec argument must be string (algorithm)", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
if (algo_literal->value.getType() == Field::Types::Which::UInt64) {
const auto algorithm = algo_literal->value.safeGet<UInt64>();
if (algorithm == 3) {
algo = DENSITY_ALGORITHM_LION;
} else if (algorithm == 2) {
algo = DENSITY_ALGORITHM_CHEETAH;
} else if (algorithm == 1) {
algo = DENSITY_ALGORITHM_CHAMELEON;
} else {
throw Exception("Density codec argument may be LION, CHAMELEON, CHEETAH", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
}
} else {
const auto algorithm = algo_literal->value.safeGet<std::string>();
if (algorithm == "LION") {
algo = DENSITY_ALGORITHM_LION;
} else if (algorithm == "CHAMELEON") {
algo = DENSITY_ALGORITHM_CHAMELEON;
} else if (algorithm == "CHEETAH") {
algo = DENSITY_ALGORITHM_CHEETAH;
} else {
throw Exception("Density codec argument may be LION, CHAMELEON, CHEETAH", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
}
}
}
return std::make_shared<CompressionCodecDensity>(algo);
});
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Compression/ICompressionCodec.h>
#include <src/density_api.h>
namespace DB
{
class CompressionCodecDensity : public ICompressionCodec
{
public:
static constexpr auto DENSITY_DEFAULT_ALGO = DENSITY_ALGORITHM_CHAMELEON; // by default aim on speed
CompressionCodecDensity(DENSITY_ALGORITHM algo_);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const DENSITY_ALGORITHM algo;
};
}

View File

@ -0,0 +1,91 @@
#include <Compression/CompressionCodecLZSSE2.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionInfo.h>
#include <Parsers/ASTLiteral.h>
#include <lzsse2/lzsse2.h>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
CompressionCodecLZSSE2::CompressionCodecLZSSE2(int level_) : level(level_)
{
setCodecDescription("LZSSE2", {std::make_shared<ASTLiteral>(static_cast<UInt64>(level))});
}
uint8_t CompressionCodecLZSSE2::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::LZSSE2);
}
void CompressionCodecLZSSE2::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecLZSSE2::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return uncompressed_size;
}
UInt32 CompressionCodecLZSSE2::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
UInt32 res;
LZSSE2_OptimalParseState* state = LZSSE2_MakeOptimalParseState(source_size);
res = LZSSE2_CompressOptimalParse(state, source, source_size, dest, source_size, level);
LZSSE2_FreeOptimalParseState(state);
if (res == 0)
throw Exception("Cannot compress block with LZSSE2; ", ErrorCodes::CANNOT_COMPRESS);
return res;
}
void CompressionCodecLZSSE2::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
UInt32 res;
res = LZSSE2_Decompress(source, source_size, dest, uncompressed_size);
if (res < uncompressed_size)
throw Exception("Cannot decompress block with LZSSE2; ", ErrorCodes::CANNOT_DECOMPRESS);
}
void registerCodecLZSSE2(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::LZSSE2);
factory.registerCompressionCodec(
"LZSSE2",
method_code,
[&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() != 1)
throw Exception(
"LZSSE2 codec must have 1 parameter, given " + std::to_string(arguments->children.size()),
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const auto * level_literal = children[0]->as<ASTLiteral>();
if (!level_literal)
throw Exception("LZSSE2 first codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
level = level_literal->value.safeGet<UInt64>();
}
return std::make_shared<CompressionCodecLZSSE2>(level);
});
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecLZSSE2 : public ICompressionCodec
{
public:
CompressionCodecLZSSE2(int level_);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
};
}

View File

@ -0,0 +1,91 @@
#include <Compression/CompressionCodecLZSSE4.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionInfo.h>
#include <Parsers/ASTLiteral.h>
#include <lzsse4/lzsse4.h>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
CompressionCodecLZSSE4::CompressionCodecLZSSE4(int level_) : level(level_)
{
setCodecDescription("LZSSE4", {std::make_shared<ASTLiteral>(static_cast<UInt64>(level))});
}
uint8_t CompressionCodecLZSSE4::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::LZSSE4);
}
void CompressionCodecLZSSE4::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecLZSSE4::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return uncompressed_size;
}
UInt32 CompressionCodecLZSSE4::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
UInt32 res;
LZSSE4_OptimalParseState* state = LZSSE4_MakeOptimalParseState(source_size);
res = LZSSE4_CompressOptimalParse(state, source, source_size, dest, source_size, level);
LZSSE4_FreeOptimalParseState(state);
if (res == 0)
throw Exception("Cannot compress block with LZSSE4; ", ErrorCodes::CANNOT_COMPRESS);
return res;
}
void CompressionCodecLZSSE4::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
UInt32 res;
res = LZSSE4_Decompress(source, source_size, dest, uncompressed_size);
if (res < uncompressed_size)
throw Exception("Cannot decompress block with LZSSE4; ", ErrorCodes::CANNOT_DECOMPRESS);
}
void registerCodecLZSSE4(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::LZSSE4);
factory.registerCompressionCodec(
"LZSSE4",
method_code,
[&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() != 1)
throw Exception(
"LZSSE4 codec must have 1 parameter, given " + std::to_string(arguments->children.size()),
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const auto * level_literal = children[0]->as<ASTLiteral>();
if (!level_literal)
throw Exception("LZSSE4 first codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
level = level_literal->value.safeGet<UInt64>();
}
return std::make_shared<CompressionCodecLZSSE4>(level);
});
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecLZSSE4 : public ICompressionCodec
{
public:
CompressionCodecLZSSE4(int level_);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
};
}

View File

@ -0,0 +1,90 @@
#include <Compression/CompressionCodecLZSSE8.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionInfo.h>
#include <Parsers/ASTLiteral.h>
#include <lzsse8/lzsse8.h>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
CompressionCodecLZSSE8::CompressionCodecLZSSE8(int level_) : level(level_)
{
setCodecDescription("LZSSE8", {std::make_shared<ASTLiteral>(static_cast<UInt64>(level))});
}
uint8_t CompressionCodecLZSSE8::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::LZSSE8);
}
void CompressionCodecLZSSE8::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecLZSSE8::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return uncompressed_size;
}
UInt32 CompressionCodecLZSSE8::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
UInt32 res;
LZSSE8_OptimalParseState* state = LZSSE8_MakeOptimalParseState(source_size);
res = LZSSE8_CompressOptimalParse(state, source, source_size, dest, source_size, level);
LZSSE8_FreeOptimalParseState(state);
if (res == 0)
throw Exception("Cannot compress block with LZSSE; ", ErrorCodes::CANNOT_COMPRESS);
return res;
}
void CompressionCodecLZSSE8::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
UInt32 res;
res = LZSSE8_Decompress(source, source_size, dest, uncompressed_size);
if (res < uncompressed_size)
throw Exception("Cannot decompress block with LZSSE; ", ErrorCodes::CANNOT_DECOMPRESS);
}
void registerCodecLZSSE8(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::LZSSE8);
factory.registerCompressionCodec(
"LZSSE8",
method_code,
[&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() != 1)
throw Exception(
"LZSSE8 codec must have 1 parameter, given " + std::to_string(arguments->children.size()),
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const auto * level_literal = children[0]->as<ASTLiteral>();
if (!level_literal)
throw Exception("LZSSE8 first codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
level = level_literal->value.safeGet<UInt64>();
}
return std::make_shared<CompressionCodecLZSSE8>(level);
});
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecLZSSE8 : public ICompressionCodec
{
public:
CompressionCodecLZSSE8(int level_);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
};
}

View File

@ -0,0 +1,88 @@
#include <Compression/CompressionCodecLizard.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionInfo.h>
#include <Parsers/ASTLiteral.h>
#include <lib/lizard_compress.h>
#include <lib/lizard_decompress.h>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
CompressionCodecLizard::CompressionCodecLizard(int level_) : level(level_)
{
setCodecDescription("Lizard", {std::make_shared<ASTLiteral>(static_cast<UInt64>(level))});
}
uint8_t CompressionCodecLizard::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::Lizard);
}
void CompressionCodecLizard::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecLizard::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return Lizard_compressBound(uncompressed_size);
}
UInt32 CompressionCodecLizard::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
int res = Lizard_compress(source, dest, source_size, Lizard_compressBound(source_size), level);
if (res == 0)
throw Exception("Cannot compress block with Lizard; ", ErrorCodes::CANNOT_COMPRESS);
return res;
}
void CompressionCodecLizard::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
int res = Lizard_decompress_safe(source, dest, source_size, uncompressed_size);
if (res < 0)
throw Exception("Cannot compress block with Lizard; ", ErrorCodes::CANNOT_DECOMPRESS);
}
void registerCodecLizard(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::Lizard);
factory.registerCompressionCodec(
"Lizard",
method_code,
[&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = CompressionCodecLizard::LIZARD_DEFAULT_LEVEL;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception(
"Lizard codec must have 1 parameter, given " + std::to_string(arguments->children.size()),
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal)
throw Exception("Lizard codec argument must be integer", ErrorCodes::ILLEGAL_CODEC_PARAMETER);
level = literal->value.safeGet<UInt64>();
// compression level will be truncated to LIZARD_MAX_CLEVEL if it is greater and to LIZARD_MIN_CLEVEL if it is less
//if (level > 1)//ZSTD_maxCLevel())
// throw Exception("Lizard codec can't have level more that " + toString(1/*ZSTD_maxCLevel()*/) + ", given " + toString(level), ErrorCodes::ILLEGAL_CODEC_PARAMETER);
}
return std::make_shared<CompressionCodecLizard>(level);
});
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecLizard : public ICompressionCodec
{
public:
static constexpr auto LIZARD_DEFAULT_LEVEL = 1;
CompressionCodecLizard(int level_);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
};
}

View File

@ -44,7 +44,24 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
}
}
void CompressionCodecFactory::validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const
CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std::optional<int> level, std::optional<std::string> param) const
{
if (level && param)
{
auto level_literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
auto param_literal = std::make_shared<ASTLiteral>(static_cast<std::string>(*param));
return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), level_literal, param_literal)), {});
}
else if (param)
{
auto param_literal = std::make_shared<ASTLiteral>(static_cast<std::string>(*param));
return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), param_literal)), {});
} else {
return get(family_name, level);
}
}
void CompressionCodecFactory::validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
{
if (family_name.empty())
throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
@ -52,16 +69,16 @@ void CompressionCodecFactory::validateCodec(const String & family_name, std::opt
if (level)
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {}, sanity_check);
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {}, sanity_check, allow_experimental_codecs);
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier), {}, sanity_check);
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier), {}, sanity_check, allow_experimental_codecs);
}
}
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
{
if (const auto * func = ast->as<ASTFunction>())
{
@ -90,6 +107,16 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
if (sanity_check && !allow_experimental_codecs) {
if (codec_family_name == "Lizard" ||
codec_family_name == "Density" ||
codec_family_name == "LZSSE2" ||
codec_family_name == "LZSSE4" ||
codec_family_name == "LZSSE8") {
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Experimental codecs Lizard, Density and LZSSE* are not allowed, please enable allow_experimental_codecs flag.");
}
}
/// Default codec replaced with current default codec which may depend on different
/// settings (and properties of data) in runtime.
CompressionCodecPtr result_codec;
@ -172,6 +199,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
/// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on
/// data type, because for the first column Delta(4) is suitable and
@ -318,6 +346,11 @@ void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
void registerCodecGorilla(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
void registerCodecLizard(CompressionCodecFactory & factory);
void registerCodecDensity(CompressionCodecFactory & factory);
void registerCodecLZSSE2(CompressionCodecFactory & factory);
void registerCodecLZSSE4(CompressionCodecFactory & factory);
void registerCodecLZSSE8(CompressionCodecFactory & factory);
CompressionCodecFactory::CompressionCodecFactory()
{
@ -330,6 +363,11 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecDoubleDelta(*this);
registerCodecGorilla(*this);
registerCodecMultiple(*this);
registerCodecLizard(*this);
registerCodecDensity(*this);
registerCodecLZSSE2(*this);
registerCodecLZSSE4(*this);
registerCodecLZSSE8(*this);
default_codec = get("LZ4", {});
}

View File

@ -38,16 +38,16 @@ public:
CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const;
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Just wrapper for previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check) const
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check);
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check, allow_experimental_codecs);
}
/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const;
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;
/// Get codec by AST and possible column_type. Some codecs can use
/// information about type to improve inner settings, but every codec should
@ -72,6 +72,8 @@ public:
/// For backward compatibility with config settings
CompressionCodecPtr get(const String & family_name, std::optional<int> level) const;
CompressionCodecPtr get(const String & family_name, std::optional<int> level, std::optional<std::string> param) const;
/// Register codec with parameters and column type
void registerCompressionCodecWithType(const String & family_name, std::optional<uint8_t> byte_code, CreatorWithType creator);
/// Register codec with parameters

View File

@ -41,8 +41,14 @@ enum class CompressionMethodByte : uint8_t
Multiple = 0x91,
Delta = 0x92,
T64 = 0x93,
DoubleDelta = 0x94,
Gorilla = 0x95,
DoubleDelta = 0x94,
Gorilla = 0x95,
Lizard = 0x96,
Density = 0x97,
LZSSE2 = 0x98,
LZSSE4 = 0x99,
LZSSE8 = 0xa0,
};
}

View File

@ -238,6 +238,7 @@ class IColumn;
M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs(LZSSE*, Lizard, Density).", 0) \
M(UInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.", 0) \
M(UInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \

View File

@ -437,6 +437,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
defaults_sample_block = validateColumnsDefaultsAndGetSampleBlock(default_expr_list, column_names_and_types, context_);
bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
bool allow_experimental_codecs = context_->getSettingsRef().allow_experimental_codecs;
ColumnsDescription res;
auto name_type_it = column_names_and_types.begin();
for (auto ast_it = columns_ast.children.begin(); ast_it != columns_ast.children.end(); ++ast_it, ++name_type_it)
@ -471,7 +472,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.default_specifier == "ALIAS")
throw Exception{"Cannot specify codec for column type ALIAS", ErrorCodes::BAD_ARGUMENTS};
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
col_decl.codec, column.type, sanity_check_compression_codecs);
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs);
}
if (col_decl.ttl)

View File

@ -1398,7 +1398,7 @@ void TCPHandler::initBlockOutput(const Block & block)
if (state.compression == Protocol::Compression::Enable)
{
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs);
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionCodecFactory::instance().get(method, level));

View File

@ -313,7 +313,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
column.comment = *comment;
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true);
column.ttl = ttl;
@ -354,7 +354,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else
{
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
if (comment)
column.comment = *comment;
@ -907,7 +907,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::BAD_ARGUMENTS};
if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
all_columns.add(ColumnDescription(column_name, command.data_type));
}
@ -927,7 +927,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::NOT_IMPLEMENTED};
if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
auto column_default = all_columns.getDefault(column_name);
if (column_default)
{

View File

@ -128,7 +128,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
comment = col_ast->comment->as<ASTLiteral &>().value.get<String>();
if (col_ast->codec)
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false);
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true);
if (col_ast->ttl)
ttl = col_ast->ttl;

View File

@ -632,7 +632,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);
/// tmp directory is used to ensure atomicity of transactions

View File

@ -289,7 +289,7 @@ TTLDescription TTLDescription::getTTLFromAST(
{
result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs);
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
}
}

View File

@ -17,7 +17,8 @@ node4 = cluster.add_instance('node4', user_configs=['configs/enable_uncompressed
node5 = cluster.add_instance('node5', main_configs=['configs/zstd_compression_by_default.xml'],
user_configs=['configs/enable_uncompressed_cache.xml',
'configs/allow_suspicious_codecs.xml'])
node6 = cluster.add_instance('node6', main_configs=['configs/allow_experimental_codecs.xml'],
user_configs=['configs/allow_suspicious_codecs.xml'])
@pytest.fixture(scope="module")
def start_cluster():
@ -137,3 +138,19 @@ def test_uncompressed_cache_plus_zstd_codec(start_cluster):
assert node5.query(
"SELECT max(length(data)) from compression_codec_multiple_with_key GROUP BY data ORDER BY max(length(data)) DESC LIMIT 1") == "10000\n"
def test_experimental_codecs(start_cluster):
node6.query("""
CREATE TABLE compression_experimental_codecs (
somedate Date CODEC(Lizard(12)),
id UInt64 CODEC(Density('LION')),
data String CODEC(LZSSE4(3))
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2;
""")
node6.query(
"INSERT INTO compression_experimental_codecs VALUES(toDate('2018-10-12'), 100000, '{}')".format(
'c' * 10000))
assert node6.query(
"SELECT max(length(data)) from compression_experimental_codecs GROUP BY data ORDER BY max(length(data)) DESC LIMIT 1") == "10000\n"