dbms: added ZSTD algorithm (experimental) [#METR-15386].

This commit is contained in:
Alexey Milovidov 2015-03-09 04:15:43 +03:00
parent ae4c458c5b
commit 7d11fc0931
8 changed files with 118 additions and 58 deletions

View File

@ -277,6 +277,8 @@ namespace ErrorCodes
STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS, STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
CPUID_ERROR, CPUID_ERROR,
INFINITE_LOOP, INFINITE_LOOP,
CANNOT_COMPRESS,
CANNOT_DECOMPRESS,
POCO_EXCEPTION = 1000, POCO_EXCEPTION = 1000,
STD_EXCEPTION, STD_EXCEPTION,

View File

@ -53,12 +53,13 @@ private:
owned_cell.reset(new UncompressedCacheCell); owned_cell.reset(new UncompressedCacheCell);
size_t size_decompressed; size_t size_decompressed;
owned_cell->compressed_size = readCompressedData(size_decompressed); size_t size_compressed_without_checksum;
owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (owned_cell->compressed_size) if (owned_cell->compressed_size)
{ {
owned_cell->data.resize(size_decompressed); owned_cell->data.resize(size_decompressed);
decompress(owned_cell->data.m_data, size_decompressed); decompress(owned_cell->data.m_data, size_decompressed, size_compressed_without_checksum);
/// Положим данные в кэш. /// Положим данные в кэш.
cache->set(key, owned_cell); cache->set(key, owned_cell);

View File

@ -14,14 +14,15 @@ private:
bool nextImpl() bool nextImpl()
{ {
size_t size_decompressed; size_t size_decompressed;
size_compressed = readCompressedData(size_decompressed); size_t size_compressed_without_checksum;
size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (!size_compressed) if (!size_compressed)
return false; return false;
memory.resize(size_decompressed); memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]); working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed); decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
return true; return true;
} }
@ -44,14 +45,15 @@ public:
while (bytes_read < n) while (bytes_read < n)
{ {
size_t size_decompressed; size_t size_decompressed;
size_t size_compressed_without_checksum;
if (!readCompressedData(size_decompressed)) if (!readCompressedData(size_decompressed, size_compressed_without_checksum))
return bytes_read; return bytes_read;
/// Если разжатый блок помещается целиком туда, куда его надо скопировать. /// Если разжатый блок помещается целиком туда, куда его надо скопировать.
if (size_decompressed <= n - bytes_read) if (size_decompressed <= n - bytes_read)
{ {
decompress(to + bytes_read, size_decompressed); decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed; bytes_read += size_decompressed;
bytes += size_decompressed; bytes += size_decompressed;
} }
@ -62,7 +64,7 @@ public:
working_buffer = Buffer(&memory[0], &memory[size_decompressed]); working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
pos = working_buffer.begin(); pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed); decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
bytes_read += read(to + bytes_read, n - bytes_read); bytes_read += read(to + bytes_read, n - bytes_read);
break; break;

View File

@ -5,6 +5,7 @@
#include <city.h> #include <city.h>
#include <quicklz/quicklz_level1.h> #include <quicklz/quicklz_level1.h>
#include <lz4/lz4.h> #include <lz4/lz4.h>
#include <zstd/zstd.h>
#include <DB/Common/PODArray.h> #include <DB/Common/PODArray.h>
#include <DB/Common/ProfileEvents.h> #include <DB/Common/ProfileEvents.h>
@ -32,7 +33,7 @@ protected:
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму. /// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
/// Возвращает количество прочитанных байт. /// Возвращает количество прочитанных байт.
size_t readCompressedData(size_t & size_decompressed) size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
{ {
if (compressed_in->eof()) if (compressed_in->eof())
return 0; return 0;
@ -44,14 +45,15 @@ protected:
compressed_in->readStrict(&own_compressed_buffer[0], QUICKLZ_HEADER_SIZE); compressed_in->readStrict(&own_compressed_buffer[0], QUICKLZ_HEADER_SIZE);
UInt8 method = own_compressed_buffer[0]; /// См. CompressedWriteBuffer.h UInt8 method = own_compressed_buffer[0]; /// См. CompressedWriteBuffer.h
size_t size_compressed;
size_t & size_compressed = size_compressed_without_checksum;
if (method < 0x80) if (method < 0x80)
{ {
size_compressed = qlz_size_compressed(&own_compressed_buffer[0]); size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]); size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
} }
else if (method == 0x82) else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
{ {
size_compressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[1]); size_compressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[1]);
size_decompressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[5]); size_decompressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[5]);
@ -85,7 +87,7 @@ protected:
return size_compressed + sizeof(checksum); return size_compressed + sizeof(checksum);
} }
void decompress(char * to, size_t size_decompressed) void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{ {
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks); ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed); ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
@ -99,10 +101,19 @@ protected:
qlz_decompress(&compressed_buffer[0], to, qlz_state); qlz_decompress(&compressed_buffer[0], to, qlz_state);
} }
else if (method == 0x82) else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4))
{ {
if (LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed) < 0) if (LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed) < 0)
throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CORRUPTED_DATA); throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CANNOT_DECOMPRESS);
}
else if (method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
{
size_t res = ZSTD_decompress(
to, size_decompressed,
&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed_without_checksum - QUICKLZ_HEADER_SIZE);
if (ZSTD_isError(res))
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
} }
else else
throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD); throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);

View File

@ -24,14 +24,15 @@ private:
bool nextImpl() bool nextImpl()
{ {
size_t size_decompressed; size_t size_decompressed;
size_compressed = readCompressedData(size_decompressed); size_t size_compressed_without_checksum;
size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (!size_compressed) if (!size_compressed)
return false; return false;
memory.resize(size_decompressed); memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]); working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed); decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
return true; return true;
} }
@ -81,8 +82,9 @@ public:
while (bytes_read < n) while (bytes_read < n)
{ {
size_t size_decompressed = 0; size_t size_decompressed = 0;
size_t size_compressed_without_checksum = 0;
size_t new_size_compressed = readCompressedData(size_decompressed); size_t new_size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
size_compressed = 0; /// file_in больше не указывает на конец блока в working_buffer. size_compressed = 0; /// file_in больше не указывает на конец блока в working_buffer.
if (!new_size_compressed) if (!new_size_compressed)
return bytes_read; return bytes_read;
@ -90,7 +92,7 @@ public:
/// Если разжатый блок помещается целиком туда, куда его надо скопировать. /// Если разжатый блок помещается целиком туда, куда его надо скопировать.
if (size_decompressed <= n - bytes_read) if (size_decompressed <= n - bytes_read)
{ {
decompress(to + bytes_read, size_decompressed); decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed; bytes_read += size_decompressed;
bytes += size_decompressed; bytes += size_decompressed;
} }
@ -102,7 +104,7 @@ public:
working_buffer = Buffer(&memory[0], &memory[size_decompressed]); working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
pos = working_buffer.begin(); pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed); decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
bytes_read += read(to + bytes_read, n - bytes_read); bytes_read += read(to + bytes_read, n - bytes_read);
break; break;

View File

@ -11,15 +11,48 @@
namespace DB namespace DB
{ {
namespace CompressionMethod
{
/** Метод сжатия */ /** Метод сжатия */
enum Enum enum class CompressionMethod
{ {
QuickLZ, QuickLZ,
LZ4, LZ4,
LZ4HC, /// Формат такой же, как у LZ4. Разница только при сжатии. LZ4HC, /// Формат такой же, как у LZ4. Разница только при сжатии.
ZSTD, /// Экспериментальный алгоритм: https://github.com/Cyan4973/zstd
};
/** Формат сжатого блока следующий:
*
* Первые 16 байт - чексумма от всех остальных байт блока. Сейчас используется только CityHash128.
* В дальнейшем можно предусмотреть другие чексуммы, хотя сделать их другого размера не получится.
*
* Следующий байт определяет алгоритм сжатия. Далее всё зависит от алгоритма.
*
* Первые 4 варианта совместимы с QuickLZ level 1.
* То есть, если значение первого байта < 4, для разжатия достаточно использовать функцию qlz_level1_decompress.
*
* 0x00 - несжатые данные, маленький блок. Далее один байт - размер сжатых данных, с учётом заголовка; один байт - размер несжатых данных.
* 0x01 - сжатые данные, QuickLZ level 1, маленький блок. Далее два байта аналогично.
* 0x02 - несжатые данные, большой блок. Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
* 0x03 - сжатые данные, QuickLZ level 1, большой блок. Далее 8 байт аналогично.
*
* 0x82 - LZ4 или LZ4HC (они имеют одинаковый формат).
* Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
*
* NOTE: Почему 0x82?
* Изначально использовался только QuickLZ. Потом был добавлен LZ4.
* Старший бит выставлен, чтобы отличить от QuickLZ, а второй бит выставлен для совместимости,
* чтобы работали функции qlz_size_compressed, qlz_size_decompressed.
* Хотя сейчас такая совместимость уже не актуальна.
*
* 0x90 - ZSTD
*
* Все размеры - little endian.
*/
enum class CompressionMethodByte : uint8_t
{
LZ4 = 0x82,
ZSTD = 0x90,
}; };
}
} }

View File

@ -8,6 +8,7 @@
#include <quicklz/quicklz_level1.h> #include <quicklz/quicklz_level1.h>
#include <lz4/lz4.h> #include <lz4/lz4.h>
#include <lz4/lz4hc.h> #include <lz4/lz4hc.h>
#include <zstd/zstd.h>
#include <DB/Common/PODArray.h> #include <DB/Common/PODArray.h>
#include <DB/Core/Types.h> #include <DB/Core/Types.h>
@ -24,7 +25,7 @@ class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{ {
private: private:
WriteBuffer & out; WriteBuffer & out;
CompressionMethod::Enum method; CompressionMethod method;
PODArray<char> compressed_buffer; PODArray<char> compressed_buffer;
qlz_state_compress * qlz_state; qlz_state_compress * qlz_state;
@ -38,31 +39,7 @@ private:
size_t compressed_size = 0; size_t compressed_size = 0;
char * compressed_buffer_ptr = nullptr; char * compressed_buffer_ptr = nullptr;
/** Формат сжатого блока следующий: /** Формат сжатого блока - см. CompressedStream.h
*
* Первые 16 байт - чексумма от всех остальных байт блока. Сейчас используется только CityHash128.
* В дальнейшем можно предусмотреть другие чексуммы, хотя сделать их другого размера не получится.
*
* Следующий байт определяет алгоритм сжатия. Далее всё зависит от алгоритма.
*
* Первые 4 варианта совместимы с QuickLZ level 1.
* То есть, если значение первого байта < 4, для разжатия достаточно использовать функцию qlz_level1_decompress.
*
* 0x00 - несжатые данные, маленький блок. Далее один байт - размер сжатых данных, с учётом заголовка; один байт - размер несжатых данных.
* 0x01 - сжатые данные, QuickLZ level 1, маленький блок. Далее два байта аналогично.
* 0x02 - несжатые данные, большой блок. Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
* 0x03 - сжатые данные, QuickLZ level 1, большой блок. Далее 8 байт аналогично.
*
* 0x82 - LZ4 или LZ4HC (они имеют одинаковый формат).
* Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
*
* NOTE: Почему 0x82?
* Изначально использовался только QuickLZ. Потом был добавлен LZ4.
* Старший бит выставлен, чтобы отличить от QuickLZ, а второй бит выставлен для совместимости,
* чтобы работали функции qlz_size_compressed, qlz_size_decompressed.
* Хотя сейчас такая совместимость уже не актуальна.
*
* Все размеры - little endian.
*/ */
switch (method) switch (method)
@ -88,7 +65,7 @@ private:
compressed_buffer.resize(header_size + LZ4_COMPRESSBOUND(uncompressed_size)); compressed_buffer.resize(header_size + LZ4_COMPRESSBOUND(uncompressed_size));
compressed_buffer[0] = 0x82; /// Второй бит - для совместимости с QuickLZ - обозначает, что размеры записываются 4 байтами. compressed_buffer[0] = static_cast<UInt8>(CompressionMethodByte::LZ4);
if (method == CompressionMethod::LZ4) if (method == CompressionMethod::LZ4)
compressed_size = header_size + LZ4_compress( compressed_size = header_size + LZ4_compress(
@ -110,6 +87,34 @@ private:
compressed_buffer_ptr = &compressed_buffer[0]; compressed_buffer_ptr = &compressed_buffer[0];
break; break;
} }
case CompressionMethod::ZSTD:
{
static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32);
compressed_buffer.resize(header_size + ZSTD_compressBound(uncompressed_size));
compressed_buffer[0] = static_cast<UInt8>(CompressionMethodByte::ZSTD);
size_t res = ZSTD_compress(
&compressed_buffer[header_size],
compressed_buffer.size(),
working_buffer.begin(),
uncompressed_size);
if (ZSTD_isError(res))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_COMPRESS);
compressed_size = header_size + res;
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
memcpy(&compressed_buffer[1], reinterpret_cast<const char *>(&compressed_size_32), sizeof(compressed_size_32));
memcpy(&compressed_buffer[5], reinterpret_cast<const char *>(&uncompressed_size_32), sizeof(uncompressed_size_32));
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
default: default:
throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD); throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
} }
@ -123,7 +128,7 @@ private:
public: public:
CompressedWriteBuffer( CompressedWriteBuffer(
WriteBuffer & out_, WriteBuffer & out_,
CompressionMethod::Enum method_ = CompressionMethod::LZ4, CompressionMethod method_ = CompressionMethod::LZ4,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), method(method_), qlz_state(new qlz_state_compress) {} : BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), method(method_), qlz_state(new qlz_state_compress) {}

View File

@ -45,6 +45,7 @@ int main(int argc, char ** argv)
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size") ("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4") ("hc", "use LZ4HC instead of LZ4")
("qlz", "use QuickLZ (level 1) instead of LZ4") ("qlz", "use QuickLZ (level 1) instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("stat", "print block statistics of compressed data") ("stat", "print block statistics of compressed data")
; ;
@ -61,17 +62,20 @@ int main(int argc, char ** argv)
try try
{ {
bool decompress = options.count("d"); bool decompress = options.count("d");
bool use_qlz = options.count("qlz");; bool use_qlz = options.count("qlz");
bool use_lz4hc = options.count("hc");; bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool stat_mode = options.count("stat"); bool stat_mode = options.count("stat");
unsigned block_size = options["block-size"].as<unsigned>(); unsigned block_size = options["block-size"].as<unsigned>();
DB::CompressionMethod::Enum method = DB::CompressionMethod::LZ4; DB::CompressionMethod method = DB::CompressionMethod::LZ4;
if (use_qlz) if (use_qlz)
method = DB::CompressionMethod::QuickLZ; method = DB::CompressionMethod::QuickLZ;
else if (use_lz4hc) else if (use_lz4hc)
method = DB::CompressionMethod::LZ4HC; method = DB::CompressionMethod::LZ4HC;
else if (use_zstd)
method = DB::CompressionMethod::ZSTD;
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO); DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO); DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);