Merge pull request #47271 from Avogar/codecs-better

Add optional parameters to some codecs, fix aborts in clickhouse-compressor with some codecs.
This commit is contained in:
Kruglov Pavel 2023-03-16 12:20:23 +01:00 committed by GitHub
commit a3510a2ffe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 145 additions and 33 deletions

View File

@ -393,15 +393,15 @@ These codecs are designed to make compression more effective by using specific f
#### DoubleDelta
`DoubleDelta` — Calculates delta of deltas and writes it in compact binary form. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-byte deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8. The default value is `sizeof(type)` if that is equal to 1, 2, 4, or 8; in all other cases, it is 1. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
#### Gorilla
`Gorilla` — Calculates XOR between current and previous floating point value and writes it in compact binary form. The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
`Gorilla(bytes_size)` — Calculates XOR between current and previous floating point value and writes it in compact binary form. The smaller the difference between consecutive values is, i.e. the slower the values of the series change, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Possible `bytes_size` values: 1, 2, 4, 8. The default value is `sizeof(type)` if that is equal to 1, 2, 4, or 8; in all other cases, it is 1. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
#### FPC
`FPC` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
`FPC(level, float_size)` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. Possible `level` values: 1-28, the default value is 12. Possible `float_size` values: 4, 8. The default value is `sizeof(type)` if the type is Float; in all other cases, it is 4. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
#### T64

View File

@ -66,6 +66,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
using namespace DB;
namespace po = boost::program_options;
bool print_stacktrace = false;
try
{
po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
@ -84,6 +85,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("level", po::value<int>(), "compression level for codecs specified via flags")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
("stacktrace", "print stacktrace of exception")
;
po::positional_options_description positional_desc;
@ -107,6 +109,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool use_deflate_qpl = options.count("deflate_qpl");
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
print_stacktrace = options.count("stacktrace");
unsigned block_size = options["block-size"].as<unsigned>();
std::vector<std::string> codecs;
if (options.count("codec"))
@ -188,11 +191,12 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
/// Compression
CompressedWriteBuffer to(*wb, codec, block_size);
copyData(*rb, to);
to.finalize();
}
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
std::cerr << getCurrentExceptionMessage(print_stacktrace) << '\n';
return getCurrentExceptionCode();
}

View File

@ -193,7 +193,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Delta);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 delta_bytes_size = 0;
/// Default bytes size is 1.
UInt8 delta_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
@ -202,8 +203,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be integer");
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)

View File

@ -7,7 +7,7 @@
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
@ -31,7 +31,7 @@ namespace DB
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* to support 64bit types. The drawback is 1 extra bit for 32-bit wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
@ -145,6 +145,8 @@ namespace ErrorCodes
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
namespace
@ -549,10 +551,28 @@ void registerCodecDoubleDelta(CompressionCodecFactory & factory)
factory.registerCompressionCodecWithType("DoubleDelta", method_code,
[&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec DoubleDelta does not accept any arguments");
/// Default bytes size is 1.
UInt8 data_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "DoubleDelta codec must have 1 parameter, given {}", arguments->children.size());
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "DoubleDelta codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for DoubleDelta codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
data_bytes_size = static_cast<UInt8>(user_bytes_size);
}
else if (column_type)
{
data_bytes_size = getDataBytesSize(column_type);
}
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
return std::make_shared<CompressionCodecDoubleDelta>(data_bytes_size);
});
}

View File

@ -109,28 +109,42 @@ void registerCodecFPC(CompressionCodecFactory & factory)
auto method_code = static_cast<UInt8>(CompressionMethodByte::FPC);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
UInt8 float_width = 0;
/// Set default float width to 4.
UInt8 float_width = 4;
if (column_type != nullptr)
float_width = getFloatBytesSize(*column_type);
UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
if (arguments->children.size() > 2)
{
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE,
"FPC codec must have 1 parameter, given {}", arguments->children.size());
"FPC codec must have from 0 to 2 parameters, given {}", arguments->children.size());
}
const auto * literal = arguments->children.front()->as<ASTLiteral>();
if (!literal)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be integer");
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
level = literal->value.safeGet<UInt8>();
if (level < 1 || level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec level must be between {} and {}",
1, static_cast<int>(CompressionCodecFPC::MAX_COMPRESSION_LEVEL));
if (arguments->children.size() == 2)
{
literal = arguments->children[1]->as<ASTLiteral>();
if (!literal || !isInt64OrUInt64FieldType(literal->value.getType()))
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
size_t user_float_width = literal->value.safeGet<UInt64>();
if (user_float_width != 4 && user_float_width != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Float size for FPC codec can be 4 or 8, given {}", user_float_width);
float_width = static_cast<UInt8>(user_float_width);
}
}
return std::make_shared<CompressionCodecFPC>(float_width, level);
};
factory.registerCompressionCodecWithType("FPC", method_code, codec_builder);

View File

@ -7,6 +7,7 @@
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTLiteral.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/BitHelpers.h>
@ -134,6 +135,8 @@ namespace ErrorCodes
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
namespace
@ -445,10 +448,28 @@ void registerCodecGorilla(CompressionCodecFactory & factory)
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Gorilla);
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec Gorilla does not accept any arguments");
/// Default bytes size is 1
UInt8 data_bytes_size = 1;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "Gorilla codec must have 1 parameter, given {}", arguments->children.size());
const auto children = arguments->children;
const auto * literal = children[0]->as<ASTLiteral>();
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Gorilla codec argument must be unsigned integer");
size_t user_bytes_size = literal->value.safeGet<UInt64>();
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for Gorilla codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
data_bytes_size = static_cast<UInt8>(user_bytes_size);
}
else if (column_type)
{
data_bytes_size = getDataBytesSize(column_type);
}
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
return std::make_shared<CompressionCodecGorilla>(data_bytes_size);
};
factory.registerCompressionCodecWithType("Gorilla", method_code, codec_builder);

View File

@ -33,7 +33,8 @@ public:
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
// type_idx_ is required for compression, but not for decompression.
CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
@ -53,7 +54,7 @@ protected:
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
std::optional<TypeIndex> type_idx;
Variant variant;
};
@ -91,9 +92,12 @@ enum class MagicNumber : uint8_t
IPv4 = 21,
};
MagicNumber serializeTypeId(TypeIndex type_id)
MagicNumber serializeTypeId(std::optional<TypeIndex> type_id)
{
switch (type_id)
if (!type_id)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "T64 codec doesn't support compression without information about column type");
switch (*type_id)
{
case TypeIndex::UInt8: return MagicNumber::UInt8;
case TypeIndex::UInt16: return MagicNumber::UInt16;
@ -115,7 +119,7 @@ MagicNumber serializeTypeId(TypeIndex type_id)
break;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(type_id));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(*type_id));
}
TypeIndex deserializeTypeId(uint8_t serialized_type_id)
@ -632,7 +636,7 @@ UInt32 CompressionCodecT64::doCompressData(const char * src, UInt32 src_size, ch
memcpy(dst, &cookie, 1);
dst += 1;
switch (baseType(type_idx))
switch (baseType(*type_idx))
{
case TypeIndex::Int8:
return 1 + compressData<Int8>(src, src_size, dst, variant);
@ -699,7 +703,7 @@ uint8_t CompressionCodecT64::getMethodByte() const
return codecId();
}
CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
CompressionCodecT64::CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_)
: type_idx(type_idx_)
, variant(variant_)
{
@ -712,7 +716,7 @@ CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
void CompressionCodecT64::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(type_idx);
hash.update(type_idx.value_or(TypeIndex::Nothing));
hash.update(variant);
}
@ -742,9 +746,14 @@ void registerCodecT64(CompressionCodecFactory & factory)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Wrong modification for T64: {}", name);
}
auto type_idx = typeIdx(type);
if (type && type_idx == TypeIndex::Nothing)
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
std::optional<TypeIndex> type_idx;
if (type)
{
type_idx = typeIdx(type);
if (type_idx == TypeIndex::Nothing)
throw Exception(
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
}
return std::make_shared<CompressionCodecT64>(type_idx, variant);
};

View File

@ -2,5 +2,5 @@ CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(Delta, ZSTD(22)))
CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(DoubleDelta, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(Gorilla, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(DoubleDelta(3), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(Gorilla('hello, world'), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(DoubleDelta(3), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError ILLEGAL_CODEC_PARAMETER }
CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(Gorilla('hello, world'), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError ILLEGAL_CODEC_PARAMETER }

View File

@ -0,0 +1,9 @@
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,34 @@
#!/usr/bin/env bash
# Exercises clickhouse-compressor with the Delta, DoubleDelta, Gorilla, FPC and T64 codecs.
# Each invalid invocation prints the number of output lines that mention the expected error
# name; valid invocations are run as-is and are not expected to contribute to stdout.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

echo "Hello, World!" > 02584_test_data

# Compress the test data with codec $1 followed by LZ4.
function compress_with()
{
    $CLICKHOUSE_COMPRESSOR --codec "$1" --codec 'LZ4' --input '02584_test_data' --output '02584_test_out'
}

# Same as compress_with, but merge stderr into stdout and print how many lines contain $2.
function count_error()
{
    compress_with "$1" 2>&1 | grep -c "$2"
}

# Delta, DoubleDelta and Gorilla share the same four cases:
# no argument, an unsupported size, a non-integer argument, and a supported size.
for codec in 'Delta' 'DoubleDelta' 'Gorilla'
do
    compress_with "${codec}"
    count_error "${codec}(5)" "ILLEGAL_CODEC_PARAMETER"
    count_error "${codec}([1,2])" "ILLEGAL_CODEC_PARAMETER"
    compress_with "${codec}(4)"
done

# FPC takes an optional level and an optional float size.
compress_with 'FPC'
compress_with 'FPC(5)'
count_error 'FPC(5, 1)' "ILLEGAL_CODEC_PARAMETER"
count_error 'FPC([1,2,3])' "ILLEGAL_CODEC_PARAMETER"
compress_with 'FPC(5, 4)'

# T64 is expected to report CANNOT_COMPRESS here.
count_error 'T64' "CANNOT_COMPRESS"

rm 02584_test_data 02584_test_out