mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
LZ4 - compress write initial implementation
This commit is contained in:
parent
8cb5c06813
commit
c79f885e41
@ -14,26 +14,120 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
|
|||||||
std::unique_ptr<WriteBuffer> out_, int /*compression_level*/, size_t buf_size, char * existing_memory, size_t alignment)
|
std::unique_ptr<WriteBuffer> out_, int /*compression_level*/, size_t buf_size, char * existing_memory, size_t alignment)
|
||||||
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
|
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
|
||||||
{
|
{
|
||||||
|
count_in = 0;
|
||||||
|
compression_ctx = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
|
||||||
|
if (LZ4F_isError(compression_ctx))
|
||||||
|
{
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||||
|
"creation of LZ$ compression context failed. LZ4F version: {}",
|
||||||
|
LZ4F_VERSION,
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
|
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
|
||||||
{
|
{
|
||||||
/// TODO: Implementation
|
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
|
||||||
|
finish();
|
||||||
|
LZ4F_freeCompressionContext(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Lz4DeflatingWriteBuffer::nextImpl()
|
void Lz4DeflatingWriteBuffer::nextImpl()
|
||||||
{
|
{
|
||||||
/// TODO: Implementation
|
if (!offset())
|
||||||
|
return;
|
||||||
|
|
||||||
|
in_buff = reinterpret_cast<void *>(working_buffer.begin());
|
||||||
|
in_chunk_size = offset();
|
||||||
|
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
do
|
||||||
|
{
|
||||||
|
out->nextIfAtEnd();
|
||||||
|
|
||||||
|
out_buff = reinterpret_cast<void *>(out->buffer().begin());
|
||||||
|
out_capacity = out->buffer().end() - out->position();
|
||||||
|
|
||||||
|
/// write frame header and check for errors
|
||||||
|
{
|
||||||
|
size_t const header_size = LZ4F_compressBegin(ctx, out_buff, out_capacity, nullptr);
|
||||||
|
|
||||||
|
if (LZ4F_isError(header_size))
|
||||||
|
{
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||||
|
"LZ4 failed to start stream encoding. LZ4F version: {}",
|
||||||
|
LZ4F_VERSION,
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||||
|
}
|
||||||
|
count_out = header_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// compress begin
|
||||||
|
{
|
||||||
|
const size_t compressed_size = LZ4F_compressUpdate(ctx, out_buff, out_capacity, in_buff, in_chunk_size, nullptr);
|
||||||
|
|
||||||
|
if (LZ4F_isError(compressed_size))
|
||||||
|
{
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||||
|
"LZ4 failed to encode stream. LZ4F version: {}",
|
||||||
|
LZ4F_VERSION,
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||||
|
}
|
||||||
|
count_out += compressed_size;
|
||||||
|
}
|
||||||
|
out->position() = out->buffer().end() - count_out;
|
||||||
|
} while (count_out < count_in);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
out->position() = out->buffer().begin();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Lz4DeflatingWriteBuffer::finish()
|
void Lz4DeflatingWriteBuffer::finish()
|
||||||
{
|
{
|
||||||
/// TODO: Implementation
|
if (finished)
|
||||||
|
return;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
finishImpl();
|
||||||
|
out->finalize();
|
||||||
|
finished = true;
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
/// Do not try to flush next time after exception.
|
||||||
|
out->position() = out->buffer().begin();
|
||||||
|
finished = true;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Lz4DeflatingWriteBuffer::finishImpl()
|
void Lz4DeflatingWriteBuffer::finishImpl()
|
||||||
{
|
{
|
||||||
/// TODO: Implementation
|
next();
|
||||||
|
out->nextIfAtEnd();
|
||||||
|
|
||||||
|
/// compression end
|
||||||
|
const size_t end_size = LZ4F_compressEnd(ctx, out_buff, out_capacity, nullptr);
|
||||||
|
|
||||||
|
if (LZ4F_isError(end_size))
|
||||||
|
{
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED,
|
||||||
|
"LZ4 failed to end stream encoding. LZ4F version: {}",
|
||||||
|
LZ4F_VERSION,
|
||||||
|
ErrorCodes::LZ4_ENCODER_FAILED);
|
||||||
|
}
|
||||||
|
count_out += end_size;
|
||||||
|
out->position() = out->buffer().end() - count_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -5,10 +5,10 @@
|
|||||||
#include <IO/WriteBuffer.h>
|
#include <IO/WriteBuffer.h>
|
||||||
|
|
||||||
#include <lz4.h>
|
#include <lz4.h>
|
||||||
|
#include <lz4frame.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/// Performs compression using lz4 library and writes compressed data to out_ WriteBuffer.
|
/// Performs compression using lz4 library and writes compressed data to out_ WriteBuffer.
|
||||||
class Lz4DeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
|
class Lz4DeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
|
||||||
{
|
{
|
||||||
@ -32,6 +32,19 @@ private:
|
|||||||
|
|
||||||
std::unique_ptr<WriteBuffer> out;
|
std::unique_ptr<WriteBuffer> out;
|
||||||
|
|
||||||
/// TODO: Complete Implementation
|
bool finished = false;
|
||||||
|
|
||||||
|
|
||||||
|
void * in_buff;
|
||||||
|
void * out_buff;
|
||||||
|
size_t in_chunk_size;
|
||||||
|
size_t out_capacity;
|
||||||
|
|
||||||
|
LZ4F_compressionContext_t ctx;
|
||||||
|
|
||||||
|
size_t compression_ctx;
|
||||||
|
|
||||||
|
uint64_t count_in;
|
||||||
|
uint64_t count_out;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user