mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
Refactor WriteBuffer
This commit is contained in:
parent
24046d6ecc
commit
ecb985c4b5
@ -1,4 +1,4 @@
|
||||
#include "Lz4DeflatingWriteBuffer.h"
|
||||
#include <IO/Lz4DeflatingWriteBuffer.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
|
||||
@ -14,12 +14,11 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
|
||||
std::unique_ptr<WriteBuffer> out_, int /*compression_level*/, size_t buf_size, char * existing_memory, size_t alignment)
|
||||
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
|
||||
, out(std::move(out_))
|
||||
, in_chunk_size(0)
|
||||
, in_data(nullptr)
|
||||
, out_data(nullptr)
|
||||
, in_capacity(0)
|
||||
, out_capacity(0)
|
||||
, count_in(0)
|
||||
, count_out(0)
|
||||
{
|
||||
count_in = 0;
|
||||
kPrefs = {
|
||||
{LZ4F_max256KB,
|
||||
LZ4F_blockLinked,
|
||||
@ -33,15 +32,15 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
|
||||
0, /* favor decompression speed */
|
||||
{0, 0, 0}, /* reserved, must be set to 0 */
|
||||
};
|
||||
compression_ctx = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
|
||||
if (LZ4F_isError(compression_ctx))
|
||||
{
|
||||
|
||||
size_t ret = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
|
||||
|
||||
if (LZ4F_isError(ret))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"creation of LZ$ compression context failed. LZ4F version: {}",
|
||||
"creation of LZ4 compression context failed. LZ4F version: {}",
|
||||
LZ4F_VERSION,
|
||||
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
|
||||
@ -56,61 +55,66 @@ void Lz4DeflatingWriteBuffer::nextImpl()
|
||||
if (!offset())
|
||||
return;
|
||||
|
||||
in_buff = reinterpret_cast<void *>(working_buffer.begin());
|
||||
in_chunk_size = offset();
|
||||
|
||||
in_data = reinterpret_cast<void *>(working_buffer.begin());
|
||||
in_capacity = offset();
|
||||
|
||||
try
|
||||
{
|
||||
out->nextIfAtEnd();
|
||||
|
||||
out_buff = reinterpret_cast<void *>(out->position());
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
|
||||
/// write frame header and check for errors
|
||||
|
||||
size_t const header_size = LZ4F_compressBegin(ctx, out_buff, out_capacity, &kPrefs);
|
||||
|
||||
if (LZ4F_isError(header_size))
|
||||
if (first_time)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to start stream encoding. LZ4F version: {}",
|
||||
LZ4F_VERSION,
|
||||
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||
out->nextIfAtEnd();
|
||||
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
|
||||
/// write frame header and check for errors
|
||||
|
||||
size_t header_size = LZ4F_compressBegin(ctx, out_data, out_capacity, &kPrefs);
|
||||
|
||||
if (LZ4F_isError(header_size))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to start stream encoding. LZ4F version: {}",
|
||||
LZ4F_VERSION,
|
||||
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||
|
||||
out_capacity -= header_size;
|
||||
out->position() = out->buffer().end() - out_capacity;
|
||||
first_time = false;
|
||||
}
|
||||
count_out = header_size;
|
||||
out->position() = out->buffer().begin() + count_out;
|
||||
|
||||
do
|
||||
{
|
||||
out->nextIfAtEnd();
|
||||
|
||||
out_buff = reinterpret_cast<void *>(out->position());
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
|
||||
/// LZ4F_compressUpdate compresses whole input buffer at once so we need to shink it manually
|
||||
size_t cur_buffer_size = in_capacity;
|
||||
while (out_capacity < LZ4F_compressBound(cur_buffer_size, &kPrefs))
|
||||
cur_buffer_size /= 2;
|
||||
|
||||
/// compress begin
|
||||
{
|
||||
const size_t compressed_size = LZ4F_compressUpdate(ctx, out_buff, out_capacity, in_buff, in_chunk_size, nullptr);
|
||||
size_t compressed_size = LZ4F_compressUpdate(ctx, out_data, out_capacity, in_data, cur_buffer_size, nullptr);
|
||||
|
||||
if (LZ4F_isError(compressed_size))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to encode stream. LZ4F version: {}",
|
||||
LZ4F_VERSION,
|
||||
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||
}
|
||||
count_out += compressed_size;
|
||||
}
|
||||
out->position() = out->buffer().begin() + count_out;
|
||||
if (LZ4F_isError(compressed_size))
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to encode stream. LZ4F version: {}",
|
||||
LZ4F_VERSION,
|
||||
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||
|
||||
} while (count_out < count_in);
|
||||
out_capacity -= compressed_size;
|
||||
in_capacity -= cur_buffer_size;
|
||||
|
||||
out->position() = out->buffer().end() - out_capacity;
|
||||
}
|
||||
while (in_capacity > 0);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
out->position() = out->buffer().begin();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,21 +141,22 @@ void Lz4DeflatingWriteBuffer::finish()
|
||||
void Lz4DeflatingWriteBuffer::finishImpl()
|
||||
{
|
||||
next();
|
||||
out->nextIfAtEnd();
|
||||
|
||||
out_data = reinterpret_cast<void *>(out->position());
|
||||
out_capacity = out->buffer().end() - out->position();
|
||||
|
||||
/// compression end
|
||||
const size_t end_size = LZ4F_compressEnd(ctx, out_buff, out_capacity, nullptr);
|
||||
size_t end_size = LZ4F_compressEnd(ctx, out_data, out_capacity, nullptr);
|
||||
|
||||
if (LZ4F_isError(end_size))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||
"LZ4 failed to end stream encoding. LZ4F version: {}",
|
||||
LZ4F_VERSION,
|
||||
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||
}
|
||||
count_out += end_size;
|
||||
out->position() = out->buffer().begin() + count_out;
|
||||
|
||||
out_capacity -= end_size;
|
||||
out->position() = out->buffer().end() - out_capacity;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -32,21 +32,16 @@ private:
|
||||
|
||||
std::unique_ptr<WriteBuffer> out;
|
||||
|
||||
bool finished = false;
|
||||
|
||||
|
||||
void * in_buff;
|
||||
void * out_buff;
|
||||
size_t in_chunk_size;
|
||||
size_t out_capacity;
|
||||
|
||||
LZ4F_preferences_t kPrefs;
|
||||
LZ4F_compressionContext_t ctx;
|
||||
|
||||
size_t compression_ctx;
|
||||
void * in_data;
|
||||
void * out_data;
|
||||
|
||||
uint64_t count_in;
|
||||
uint64_t count_out;
|
||||
size_t in_capacity;
|
||||
size_t out_capacity;
|
||||
|
||||
LZ4F_preferences_t kPrefs;
|
||||
bool first_time = true;
|
||||
bool finished = false;
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user