dbms: easier to add more compression algrorithms [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-10-06 01:59:23 +04:00
parent ec9fb6f600
commit a3dbff3fc4
3 changed files with 63 additions and 23 deletions

View File

@ -13,6 +13,7 @@
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/CompressedStream.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
@ -42,14 +43,27 @@ protected:
own_compressed_buffer.resize(QUICKLZ_HEADER_SIZE);
compressed_in->readStrict(&own_compressed_buffer[0], QUICKLZ_HEADER_SIZE);
size_t size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
UInt8 method = own_compressed_buffer[0]; /// См. CompressedWriteBuffer.h
size_t size_compressed;
if (method < 4)
{
size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
}
else if (method == 0x82)
{
size_compressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[1]);
size_decompressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[5]);
}
else
throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
/// Находится ли сжатый блок целиком в буфере compressed_in?
if (compressed_in->offset() >= QUICKLZ_HEADER_SIZE &&
compressed_in->position() + size_compressed - QUICKLZ_HEADER_SIZE <= compressed_in->buffer().end())
@ -76,16 +90,22 @@ protected:
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
/// Старший бит первого байта определяет использованный метод сжатия.
if ((compressed_buffer[0] & 0x80) == 0)
UInt8 method = compressed_buffer[0]; /// См. CompressedWriteBuffer.h
if (method < 4)
{
if (!qlz_state)
qlz_state = new qlz_state_decompress;
qlz_decompress(&compressed_buffer[0], to, qlz_state);
}
else if (method == 0x82)
{
if (LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed) < 0)
throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CORRUPTED_DATA);
}
else
LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed);
throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
public:

View File

@ -13,12 +13,12 @@ namespace DB
namespace CompressionMethod
{
/** Метод сжатия */
/** Метод сжатия */
enum Enum
{
QuickLZ = 0,
LZ4 = 1,
LZ4HC = 2, /// Формат такой же, как у LZ4. Разница только при сжатии.
QuickLZ,
LZ4,
LZ4HC, /// Формат такой же, как у LZ4. Разница только при сжатии.
};
}

View File

@ -38,9 +38,31 @@ private:
size_t compressed_size = 0;
char * compressed_buffer_ptr = nullptr;
/** Для того, чтобы различить между QuickLZ и LZ4 и сохранить обратную совместимость (со случаем, когда использовался только QuickLZ),
* используем старший бит первого байта в сжатых данных (который сейчас не используется в QuickLZ).
* PS. Если потребуется использовать другие библиотеки, то можно использовать ещё один бит первого байта, или старший бит размера.
/** Формат сжатого блока следующий:
*
* Первые 16 байт - чексумма от всех остальных байт блока. Сейчас используется только CityHash128.
* В дальнейшем можно предусмотреть другие чексуммы, хотя сделать их другого размера не получится.
*
* Следующий байт определяет алгоритм сжатия. Далее всё зависит от алгоритма.
*
* Первые 4 варианта совместимы с QuickLZ level 1.
* То есть, если значение первого байта < 4, для разжатия достаточно использовать функцию qlz_level1_decompress.
*
* 0x00 - несжатые данные, маленький блок. Далее один байт - размер сжатых данных, с учётом заголовка; один байт - размер несжатых данных.
* 0x01 - сжатые данные, QuickLZ level 1, маленький блок. Далее два байта аналогично.
* 0x02 - несжатые данные, большой блок. Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
* 0x03 - сжатые данные, QuickLZ level 1, большой блок. Далее 8 байт аналогично.
*
* 0x82 - LZ4 или LZ4HC (они имеют одинаковый формат).
* Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
*
* NOTE: Почему 0x82?
* Изначально использовался только QuickLZ. Потом был добавлен LZ4.
* Старший бит выставлен, чтобы отличить от QuickLZ, а второй бит выставлен для совместимости,
* чтобы работали функции qlz_size_compressed, qlz_size_decompressed.
* Хотя сейчас такая совместимость уже не актуальна.
*
* Все размеры - little endian.
*/
switch (method)
@ -55,30 +77,28 @@ private:
uncompressed_size,
qlz_state);
compressed_buffer[0] &= 3;
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
case CompressionMethod::LZ4:
case CompressionMethod::LZ4HC:
{
/** В случае LZ4, в начале запишем заголовок такого же размера и структуры, как в QuickLZ
* 1 байт, чтобы отличить LZ4 от QuickLZ.
* 4 байта - размер сжатых данных
* 4 байта - размер несжатых данных.
*/
compressed_buffer.resize(QUICKLZ_HEADER_SIZE + LZ4_COMPRESSBOUND(uncompressed_size));
static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32);
compressed_buffer.resize(header_size + LZ4_COMPRESSBOUND(uncompressed_size));
compressed_buffer[0] = 0x82; /// Второй бит - для совместимости с QuickLZ - обозначает, что размеры записываются 4 байтами.
if (method == CompressionMethod::LZ4)
compressed_size = QUICKLZ_HEADER_SIZE + LZ4_compress(
compressed_size = header_size + LZ4_compress(
working_buffer.begin(),
&compressed_buffer[QUICKLZ_HEADER_SIZE],
&compressed_buffer[header_size],
uncompressed_size);
else
compressed_size = QUICKLZ_HEADER_SIZE + LZ4_compressHC(
compressed_size = header_size + LZ4_compressHC(
working_buffer.begin(),
&compressed_buffer[QUICKLZ_HEADER_SIZE],
&compressed_buffer[header_size],
uncompressed_size);
UInt32 compressed_size_32 = compressed_size;