From e06e4695e199fd52c3f09fd93bb49b2b313595b5 Mon Sep 17 00:00:00 2001 From: Yakov Olkhovskiy Date: Wed, 17 Jan 2024 01:55:38 +0000 Subject: [PATCH] refactoring bzip2 buffer and fix possible data loss in internal buffer --- src/IO/Bzip2WriteBuffer.cpp | 69 ++++++++++++++++--------------------- src/IO/Bzip2WriteBuffer.h | 28 +++++++++------ 2 files changed, 46 insertions(+), 51 deletions(-) diff --git a/src/IO/Bzip2WriteBuffer.cpp b/src/IO/Bzip2WriteBuffer.cpp index 3421b4c3985..fffb1287041 100644 --- a/src/IO/Bzip2WriteBuffer.cpp +++ b/src/IO/Bzip2WriteBuffer.cpp @@ -9,29 +9,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int BZIP2_STREAM_ENCODER_FAILED; -} - - -Bzip2WriteBuffer::Bzip2StateWrapper::Bzip2StateWrapper(int compression_level) -{ - memset(&stream, 0, sizeof(stream)); - - int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0); - - if (ret != BZ_OK) - throw Exception( - ErrorCodes::BZIP2_STREAM_ENCODER_FAILED, - "bzip2 stream encoder init failed: error code: {}", - ret); -} - -Bzip2WriteBuffer::Bzip2StateWrapper::~Bzip2StateWrapper() -{ - BZ2_bzCompressEnd(&stream); -} Bzip2WriteBuffer::~Bzip2WriteBuffer() = default; @@ -42,20 +19,20 @@ void Bzip2WriteBuffer::nextImpl() return; } - bz->stream.next_in = working_buffer.begin(); - bz->stream.avail_in = static_cast<unsigned>(offset()); + stream.next_in = working_buffer.begin(); + stream.avail_in = static_cast<unsigned>(offset()); try { do { out->nextIfAtEnd(); - bz->stream.next_out = out->position(); - bz->stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position()); + stream.next_out = out->position(); + stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position()); - int ret = BZ2_bzCompress(&bz->stream, BZ_RUN); + int ret = BZ2_bzCompress(&stream, BZ_RUN); - out->position() = out->buffer().end() - bz->stream.avail_out; + out->position() = out->buffer().end() - stream.avail_out; if (ret != BZ_RUN_OK) throw Exception( @@ -64,7 +41,7 @@ void Bzip2WriteBuffer::nextImpl() ret); } - while
(bz->stream.avail_in > 0); + while (stream.avail_in > 0); total_in += offset(); } @@ -84,19 +61,31 @@ void Bzip2WriteBuffer::finalizeBefore() if (!compress_empty && total_in == 0) return; - out->nextIfAtEnd(); - bz->stream.next_out = out->position(); - bz->stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position()); + do + { + out->nextIfAtEnd(); + stream.next_out = out->position(); + stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position()); - int ret = BZ2_bzCompress(&bz->stream, BZ_FINISH); + int ret = BZ2_bzCompress(&stream, BZ_FINISH); - out->position() = out->buffer().end() - bz->stream.avail_out; + out->position() = out->buffer().end() - stream.avail_out; - if (ret != BZ_STREAM_END && ret != BZ_FINISH_OK) - throw Exception( - ErrorCodes::BZIP2_STREAM_ENCODER_FAILED, - "bzip2 stream encoder failed: error code: {}", - ret); + if (ret == BZ_STREAM_END) + break; + + if (ret != BZ_FINISH_OK) + throw Exception( + ErrorCodes::BZIP2_STREAM_ENCODER_FAILED, + "bzip2 stream encoder failed: error code: {}", + ret); + } + while (true); +} + +void Bzip2WriteBuffer::finalizeAfter() +{ + BZ2_bzCompressEnd(&stream); } } diff --git a/src/IO/Bzip2WriteBuffer.h b/src/IO/Bzip2WriteBuffer.h index 63c67461c6a..8e6d3d7ed1a 100644 --- a/src/IO/Bzip2WriteBuffer.h +++ b/src/IO/Bzip2WriteBuffer.h @@ -12,6 +12,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int BZIP2_STREAM_ENCODER_FAILED; +} + class Bzip2WriteBuffer : public WriteBufferWithOwnMemoryDecorator { public: @@ -23,9 +28,18 @@ public: char * existing_memory = nullptr, size_t alignment = 0, bool compress_empty_ = true) - : WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), bz(std::make_unique<Bzip2StateWrapper>(compression_level)) + : WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) , compress_empty(compress_empty_) { + memset(&stream, 0, sizeof(stream)); + + int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0); + + if (ret
!= BZ_OK) + throw Exception( + ErrorCodes::BZIP2_STREAM_ENCODER_FAILED, + "bzip2 stream encoder init failed: error code: {}", + ret); } ~Bzip2WriteBuffer() override; @@ -34,17 +48,9 @@ private: void nextImpl() override; void finalizeBefore() override; + void finalizeAfter() override; - class Bzip2StateWrapper - { - public: - explicit Bzip2StateWrapper(int compression_level); - ~Bzip2StateWrapper(); - - bz_stream stream; - }; - - std::unique_ptr<Bzip2StateWrapper> bz; + bz_stream stream; bool compress_empty = true; UInt64 total_in = 0; };