ClickHouse/src/IO/Lz4DeflatingWriteBuffer.cpp

166 lines
5.1 KiB
C++
Raw Normal View History

2021-08-19 10:34:23 +00:00
#include <IO/Lz4DeflatingWriteBuffer.h>
2021-06-16 05:43:07 +00:00
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LZ4_ENCODER_FAILED;
}
Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
2021-08-20 16:40:18 +00:00
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
2021-11-10 22:58:56 +00:00
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
2021-08-19 10:34:23 +00:00
, in_data(nullptr)
, out_data(nullptr)
, in_capacity(0)
2021-08-19 08:10:56 +00:00
, out_capacity(0)
2021-06-16 05:43:07 +00:00
{
2021-07-24 16:29:35 +00:00
kPrefs = {
{LZ4F_max256KB,
LZ4F_blockLinked,
LZ4F_noContentChecksum,
LZ4F_frame,
0 /* unknown content size */,
0 /* no dictID */,
LZ4F_noBlockChecksum},
2021-08-20 16:40:18 +00:00
compression_level, /* compression level; 0 == default */
2021-11-26 12:03:46 +00:00
1, /* autoflush */
2021-07-24 16:29:35 +00:00
0, /* favor decompression speed */
{0, 0, 0}, /* reserved, must be set to 0 */
};
2021-08-19 10:34:23 +00:00
size_t ret = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(ret))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
2021-08-19 10:34:23 +00:00
"creation of LZ4 compression context failed. LZ4F version: {}",
LZ4F_VERSION,
ErrorCodes::LZ4_ENCODER_FAILED);
2021-06-16 05:43:07 +00:00
}
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
{
2021-11-11 17:27:23 +00:00
finalize();
2021-06-16 05:43:07 +00:00
}
void Lz4DeflatingWriteBuffer::nextImpl()
{
if (!offset())
return;
2021-08-19 10:34:23 +00:00
in_data = reinterpret_cast<void *>(working_buffer.begin());
in_capacity = offset();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
try
{
2021-08-19 10:34:23 +00:00
if (first_time)
{
if (out_capacity < LZ4F_HEADER_SIZE_MAX)
{
out->next();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
}
2021-07-24 16:29:35 +00:00
2021-08-19 10:34:23 +00:00
/// write frame header and check for errors
size_t header_size = LZ4F_compressBegin(ctx, out_data, out_capacity, &kPrefs);
2021-07-24 16:29:35 +00:00
2021-08-19 10:34:23 +00:00
if (LZ4F_isError(header_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to start stream encoding. LZ4F version: {}",
2021-08-20 16:40:18 +00:00
LZ4F_VERSION);
2021-08-19 10:34:23 +00:00
out_capacity -= header_size;
out->position() = out->buffer().end() - out_capacity;
out_data = reinterpret_cast<void *>(out->position());
2021-08-19 10:34:23 +00:00
first_time = false;
2021-07-24 16:29:35 +00:00
}
do
{
2021-08-20 16:40:18 +00:00
/// Ensure that there is enough space for compressed block of minimal size
size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs);
if (out_capacity < min_compressed_block_size)
2021-08-20 16:40:18 +00:00
{
out->next();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
2021-08-20 16:40:18 +00:00
}
2021-08-19 10:34:23 +00:00
/// LZ4F_compressUpdate compresses whole input buffer at once so we need to shink it manually
size_t cur_buffer_size = in_capacity;
if (out_capacity >= min_compressed_block_size) /// We cannot shrink the input buffer if it's already too small.
{
while (out_capacity < LZ4F_compressBound(cur_buffer_size, &kPrefs))
cur_buffer_size /= 2;
}
2021-08-19 10:34:23 +00:00
size_t compressed_size = LZ4F_compressUpdate(ctx, out_data, out_capacity, in_data, cur_buffer_size, nullptr);
2021-08-19 10:34:23 +00:00
if (LZ4F_isError(compressed_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to encode stream. LZ4F version: {}",
2021-08-20 16:40:18 +00:00
LZ4F_VERSION);
2021-08-19 10:34:23 +00:00
in_capacity -= cur_buffer_size;
2021-08-20 16:40:18 +00:00
in_data = reinterpret_cast<void *>(working_buffer.end() - in_capacity);
out_capacity -= compressed_size;
2021-08-19 10:34:23 +00:00
out->position() = out->buffer().end() - out_capacity;
out_data = reinterpret_cast<void *>(out->position());
2021-08-19 10:34:23 +00:00
}
while (in_capacity > 0);
}
catch (...)
{
out->position() = out->buffer().begin();
2021-08-19 10:34:23 +00:00
throw;
}
2021-11-26 12:03:46 +00:00
out->next();
out_capacity = out->buffer().end() - out->position();
2021-06-16 05:43:07 +00:00
}
2021-11-22 11:19:26 +00:00
void Lz4DeflatingWriteBuffer::finalizeBefore()
2021-06-16 05:43:07 +00:00
{
next();
2021-08-19 10:34:23 +00:00
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
2021-08-25 12:35:53 +00:00
if (out_capacity < LZ4F_compressBound(0, &kPrefs))
{
out->next();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
2021-08-25 12:35:53 +00:00
}
/// compression end
2021-08-19 10:34:23 +00:00
size_t end_size = LZ4F_compressEnd(ctx, out_data, out_capacity, nullptr);
if (LZ4F_isError(end_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to end stream encoding. LZ4F version: {}",
2021-08-20 16:40:18 +00:00
LZ4F_VERSION);
2021-08-19 10:34:23 +00:00
out_capacity -= end_size;
out->position() = out->buffer().end() - out_capacity;
out_data = reinterpret_cast<void *>(out->position());
2021-06-16 05:43:07 +00:00
}
2021-11-22 11:19:26 +00:00
void Lz4DeflatingWriteBuffer::finalizeAfter()
2021-11-10 22:58:56 +00:00
{
LZ4F_freeCompressionContext(ctx);
}
2021-06-16 05:43:07 +00:00
}