2014-01-15 14:53:20 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
|
|
#include <city.h>
|
2016-02-03 21:16:19 +00:00
|
|
|
|
|
|
|
|
|
#ifdef USE_QUICKLZ
|
|
|
|
|
#include <quicklz/quicklz_level1.h>
|
|
|
|
|
#endif
|
|
|
|
|
|
2014-01-15 14:53:20 +00:00
|
|
|
|
#include <lz4/lz4.h>
|
2015-03-09 01:15:43 +00:00
|
|
|
|
#include <zstd/zstd.h>
|
2014-01-15 14:53:20 +00:00
|
|
|
|
|
|
|
|
|
#include <DB/Common/PODArray.h>
|
|
|
|
|
#include <DB/Common/ProfileEvents.h>
|
2015-10-05 01:35:28 +00:00
|
|
|
|
#include <DB/Common/Exception.h>
|
2014-01-15 14:53:20 +00:00
|
|
|
|
#include <DB/IO/ReadBuffer.h>
|
|
|
|
|
#include <DB/IO/BufferWithOwnMemory.h>
|
|
|
|
|
#include <DB/IO/CompressedStream.h>
|
2014-10-05 21:59:23 +00:00
|
|
|
|
#include <DB/IO/WriteHelpers.h>
|
2014-01-15 14:53:20 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
|
/// Error codes passed to Exception by the compressed-block reader below.
/// Only declared here (extern); the definitions are not part of this header.
namespace ErrorCodes
{
    extern const int CANNOT_DECOMPRESS;             /// A compressed block could not be decompressed.
    extern const int CHECKSUM_DOESNT_MATCH;         /// Stored checksum differs from the one computed over the block.
    extern const int TOO_LARGE_SIZE_COMPRESSED;     /// Compressed size in the header exceeds DBMS_MAX_COMPRESSED_SIZE.
    extern const int UNKNOWN_COMPRESSION_METHOD;    /// Method byte is not recognized, or the method is compiled out.
}
|
|
|
|
|
|
|
|
|
|
|
2014-01-15 14:53:20 +00:00
|
|
|
|
class CompressedReadBufferBase
|
|
|
|
|
{
|
|
|
|
|
protected:
|
|
|
|
|
ReadBuffer * compressed_in;
|
|
|
|
|
|
|
|
|
|
/// Если в буфере compressed_in помещается целый сжатый блок - используем его. Иначе - копируем данные по кусочкам в own_compressed_buffer.
|
2016-02-03 21:16:19 +00:00
|
|
|
|
PODArray<char> own_compressed_buffer{COMPRESSED_BLOCK_HEADER_SIZE};
|
2014-04-08 07:31:51 +00:00
|
|
|
|
char * compressed_buffer = nullptr;
|
2014-01-15 14:53:20 +00:00
|
|
|
|
|
2016-02-03 21:16:19 +00:00
|
|
|
|
#ifdef USE_QUICKLZ
|
2016-06-09 04:37:21 +00:00
|
|
|
|
std::unique_ptr<qlz_state_decompress> qlz_state;
|
2016-02-21 13:57:03 +00:00
|
|
|
|
#else
|
2016-02-24 07:53:04 +00:00
|
|
|
|
void * fixed_size_padding = nullptr;
|
2016-02-03 21:16:19 +00:00
|
|
|
|
#endif
|
2014-01-15 14:53:20 +00:00
|
|
|
|
|
2016-04-09 23:24:38 +00:00
|
|
|
|
/// Не проверять чексуммы.
|
|
|
|
|
bool disable_checksum = false;
|
|
|
|
|
|
|
|
|
|
|
2014-01-15 14:53:20 +00:00
|
|
|
|
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
|
|
|
|
|
/// Возвращает количество прочитанных байт.
|
2015-03-09 01:15:43 +00:00
|
|
|
|
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
|
2014-01-15 14:53:20 +00:00
|
|
|
|
{
|
|
|
|
|
if (compressed_in->eof())
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
uint128 checksum;
|
|
|
|
|
compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
|
|
|
|
|
|
2016-02-03 21:16:19 +00:00
|
|
|
|
own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
|
|
|
|
|
compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE);
|
2014-01-15 14:53:20 +00:00
|
|
|
|
|
2014-10-05 21:59:23 +00:00
|
|
|
|
UInt8 method = own_compressed_buffer[0]; /// См. CompressedWriteBuffer.h
|
2015-03-09 01:15:43 +00:00
|
|
|
|
|
|
|
|
|
size_t & size_compressed = size_compressed_without_checksum;
|
2014-10-05 21:59:23 +00:00
|
|
|
|
|
2014-10-23 22:19:57 +00:00
|
|
|
|
if (method < 0x80)
|
2014-10-05 21:59:23 +00:00
|
|
|
|
{
|
2016-02-03 21:16:19 +00:00
|
|
|
|
#ifdef USE_QUICKLZ
|
2014-10-05 21:59:23 +00:00
|
|
|
|
size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
|
|
|
|
|
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
|
2016-02-03 21:16:19 +00:00
|
|
|
|
#else
|
|
|
|
|
throw Exception("QuickLZ compression method is disabled", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
|
|
|
|
|
#endif
|
2014-10-05 21:59:23 +00:00
|
|
|
|
}
|
2015-03-09 01:15:43 +00:00
|
|
|
|
else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
|
2014-10-05 21:59:23 +00:00
|
|
|
|
{
|
|
|
|
|
size_compressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[1]);
|
|
|
|
|
size_decompressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[5]);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
|
|
|
|
|
|
2014-01-15 14:53:20 +00:00
|
|
|
|
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
|
|
|
|
|
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
|
|
|
|
|
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
|
|
|
|
|
|
|
|
|
|
/// Находится ли сжатый блок целиком в буфере compressed_in?
|
2016-02-03 21:16:19 +00:00
|
|
|
|
if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
|
|
|
|
|
compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end())
|
2014-01-15 14:53:20 +00:00
|
|
|
|
{
|
2016-02-03 21:16:19 +00:00
|
|
|
|
compressed_in->position() -= COMPRESSED_BLOCK_HEADER_SIZE;
|
2014-01-15 14:53:20 +00:00
|
|
|
|
compressed_buffer = compressed_in->position();
|
|
|
|
|
compressed_in->position() += size_compressed;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
own_compressed_buffer.resize(size_compressed);
|
|
|
|
|
compressed_buffer = &own_compressed_buffer[0];
|
2016-02-03 21:16:19 +00:00
|
|
|
|
compressed_in->readStrict(&compressed_buffer[COMPRESSED_BLOCK_HEADER_SIZE], size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
|
2014-01-15 14:53:20 +00:00
|
|
|
|
}
|
|
|
|
|
|
2016-04-09 23:24:38 +00:00
|
|
|
|
if (!disable_checksum && checksum != CityHash128(&compressed_buffer[0], size_compressed))
|
2014-01-15 14:53:20 +00:00
|
|
|
|
throw Exception("Checksum doesn't match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
|
|
|
|
|
|
|
|
|
|
return size_compressed + sizeof(checksum);
|
|
|
|
|
}
|
|
|
|
|
|
2015-03-09 01:15:43 +00:00
|
|
|
|
void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
|
2014-01-15 14:53:20 +00:00
|
|
|
|
{
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
|
|
|
|
|
|
2014-10-05 21:59:23 +00:00
|
|
|
|
UInt8 method = compressed_buffer[0]; /// См. CompressedWriteBuffer.h
|
|
|
|
|
|
2014-10-23 22:19:57 +00:00
|
|
|
|
if (method < 0x80)
|
2014-01-15 14:53:20 +00:00
|
|
|
|
{
|
2016-02-03 21:16:19 +00:00
|
|
|
|
#ifdef USE_QUICKLZ
|
2014-01-15 14:53:20 +00:00
|
|
|
|
if (!qlz_state)
|
2016-06-09 04:37:21 +00:00
|
|
|
|
qlz_state = std::make_unique<qlz_state_decompress>();
|
2014-01-15 14:53:20 +00:00
|
|
|
|
|
2016-06-16 08:33:49 +00:00
|
|
|
|
qlz_decompress(&compressed_buffer[0], to, qlz_state.get());
|
2016-02-03 21:16:19 +00:00
|
|
|
|
#else
|
|
|
|
|
throw Exception("QuickLZ compression method is disabled", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
|
|
|
|
|
#endif
|
2014-01-15 14:53:20 +00:00
|
|
|
|
}
|
2015-03-09 01:15:43 +00:00
|
|
|
|
else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4))
|
2014-10-05 21:59:23 +00:00
|
|
|
|
{
|
2016-02-03 21:16:19 +00:00
|
|
|
|
if (LZ4_decompress_fast(&compressed_buffer[COMPRESSED_BLOCK_HEADER_SIZE], to, size_decompressed) < 0)
|
2015-03-09 01:15:43 +00:00
|
|
|
|
throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CANNOT_DECOMPRESS);
|
|
|
|
|
}
|
|
|
|
|
else if (method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
|
|
|
|
|
{
|
|
|
|
|
size_t res = ZSTD_decompress(
|
|
|
|
|
to, size_decompressed,
|
2016-02-03 21:16:19 +00:00
|
|
|
|
&compressed_buffer[COMPRESSED_BLOCK_HEADER_SIZE], size_compressed_without_checksum - COMPRESSED_BLOCK_HEADER_SIZE);
|
2015-03-09 01:15:43 +00:00
|
|
|
|
|
|
|
|
|
if (ZSTD_isError(res))
|
|
|
|
|
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
|
2014-10-05 21:59:23 +00:00
|
|
|
|
}
|
2014-01-15 14:53:20 +00:00
|
|
|
|
else
|
2014-10-05 21:59:23 +00:00
|
|
|
|
throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
|
2014-01-15 14:53:20 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
/// compressed_in можно инициализировать отложенно, но до первого вызова readCompressedData.
|
2014-04-08 07:31:51 +00:00
|
|
|
|
CompressedReadBufferBase(ReadBuffer * in = nullptr)
|
|
|
|
|
: compressed_in(in)
|
2014-01-15 14:53:20 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2016-04-09 23:24:38 +00:00
|
|
|
|
/** Не проверять чексуммы.
|
|
|
|
|
* Может использоваться, например, в тех случаях, когда сжатые данные пишет клиент,
|
|
|
|
|
* который не умеет вычислять чексуммы, и вместо этого заполняет их нулями или чем угодно.
|
|
|
|
|
*/
|
|
|
|
|
void disableChecksumming()
|
|
|
|
|
{
|
|
|
|
|
disable_checksum = true;
|
|
|
|
|
}
|
2014-01-15 14:53:20 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|