From e6f627bbc61819f236ca806baf7ccbfccb082d90 Mon Sep 17 00:00:00 2001 From: bharatnc Date: Tue, 6 Jul 2021 16:56:33 -0700 Subject: [PATCH] LZ4 - decompress read initial implementation --- src/IO/Lz4InflatingReadBuffer.cpp | 70 ++++++++++++++++++++++++++++--- src/IO/Lz4InflatingReadBuffer.h | 18 ++++++++ 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/src/IO/Lz4InflatingReadBuffer.cpp b/src/IO/Lz4InflatingReadBuffer.cpp index 722b18dee1d..57deef46a3a 100644 --- a/src/IO/Lz4InflatingReadBuffer.cpp +++ b/src/IO/Lz4InflatingReadBuffer.cpp @@ -4,24 +4,84 @@ namespace DB { namespace ErrorCodes { -extern const int LZ4_DECODER_FAILED; + extern const int LZ4_DECODER_FAILED; } Lz4InflatingReadBuffer::Lz4InflatingReadBuffer(std::unique_ptr in_, size_t buf_size, char * existing_memory, size_t alignment) : BufferWithOwnMemory(buf_size, existing_memory, alignment), in(std::move(in_)) { - /// TODO: Implementation + ret = 1; + + + dctx_status = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION); + if (LZ4F_isError(dctx_status)) + { + throw Exception( + ErrorCodes::LZ4_DECODER_FAILED, + "LZ4 failed create decompression context LZ4F_dctx. LZ4F version: {}. Error: {}", + LZ4F_VERSION, + LZ4F_getErrorName(dctx_status), + ErrorCodes::LZ4_DECODER_FAILED); + } } Lz4InflatingReadBuffer::~Lz4InflatingReadBuffer() { - /// TODO: Implementation + LZ4F_freeDecompressionContext(dctx); } bool Lz4InflatingReadBuffer::nextImpl() { - /// TODO: Implementation + if (eof) + return false; + + + if (!in_available) + { + in->nextIfAtEnd(); + in_available = in->buffer().end() - in->position(); + src = reinterpret_cast(in->position()); + } + + + src_capacity = internal_buffer.size(); + dst = reinterpret_cast(internal_buffer.begin()); + + + LZ4F_frameInfo_t info; + size_t consumed_size = src_capacity; + { size_t const frame_info_res = LZ4F_getFrameInfo(dctx, &info, &src, &consumed_size); + if (LZ4F_isError(frame_info_res)) { + throw Exception( + ErrorCodes::LZ4_DECODER_FAILED, + "LZ4 failed to fetch get info LZ4F_getFrameInfo. LZ4F version: {}. Error: {}", + LZ4F_VERSION, + LZ4F_getErrorName(frame_info_res), + ErrorCodes::LZ4_DECODER_FAILED); + } + } + + ret = LZ4F_decompress(dctx, dst, &dst_capacity, &src, &src_capacity, /* LZ4F_decompressOptions_t */ nullptr); + if (LZ4F_isError(ret)) { + printf("Decompression error: %s\n", LZ4F_getErrorName(ret)); + throw Exception( + ErrorCodes::LZ4_DECODER_FAILED, + "LZ4 failed to fetch get info LZ4F_getFrameInfo. LZ4F version: {}. Error: {}", + LZ4F_VERSION, + LZ4F_getErrorName(ret), + ErrorCodes::LZ4_DECODER_FAILED); + } + + in->position() = in->buffer().end() - in_available; + working_buffer.resize(internal_buffer.size() - src_capacity); + + + if (in->eof()) + { + eof = true; + return !working_buffer.empty(); + } + return true; } - } diff --git a/src/IO/Lz4InflatingReadBuffer.h b/src/IO/Lz4InflatingReadBuffer.h index e87dcb740eb..1282e276717 100644 --- a/src/IO/Lz4InflatingReadBuffer.h +++ b/src/IO/Lz4InflatingReadBuffer.h @@ -5,6 +5,7 @@ #include #include +#include namespace DB @@ -28,6 +29,23 @@ private: bool nextImpl() override; std::unique_ptr in; + + size_t ret; + + void * src; + void * dst; + + size_t src_capacity; + size_t dst_capacity; + + size_t in_available; + + LZ4F_dctx* dctx; + size_t dctx_status; + + + bool eof = false; + }; }