ClickHouse/dbms/src/IO/CompressedWriteBuffer.cpp

154 lines
4.7 KiB
C++
Raw Normal View History

2016-10-25 06:49:24 +00:00
#include <memory>
#include <city.h>
#include <lz4.h>
#include <lz4hc.h>
#include <zstd.h>
2017-07-28 14:14:07 +00:00
#include <string.h>
2016-10-25 06:49:24 +00:00
#include <common/unaligned.h>
2017-04-08 01:32:05 +00:00
#include <Core/Types.h>
2016-10-25 06:49:24 +00:00
#include <IO/CompressedWriteBuffer.h>
2016-10-25 06:49:24 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int UNKNOWN_COMPRESSION_METHOD;
2016-10-25 06:49:24 +00:00
}
void CompressedWriteBuffer::nextImpl()
{
if (!offset())
return;
size_t uncompressed_size = offset();
size_t compressed_size = 0;
char * compressed_buffer_ptr = nullptr;
/** The format of compressed block - see CompressedStream.h
*/
switch (compression_settings.method)
{
case CompressionMethod::LZ4:
case CompressionMethod::LZ4HC:
{
static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32);
2016-10-25 06:49:24 +00:00
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
compressed_buffer.resize(header_size + LZ4_COMPRESSBOUND(uncompressed_size));
2016-10-25 06:49:24 +00:00
#pragma GCC diagnostic pop
compressed_buffer[0] = static_cast<char>(CompressionMethodByte::LZ4);
2016-10-25 06:49:24 +00:00
if (compression_settings.method == CompressionMethod::LZ4)
compressed_size = header_size + LZ4_compress_default(
working_buffer.begin(),
&compressed_buffer[header_size],
uncompressed_size,
LZ4_COMPRESSBOUND(uncompressed_size));
else
compressed_size = header_size + LZ4_compress_HC(
working_buffer.begin(),
&compressed_buffer[header_size],
uncompressed_size,
LZ4_COMPRESSBOUND(uncompressed_size),
compression_settings.level);
2016-10-25 06:49:24 +00:00
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
2016-10-25 06:49:24 +00:00
unalignedStore(&compressed_buffer[1], compressed_size_32);
unalignedStore(&compressed_buffer[5], uncompressed_size_32);
2016-10-25 06:49:24 +00:00
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
case CompressionMethod::ZSTD:
{
static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32);
2016-10-25 06:49:24 +00:00
compressed_buffer.resize(header_size + ZSTD_compressBound(uncompressed_size));
2016-10-25 06:49:24 +00:00
compressed_buffer[0] = static_cast<char>(CompressionMethodByte::ZSTD);
2016-10-25 06:49:24 +00:00
size_t res = ZSTD_compress(
&compressed_buffer[header_size],
compressed_buffer.size() - header_size,
working_buffer.begin(),
uncompressed_size,
compression_settings.level);
2016-10-25 06:49:24 +00:00
if (ZSTD_isError(res))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_COMPRESS);
2016-10-25 06:49:24 +00:00
compressed_size = header_size + res;
2016-10-25 06:49:24 +00:00
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
2016-10-25 06:49:24 +00:00
unalignedStore(&compressed_buffer[1], compressed_size_32);
unalignedStore(&compressed_buffer[5], uncompressed_size_32);
2016-10-25 06:49:24 +00:00
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
2017-07-28 14:14:07 +00:00
case CompressionMethod::NONE:
{
static constexpr size_t header_size = 1 + sizeof (UInt32) + sizeof (UInt32);
compressed_size = header_size + uncompressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
UInt32 compressed_size_32 = compressed_size;
compressed_buffer.resize(compressed_size);
compressed_buffer[0] = static_cast<char>(CompressionMethodByte::NONE);
2017-07-28 14:14:07 +00:00
unalignedStore(&compressed_buffer[1], compressed_size_32);
unalignedStore(&compressed_buffer[5], uncompressed_size_32);
memcpy(&compressed_buffer[9], working_buffer.begin(), uncompressed_size);
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
default:
throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
2016-10-25 06:49:24 +00:00
2017-06-21 08:35:38 +00:00
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(compressed_buffer_ptr, compressed_size);
out.write(reinterpret_cast<const char *>(&checksum), sizeof(checksum));
2016-10-25 06:49:24 +00:00
out.write(compressed_buffer_ptr, compressed_size);
2016-10-25 06:49:24 +00:00
}
CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_,
CompressionSettings compression_settings_,
size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), compression_settings(compression_settings_)
2016-10-25 06:49:24 +00:00
{
}
CompressedWriteBuffer::~CompressedWriteBuffer()
{
try
{
next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2016-10-25 06:49:24 +00:00
}
}