Merge pull request #25002 from ClickHouse/compression-codecs-refactoring

Compression codecs refactoring
This commit is contained in:
alexey-milovidov 2021-06-07 02:54:24 +03:00 committed by GitHub
commit cc5755b1a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 456 additions and 461 deletions

1
.gitmodules vendored
View File

@ -228,7 +228,6 @@
[submodule "contrib/datasketches-cpp"] [submodule "contrib/datasketches-cpp"]
path = contrib/datasketches-cpp path = contrib/datasketches-cpp
url = https://github.com/ClickHouse-Extras/datasketches-cpp.git url = https://github.com/ClickHouse-Extras/datasketches-cpp.git
[submodule "contrib/yaml-cpp"] [submodule "contrib/yaml-cpp"]
path = contrib/yaml-cpp path = contrib/yaml-cpp
url = https://github.com/ClickHouse-Extras/yaml-cpp.git url = https://github.com/ClickHouse-Extras/yaml-cpp.git

View File

@ -528,7 +528,6 @@ include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake) include (cmake/find/nuraft.cmake)
include (cmake/find/yaml-cpp.cmake) include (cmake/find/yaml-cpp.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY) if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "") set (ENABLE_ORC OFF CACHE INTERNAL "")
endif() endif()

View File

@ -4,6 +4,6 @@ if (NOT USE_YAML_CPP)
return() return()
endif() endif()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/yaml-cpp") if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/yaml-cpp/README.md")
message (ERROR "submodule contrib/yaml-cpp is missing. to fix try run: \n git submodule update --init --recursive") message (ERROR "submodule contrib/yaml-cpp is missing. to fix try run: \n git submodule update --init --recursive")
endif() endif()

View File

@ -61,7 +61,6 @@ endif()
add_subdirectory (poco-cmake) add_subdirectory (poco-cmake)
add_subdirectory (croaring-cmake) add_subdirectory (croaring-cmake)
# TODO: refactor the contrib libraries below this comment. # TODO: refactor the contrib libraries below this comment.
if (USE_INTERNAL_ZSTD_LIBRARY) if (USE_INTERNAL_ZSTD_LIBRARY)

View File

@ -424,7 +424,7 @@ void Connection::sendQuery(
if (method == "ZSTD") if (method == "ZSTD")
level = settings->network_zstd_compression_level; level = settings->network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs);
compression_codec = CompressionCodecFactory::instance().get(method, level); compression_codec = CompressionCodecFactory::instance().get(method, level);
} }
else else

View File

@ -14,6 +14,6 @@
#cmakedefine01 USE_SENTRY #cmakedefine01 USE_SENTRY
#cmakedefine01 USE_GRPC #cmakedefine01 USE_GRPC
#cmakedefine01 USE_STATS #cmakedefine01 USE_STATS
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 USE_DATASKETCHES #cmakedefine01 USE_DATASKETCHES
#cmakedefine01 USE_YAML_CPP #cmakedefine01 USE_YAML_CPP
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecDelta.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
@ -11,6 +11,29 @@
namespace DB namespace DB
{ {
class CompressionCodecDelta : public ICompressionCodec
{
public:
explicit CompressionCodecDelta(UInt8 delta_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 2; }
bool isCompression() const override { return false; }
bool isGenericCompression() const override { return false; }
private:
UInt8 delta_bytes_size;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,32 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecDelta : public ICompressionCodec
{
public:
CompressionCodecDelta(UInt8 delta_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 2; }
bool isCompression() const override { return false; }
bool isGenericCompression() const override { return false; }
private:
UInt8 delta_bytes_size;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecDoubleDelta.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
@ -15,9 +15,126 @@
#include <type_traits> #include <type_traits>
#include <limits> #include <limits>
namespace DB namespace DB
{ {
/** NOTE DoubleDelta is surprisingly bad name. The only excuse is that it comes from an academic paper.
* Most people will think that "double delta" is just applying delta transform twice.
* But in fact it is something more than applying delta transform twice.
*/
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
* stride, like event timestamp for some monitoring application.
*
* Given input sequence a: [a0, a1, ... an]:
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Second item is written as delta (sizeof(a[0])*8 bits): a[1] - a[0]
* Loop over remaining items and calculate double delta:
* double_delta = a[i] - 2 * a[i - 1] + a[i - 2]
* Write it in compact binary form with `BitWriter`
* if double_delta == 0:
* write 1bit: 0
* else if -63 < double_delta < 64:
* write 2 bit prefix: 10
* write sign bit (1 if signed): x
* write 7-1 bits of abs(double_delta - 1): xxxxxx
* else if -255 < double_delta < 256:
* write 3 bit prefix: 110
* write sign bit (1 if signed): x
* write 9-1 bits of abs(double_delta - 1): xxxxxxxx
* else if -2047 < double_delta < 2048:
* write 4 bit prefix: 1110
* write sign bit (1 if signed): x
* write 12-1 bits of abs(double_delta - 1): xxxxxxxxxxx
* else if double_delta fits into 32-bit int:
* write 5 bit prefix: 11110
* write sign bit (1 if signed): x
* write 32-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
* else
* write 5 bit prefix: 11111
* write sign bit (1 if signed): x
* write 64-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
*
* @example sequence of UInt8 values [1, 2, 3, 4, 5, 6, 7, 8, 9 10] is encoded as (codec header is omitted):
*
* .- 4-byte little-endian sequence length (10 == 0xa)
* | .- 1 byte (sizeof(UInt8) a[0] : 0x01
* | | .- 1 byte of delta: a[1] - a[0] = 2 - 1 = 1 : 0x01
* | | | .- 8 zero bits since double delta for remaining 8 elements was 0 : 0x00
* v_______________v___v___v___
* \x0a\x00\x00\x00\x01\x01\x00
*
* @example sequence of Int16 values [-10, 10, -20, 20, -40, 40] is encoded as:
*
* .- 4-byte little endian sequence length = 6 : 0x00000006
* | .- 2 bytes (sizeof(Int16) a[0] as UInt16 = -10 : 0xfff6
* | | .- 2 bytes of delta: a[1] - a[0] = 10 - (-10) = 20 : 0x0014
* | | | .- 4 encoded double deltas (see below)
* v_______________ v______ v______ v______________________
* \x06\x00\x00\x00\xf6\xff\x14\x00\xb8\xe2\x2e\xb1\xe4\x58
*
* 4 binary encoded double deltas (\xb8\xe2\x2e\xb1\xe4\x58):
* double_delta (DD) = -20 - 2 * 10 + (-10) = -50
* .- 2-bit prefix : 0b10
* | .- sign-bit : 0b1
* | |.- abs(DD - 1) = 49 : 0b110001
* | ||
* | || DD = 20 - 2 * (-20) + 10 = 70
* | || .- 3-bit prefix : 0b110
* | || | .- sign bit : 0b0
* | || | |.- abs(DD - 1) = 69 : 0b1000101
* | || | ||
* | || | || DD = -40 - 2 * 20 + (-20) = -100
* | || | || .- 3-bit prefix : 0b110
* | || | || | .- sign-bit : 0b0
* | || | || | |.- abs(DD - 1) = 99 : 0b1100011
* | || | || | ||
* | || | || | || DD = 40 - 2 * (-40) + 20 = 140
* | || | || | || .- 3-bit prefix : 0b110
* | || | || | || | .- sign bit : 0b0
* | || | || | || | |.- abs(DD - 1) = 139 : 0b10001011
* | || | || | || | ||
* V_vv______V__vv________V____vv_______V__vv________,- padding bits
* 10111000 11100010 00101110 10110001 11100100 01011000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecDoubleDelta : public ICompressionCodec
{
public:
explicit CompressionCodecDoubleDelta(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,118 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
* stride, like event timestamp for some monitoring application.
*
* Given input sequence a: [a0, a1, ... an]:
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Second item is written as delta (sizeof(a[0])*8 bits): a[1] - a[0]
* Loop over remaining items and calculate double delta:
* double_delta = a[i] - 2 * a[i - 1] + a[i - 2]
* Write it in compact binary form with `BitWriter`
* if double_delta == 0:
* write 1bit: 0
* else if -63 < double_delta < 64:
* write 2 bit prefix: 10
* write sign bit (1 if signed): x
* write 7-1 bits of abs(double_delta - 1): xxxxxx
* else if -255 < double_delta < 256:
* write 3 bit prefix: 110
* write sign bit (1 if signed): x
* write 9-1 bits of abs(double_delta - 1): xxxxxxxx
* else if -2047 < double_delta < 2048:
* write 4 bit prefix: 1110
* write sign bit (1 if signed): x
* write 12-1 bits of abs(double_delta - 1): xxxxxxxxxxx
* else if double_delta fits into 32-bit int:
* write 5 bit prefix: 11110
* write sign bit (1 if signed): x
* write 32-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
* else
* write 5 bit prefix: 11111
* write sign bit (1 if signed): x
* write 64-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
*
* @example sequence of UInt8 values [1, 2, 3, 4, 5, 6, 7, 8, 9 10] is encoded as (codec header is omitted):
*
* .- 4-byte little-endian sequence length (10 == 0xa)
* | .- 1 byte (sizeof(UInt8) a[0] : 0x01
* | | .- 1 byte of delta: a[1] - a[0] = 2 - 1 = 1 : 0x01
* | | | .- 8 zero bits since double delta for remaining 8 elements was 0 : 0x00
* v_______________v___v___v___
* \x0a\x00\x00\x00\x01\x01\x00
*
* @example sequence of Int16 values [-10, 10, -20, 20, -40, 40] is encoded as:
*
* .- 4-byte little endian sequence length = 6 : 0x00000006
* | .- 2 bytes (sizeof(Int16) a[0] as UInt16 = -10 : 0xfff6
* | | .- 2 bytes of delta: a[1] - a[0] = 10 - (-10) = 20 : 0x0014
* | | | .- 4 encoded double deltas (see below)
* v_______________ v______ v______ v______________________
* \x06\x00\x00\x00\xf6\xff\x14\x00\xb8\xe2\x2e\xb1\xe4\x58
*
* 4 binary encoded double deltas (\xb8\xe2\x2e\xb1\xe4\x58):
* double_delta (DD) = -20 - 2 * 10 + (-10) = -50
* .- 2-bit prefix : 0b10
* | .- sign-bit : 0b1
* | |.- abs(DD - 1) = 49 : 0b110001
* | ||
* | || DD = 20 - 2 * (-20) + 10 = 70
* | || .- 3-bit prefix : 0b110
* | || | .- sign bit : 0b0
* | || | |.- abs(DD - 1) = 69 : 0b1000101
* | || | ||
* | || | || DD = -40 - 2 * 20 + (-20) = -100
* | || | || .- 3-bit prefix : 0b110
* | || | || | .- sign-bit : 0b0
* | || | || | |.- abs(DD - 1) = 99 : 0b1100011
* | || | || | ||
* | || | || | || DD = 40 - 2 * (-40) + 20 = 140
* | || | || | || .- 3-bit prefix : 0b110
* | || | || | || | .- sign bit : 0b0
* | || | || | || | |.- abs(DD - 1) = 139 : 0b10001011
* | || | || | || | ||
* V_vv______V__vv________V____vv_______V__vv________,- padding bits
* 10111000 11100010 00101110 10110001 11100100 01011000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecDoubleDelta : public ICompressionCodec
{
public:
CompressionCodecDoubleDelta(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecGorilla.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
@ -14,9 +14,118 @@
#include <bitset> #include <bitset>
namespace DB namespace DB
{ {
/** Gorilla column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
*
* This codec is best used against monotonic floating sequences, like CPU usage percentage
* or any other gauge.
*
* Given input sequence a: [a0, a1, ... an]
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Loop over remaining items and calculate xor_diff:
* xor_diff = a[i] ^ a[i - 1] (e.g. 00000011'10110100)
* Write it in compact binary form with `BitWriter`
* if xor_diff == 0:
* write 1 bit: 0
* else:
* calculate leading zero bits (lzb)
* and trailing zero bits (tzb) of xor_diff,
* compare to lzb and tzb of previous xor_diff
* (X = sizeof(a[i]) * 8, e.g. X = 16, lzb = 6, tzb = 2)
* if lzb >= prev_lzb && tzb >= prev_tzb:
* (e.g. prev_lzb=4, prev_tzb=1)
* write 2 bit prefix: 0b10
* write xor_diff >> prev_tzb (X - prev_lzb - prev_tzb bits):0b00111011010
* (where X = sizeof(a[i]) * 8, e.g. 16)
* else:
* write 2 bit prefix: 0b11
* write 5 bits of lzb: 0b00110
* write 6 bits of (X - lzb - tzb)=(16-6-2)=8: 0b001000
* write (X - lzb - tzb) non-zero bits of xor_diff: 0b11101101
* prev_lzb = lzb
* prev_tzb = tzb
*
* @example sequence of Float32 values [0.1, 0.1, 0.11, 0.2, 0.1] is encoded as:
*
* .- 4-byte little endian sequence length: 5 : 0x00000005
* | .- 4 byte (sizeof(Float32) a[0] as UInt32 : -10 : 0xcdcccc3d
* | | .- 4 encoded xor diffs (see below)
* v_______________ v______________ v__________________________________________________
* \x05\x00\x00\x00\xcd\xcc\xcc\x3d\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00
*
* 4 binary encoded xor diffs (\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00):
*
* ...........................................
* a[i-1] = 00111101110011001100110011001101
* a[i] = 00111101110011001100110011001101
* xor_diff = 00000000000000000000000000000000
* .- 1-bit prefix : 0b0
* |
* | ...........................................
* | a[i-1] = 00111101110011001100110011001101
* ! a[i] = 00111101111000010100011110101110
* | xor_diff = 00000000001011011000101101100011
* | lzb = 10
* | tzb = 0
* |.- 2-bit prefix : 0b11
* || .- lzb (10) : 0b1010
* || | .- data length (32-10-0): 22 : 0b010110
* || | | .- data : 0b1011011000101101100011
* || | | |
* || | | | ...........................................
* || | | | a[i-1] = 00111101111000010100011110101110
* || | | | a[i] = 00111110010011001100110011001101
* || | | | xor_diff = 00000011101011011000101101100011
* || | | | .- 2-bit prefix : 0b11
* || | | | | .- lzb = 6 : 0b00110
* || | | | | | .- data length = (32 - 6) = 26 : 0b011010
* || | | | | | | .- data : 0b11101011011000101101100011
* || | | | | | | |
* || | | | | | | | ...........................................
* || | | | | | | | a[i-1] = 00111110010011001100110011001101
* || | | | | | | | a[i] = 00111101110011001100110011001101
* || | | | | | | | xor_diff = 00000011100000000000000000000000
* || | | | | | | | .- 2-bit prefix : 0b10
* || | | | | | | | | .- data : 0b11100000000000000000000000
* VV_v____ v_____v________________________V_v_____v______v____________________________V_v_____________________________
* 01101010 01011010 11011000 10110110 00111100 11001101 01110101 10110001 01101100 01110111 00000000 00000000 00000000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecGorilla : public ICompressionCodec
{
public:
explicit CompressionCodecGorilla(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,115 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
/** Gorilla column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
*
* This codec is best used against monotonic floating sequences, like CPU usage percentage
* or any other gauge.
*
* Given input sequence a: [a0, a1, ... an]
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Loop over remaining items and calculate xor_diff:
* xor_diff = a[i] ^ a[i - 1] (e.g. 00000011'10110100)
* Write it in compact binary form with `BitWriter`
* if xor_diff == 0:
* write 1 bit: 0
* else:
* calculate leading zero bits (lzb)
* and trailing zero bits (tzb) of xor_diff,
* compare to lzb and tzb of previous xor_diff
* (X = sizeof(a[i]) * 8, e.g. X = 16, lzb = 6, tzb = 2)
* if lzb >= prev_lzb && tzb >= prev_tzb:
* (e.g. prev_lzb=4, prev_tzb=1)
* write 2 bit prefix: 0b10
* write xor_diff >> prev_tzb (X - prev_lzb - prev_tzb bits):0b00111011010
* (where X = sizeof(a[i]) * 8, e.g. 16)
* else:
* write 2 bit prefix: 0b11
* write 5 bits of lzb: 0b00110
* write 6 bits of (X - lzb - tzb)=(16-6-2)=8: 0b001000
* write (X - lzb - tzb) non-zero bits of xor_diff: 0b11101101
* prev_lzb = lzb
* prev_tzb = tzb
*
* @example sequence of Float32 values [0.1, 0.1, 0.11, 0.2, 0.1] is encoded as:
*
* .- 4-byte little endian sequence length: 5 : 0x00000005
* | .- 4 byte (sizeof(Float32) a[0] as UInt32 : -10 : 0xcdcccc3d
* | | .- 4 encoded xor diffs (see below)
* v_______________ v______________ v__________________________________________________
* \x05\x00\x00\x00\xcd\xcc\xcc\x3d\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00
*
* 4 binary encoded xor diffs (\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00):
*
* ...........................................
* a[i-1] = 00111101110011001100110011001101
* a[i] = 00111101110011001100110011001101
* xor_diff = 00000000000000000000000000000000
* .- 1-bit prefix : 0b0
* |
* | ...........................................
* | a[i-1] = 00111101110011001100110011001101
* ! a[i] = 00111101111000010100011110101110
* | xor_diff = 00000000001011011000101101100011
* | lzb = 10
* | tzb = 0
* |.- 2-bit prefix : 0b11
* || .- lzb (10) : 0b1010
* || | .- data length (32-10-0): 22 : 0b010110
* || | | .- data : 0b1011011000101101100011
* || | | |
* || | | | ...........................................
* || | | | a[i-1] = 00111101111000010100011110101110
* || | | | a[i] = 00111110010011001100110011001101
* || | | | xor_diff = 00000011101011011000101101100011
* || | | | .- 2-bit prefix : 0b11
* || | | | | .- lzb = 6 : 0b00110
* || | | | | | .- data length = (32 - 6) = 26 : 0b011010
* || | | | | | | .- data : 0b11101011011000101101100011
* || | | | | | | |
* || | | | | | | | ...........................................
* || | | | | | | | a[i-1] = 00111110010011001100110011001101
* || | | | | | | | a[i] = 00111101110011001100110011001101
* || | | | | | | | xor_diff = 00000011100000000000000000000000
* || | | | | | | | .- 2-bit prefix : 0b10
* || | | | | | | | | .- data : 0b11100000000000000000000000
* VV_v____ v_____v________________________V_v_____v______v____________________________V_v_____________________________
* 01101010 01011010 11011000 10110110 00111100 11001101 01110101 10110001 01101100 01110111 00000000 00000000 00000000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecGorilla : public ICompressionCodec
{
public:
CompressionCodecGorilla(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
}

View File

@ -1,7 +1,7 @@
#include "CompressionCodecLZ4.h"
#include <lz4.h> #include <lz4.h>
#include <lz4hc.h> #include <lz4hc.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <Compression/LZ4_decompress_faster.h> #include <Compression/LZ4_decompress_faster.h>
@ -9,7 +9,9 @@
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/BufferWithOwnMemory.h>
#pragma GCC diagnostic ignored "-Wold-style-cast" #pragma GCC diagnostic ignored "-Wold-style-cast"
@ -17,11 +19,51 @@
namespace DB namespace DB
{ {
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
explicit CompressionCodecLZ4();
uint8_t getMethodByte() const override;
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
ASTPtr codec_desc;
};
class CompressionCodecLZ4HC : public CompressionCodecLZ4
{
public:
explicit CompressionCodecLZ4HC(int level_);
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
const int level;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER; extern const int ILLEGAL_CODEC_PARAMETER;
} }
CompressionCodecLZ4::CompressionCodecLZ4() CompressionCodecLZ4::CompressionCodecLZ4()

View File

@ -1,52 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
#include <Compression/LZ4_decompress_faster.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
CompressionCodecLZ4();
uint8_t getMethodByte() const override;
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
ASTPtr codec_desc;
};
class CompressionCodecLZ4HC : public CompressionCodecLZ4
{
public:
CompressionCodecLZ4HC(int level_);
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
const int level;
};
}

View File

@ -1,6 +1,6 @@
#include <cstring> #include <cstring>
#include <Compression/CompressionCodecT64.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
@ -8,18 +8,63 @@
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Core/Types.h>
namespace DB namespace DB
{ {
/// Get 64 integer values, makes 64x64 bit matrix, transpose it and crop unused bits (most significant zeroes).
/// In example, if we have UInt8 with only 0 and 1 inside 64xUInt8 would be compressed into 1xUInt64.
/// It detects unused bits by calculating min and max values of data part, saving them in header in compression phase.
/// There's a special case with signed integers parts with crossing zero data. Here it stores one more bit to detect sign of value.
class CompressionCodecT64 : public ICompressionCodec
{
public:
static constexpr UInt32 HEADER_SIZE = 1 + 2 * sizeof(UInt64);
static constexpr UInt32 MAX_COMPRESSED_BLOCK_SIZE = sizeof(UInt64) * 64;
/// There're 2 compression variants:
/// Byte - transpose bit matrix by bytes (only the last not full byte is transposed by bits). It's default.
/// Bits - full bit-transpose of the bit matrix. It uses more resources and leads to better compression with ZSTD (but worse with LZ4).
enum class Variant
{
Byte,
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override
{
/// uncompressed_size - (uncompressed_size % (sizeof(T) * 64)) + sizeof(UInt64) * sizeof(T) + header_size
return uncompressed_size + MAX_COMPRESSED_BLOCK_SIZE + HEADER_SIZE;
}
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
Variant variant;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS; extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER; extern const int ILLEGAL_CODEC_PARAMETER;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
namespace namespace

View File

@ -1,53 +0,0 @@
#pragma once
#include <Core/Types.h>
#include <Compression/ICompressionCodec.h>
namespace DB
{
/// Get 64 integer values, makes 64x64 bit matrix, transpose it and crop unused bits (most significant zeroes).
/// In example, if we have UInt8 with only 0 and 1 inside 64xUInt8 would be compressed into 1xUInt64.
/// It detects unused bits by calculating min and max values of data part, saving them in header in compression phase.
/// There's a special case with signed integers parts with crossing zero data. Here it stores one more bit to detect sign of value.
class CompressionCodecT64 : public ICompressionCodec
{
public:
static constexpr UInt32 HEADER_SIZE = 1 + 2 * sizeof(UInt64);
static constexpr UInt32 MAX_COMPRESSED_BLOCK_SIZE = sizeof(UInt64) * 64;
/// There're 2 compression variants:
/// Byte - transpose bit matrix by bytes (only the last not full byte is transposed by bits). It's default.
/// Bits - full bit-transpose of the bit matrix. It uses more resources and leads to better compression with ZSTD (but worse with LZ4).
enum class Variant
{
Byte,
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override
{
/// uncompressed_size - (uncompressed_size % (sizeof(T) * 64)) + sizeof(UInt64) * sizeof(T) + header_size
return uncompressed_size + MAX_COMPRESSED_BLOCK_SIZE + HEADER_SIZE;
}
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
Variant variant;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecZSTD.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <zstd.h> #include <zstd.h>
@ -7,11 +7,44 @@
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB namespace DB
{ {
class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;
explicit CompressionCodecZSTD(int level_);
CompressionCodecZSTD(int level_, int window_log);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
const bool enable_long_range;
const int window_log;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,42 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB
{
class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;
CompressionCodecZSTD(int level_);
CompressionCodecZSTD(int level_, int window_log);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
const bool enable_long_range;
const int window_log;
};
}

View File

@ -1,3 +1,7 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
@ -13,6 +17,7 @@
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
@ -34,8 +39,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
{ {
if (level) if (level)
{ {
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level)); auto level_literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {}); return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), level_literal)), {});
} }
else else
{ {
@ -44,7 +49,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
} }
} }
void CompressionCodecFactory::validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
{ {
if (family_name.empty()) if (family_name.empty())
throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS); throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
@ -52,16 +58,19 @@ void CompressionCodecFactory::validateCodec(const String & family_name, std::opt
if (level) if (level)
{ {
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level)); auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {}, sanity_check); validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
} }
else else
{ {
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name)); auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier), {}, sanity_check); validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
} }
} }
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
{ {
if (const auto * func = ast->as<ASTFunction>()) if (const auto * func = ast->as<ASTFunction>())
{ {
@ -72,7 +81,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
std::optional<size_t> generic_compression_codec_pos; std::optional<size_t> generic_compression_codec_pos;
bool can_substitute_codec_arguments = true; bool can_substitute_codec_arguments = true;
for (size_t i = 0; i < func->arguments->children.size(); ++i) for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
{ {
const auto & inner_codec_ast = func->arguments->children[i]; const auto & inner_codec_ast = func->arguments->children[i];
String codec_family_name; String codec_family_name;
@ -107,7 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
if (column_type) if (column_type)
{ {
CompressionCodecPtr prev_codec; CompressionCodecPtr prev_codec;
IDataType::StreamCallbackWithType callback = [&](const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type) IDataType::StreamCallbackWithType callback = [&](
const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
{ {
if (ISerialization::isSpecialCompressionAllowed(substream_path)) if (ISerialization::isSpecialCompressionAllowed(substream_path))
{ {
@ -132,6 +142,12 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
result_codec = getImpl(codec_family_name, codec_arguments, nullptr); result_codec = getImpl(codec_family_name, codec_arguments, nullptr);
} }
if (!allow_experimental_codecs && result_codec->isExperimental())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is experimental and not meant to be used in production."
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc()); codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
} }
@ -172,6 +188,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS); " (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
} }
/// For columns with nested types like Tuple(UInt32, UInt64) we /// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on /// obviously cannot substitute parameters for codecs which depend on
/// data type, because for the first column Delta(4) is suitable and /// data type, because for the first column Delta(4) is suitable and
@ -195,7 +212,9 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC); throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
} }
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
CompressionCodecPtr CompressionCodecFactory::get(
const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
{ {
if (current_default == nullptr) if (current_default == nullptr)
current_default = default_codec; current_default = default_codec;
@ -246,6 +265,7 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IData
throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE); throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE);
} }
CompressionCodecPtr CompressionCodecFactory::get(const uint8_t byte_code) const CompressionCodecPtr CompressionCodecFactory::get(const uint8_t byte_code) const
{ {
const auto family_code_and_creator = family_code_with_codec.find(byte_code); const auto family_code_and_creator = family_code_with_codec.find(byte_code);
@ -303,7 +323,7 @@ void CompressionCodecFactory::registerSimpleCompressionCodec(
registerCompressionCodec(family_name, byte_code, [family_name, creator](const ASTPtr & ast) registerCompressionCodec(family_name, byte_code, [family_name, creator](const ASTPtr & ast)
{ {
if (ast) if (ast)
throw Exception("Compression codec " + family_name + " cannot have arguments", ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS); throw Exception(ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS, "Compression codec {} cannot have arguments", family_name);
return creator(); return creator();
}); });
} }

View File

@ -38,16 +38,16 @@ public:
CompressionCodecPtr getDefaultCodec() const; CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters) /// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const; ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Just wrapper for previous method. /// Just wrapper for previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check) const ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{ {
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check); return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check, allow_experimental_codecs);
} }
/// Validate codecs AST specified by user /// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const; void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;
/// Get codec by AST and possible column_type. Some codecs can use /// Get codec by AST and possible column_type. Some codecs can use
/// information about type to improve inner settings, but every codec should /// information about type to improve inner settings, but every codec should

View File

@ -41,8 +41,8 @@ enum class CompressionMethodByte : uint8_t
Multiple = 0x91, Multiple = 0x91,
Delta = 0x92, Delta = 0x92,
T64 = 0x93, T64 = 0x93,
DoubleDelta = 0x94, DoubleDelta = 0x94,
Gorilla = 0x95, Gorilla = 0x95,
}; };
} }

View File

@ -73,6 +73,10 @@ public:
/// Is it a generic compression algorithm like lz4, zstd. Usually it does not make sense to apply generic compression more than single time. /// Is it a generic compression algorithm like lz4, zstd. Usually it does not make sense to apply generic compression more than single time.
virtual bool isGenericCompression() const = 0; virtual bool isGenericCompression() const = 0;
/// It is a codec available only for evaluation purposes and not meant to be used in production.
/// It will not be allowed to use unless the user will turn off the safety switch.
virtual bool isExperimental() const { return false; }
/// If it does nothing. /// If it does nothing.
virtual bool isNone() const { return false; } virtual bool isNone() const { return false; }

View File

@ -240,6 +240,7 @@ class IColumn;
M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \ M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \ M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \ M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \
M(UInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.", 0) \ M(UInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.", 0) \
M(UInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \ M(UInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \ M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \

View File

@ -447,6 +447,8 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
defaults_sample_block = validateColumnsDefaultsAndGetSampleBlock(default_expr_list, column_names_and_types, context_); defaults_sample_block = validateColumnsDefaultsAndGetSampleBlock(default_expr_list, column_names_and_types, context_);
bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs; bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
bool allow_experimental_codecs = attach || context_->getSettingsRef().allow_experimental_codecs;
ColumnsDescription res; ColumnsDescription res;
auto name_type_it = column_names_and_types.begin(); auto name_type_it = column_names_and_types.begin();
for (auto ast_it = columns_ast.children.begin(); ast_it != columns_ast.children.end(); ++ast_it, ++name_type_it) for (auto ast_it = columns_ast.children.begin(); ast_it != columns_ast.children.end(); ++ast_it, ++name_type_it)
@ -481,7 +483,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.default_specifier == "ALIAS") if (col_decl.default_specifier == "ALIAS")
throw Exception{"Cannot specify codec for column type ALIAS", ErrorCodes::BAD_ARGUMENTS}; throw Exception{"Cannot specify codec for column type ALIAS", ErrorCodes::BAD_ARGUMENTS};
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST( column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
col_decl.codec, column.type, sanity_check_compression_codecs); col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs);
} }
if (col_decl.ttl) if (col_decl.ttl)

View File

@ -1398,7 +1398,7 @@ void TCPHandler::initBlockOutput(const Block & block)
if (state.compression == Protocol::Compression::Enable) if (state.compression == Protocol::Compression::Enable)
{ {
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs);
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>( state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionCodecFactory::instance().get(method, level)); *out, CompressionCodecFactory::instance().get(method, level));

View File

@ -348,7 +348,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
column.comment = *comment; column.comment = *comment;
if (codec) if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false); column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true);
column.ttl = ttl; column.ttl = ttl;
@ -389,7 +389,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else else
{ {
if (codec) if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false); column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
if (comment) if (comment)
column.comment = *comment; column.comment = *comment;
@ -995,7 +995,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::BAD_ARGUMENTS}; ErrorCodes::BAD_ARGUMENTS};
if (command.codec) if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
all_columns.add(ColumnDescription(column_name, command.data_type)); all_columns.add(ColumnDescription(column_name, command.data_type));
} }
@ -1015,7 +1015,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::NOT_IMPLEMENTED}; ErrorCodes::NOT_IMPLEMENTED};
if (command.codec) if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
auto column_default = all_columns.getDefault(column_name); auto column_default = all_columns.getDefault(column_name);
if (column_default) if (column_default)
{ {

View File

@ -128,7 +128,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
comment = col_ast->comment->as<ASTLiteral &>().value.get<String>(); comment = col_ast->comment->as<ASTLiteral &>().value.get<String>();
if (col_ast->codec) if (col_ast->codec)
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false); codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true);
if (col_ast->ttl) if (col_ast->ttl)
ttl = col_ast->ttl; ttl = col_ast->ttl;

View File

@ -632,7 +632,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
if (compression_method == "ZSTD") if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level; compression_level = settings.network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level); CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);
/// tmp directory is used to ensure atomicity of transactions /// tmp directory is used to ensure atomicity of transactions

View File

@ -189,7 +189,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
if (action_type == ActionType::ADD_PART) if (action_type == ActionType::ADD_PART)
{ {
MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); MergedBlockOutputStream part_out(
part,
metadata_snapshot,
block.getNamesAndTypesList(),
{},
CompressionCodecFactory::instance().get("NONE", {}));
part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context); part->partition.create(metadata_snapshot, block, 0, context);

View File

@ -3,8 +3,9 @@
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <Storages/System/StorageSystemErrors.h> #include <Storages/System/StorageSystemErrors.h>
#include <Common/ErrorCodes.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Common/ErrorCodes.h>
namespace DB namespace DB
{ {

View File

@ -289,7 +289,7 @@ TTLDescription TTLDescription::getTTLFromAST(
{ {
result.recompression_codec = result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST( CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs); ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
} }
} }

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_codecs>1</allow_experimental_codecs>
</default>
</profiles>
</yandex>

View File

@ -17,7 +17,8 @@ node4 = cluster.add_instance('node4', user_configs=['configs/enable_uncompressed
node5 = cluster.add_instance('node5', main_configs=['configs/zstd_compression_by_default.xml'], node5 = cluster.add_instance('node5', main_configs=['configs/zstd_compression_by_default.xml'],
user_configs=['configs/enable_uncompressed_cache.xml', user_configs=['configs/enable_uncompressed_cache.xml',
'configs/allow_suspicious_codecs.xml']) 'configs/allow_suspicious_codecs.xml'])
node6 = cluster.add_instance('node6', main_configs=['configs/allow_experimental_codecs.xml'],
user_configs=['configs/allow_suspicious_codecs.xml'])
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def start_cluster(): def start_cluster():