Refactor the bzip2 write buffer and fix possible data loss in its internal buffer

This commit is contained in:
Yakov Olkhovskiy 2024-01-17 01:55:38 +00:00
parent 5da897ac20
commit e06e4695e1
2 changed files with 46 additions and 51 deletions

View File

@ -9,29 +9,6 @@
namespace DB
{
/// Error code used by this file when reporting bzip2 encoder failures.
/// Only declared here (`extern`); the definition lives outside this view.
namespace ErrorCodes
{
extern const int BZIP2_STREAM_ENCODER_FAILED;
}
/// Prepares the wrapped bz_stream for compression.
/// The stream is zero-filled and then passed to BZ2_bzCompressInit together with
/// `compression_level`; the two trailing zero arguments are forwarded as-is
/// (presumably verbosity and workFactor as described in the bzip2 manual -- confirm).
/// Throws Exception with ErrorCodes::BZIP2_STREAM_ENCODER_FAILED when the init call
/// returns anything other than BZ_OK; the returned code is included in the message.
Bzip2WriteBuffer::Bzip2StateWrapper::Bzip2StateWrapper(int compression_level)
{
/// Clear every field of the stream before handing it to the library.
memset(&stream, 0, sizeof(stream));
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
/// Ends the compression stream that the constructor initialised.
/// The return value of BZ2_bzCompressEnd is not inspected.
Bzip2WriteBuffer::Bzip2StateWrapper::~Bzip2StateWrapper()
{
BZ2_bzCompressEnd(&stream);
}
/// Defaulted out-of-line destructor: no cleanup logic is written here,
/// only the implicit destruction of members and base classes takes place.
Bzip2WriteBuffer::~Bzip2WriteBuffer() = default;
@ -42,20 +19,20 @@ void Bzip2WriteBuffer::nextImpl()
return;
}
bz->stream.next_in = working_buffer.begin();
bz->stream.avail_in = static_cast<unsigned>(offset());
stream.next_in = working_buffer.begin();
stream.avail_in = static_cast<unsigned>(offset());
try
{
do
{
out->nextIfAtEnd();
bz->stream.next_out = out->position();
bz->stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position());
stream.next_out = out->position();
stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position());
int ret = BZ2_bzCompress(&bz->stream, BZ_RUN);
int ret = BZ2_bzCompress(&stream, BZ_RUN);
out->position() = out->buffer().end() - bz->stream.avail_out;
out->position() = out->buffer().end() - stream.avail_out;
if (ret != BZ_RUN_OK)
throw Exception(
@ -64,7 +41,7 @@ void Bzip2WriteBuffer::nextImpl()
ret);
}
while (bz->stream.avail_in > 0);
while (stream.avail_in > 0);
total_in += offset();
}
@ -84,19 +61,31 @@ void Bzip2WriteBuffer::finalizeBefore()
if (!compress_empty && total_in == 0)
return;
out->nextIfAtEnd();
bz->stream.next_out = out->position();
bz->stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position());
do
{
out->nextIfAtEnd();
stream.next_out = out->position();
stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position());
int ret = BZ2_bzCompress(&bz->stream, BZ_FINISH);
int ret = BZ2_bzCompress(&stream, BZ_FINISH);
out->position() = out->buffer().end() - bz->stream.avail_out;
out->position() = out->buffer().end() - stream.avail_out;
if (ret != BZ_STREAM_END && ret != BZ_FINISH_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder failed: error code: {}",
ret);
if (ret == BZ_STREAM_END)
break;
if (ret != BZ_FINISH_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder failed: error code: {}",
ret);
}
while (true);
}
/// Finalization hook that ends the bzip2 compression stream held in `stream`.
/// The return value of BZ2_bzCompressEnd is not inspected.
/// NOTE(review): as far as this view shows, this is the only call that releases the
/// directly-held stream while the class destructor is defaulted -- confirm the stream
/// is also released when the buffer is destroyed without reaching this hook.
void Bzip2WriteBuffer::finalizeAfter()
{
BZ2_bzCompressEnd(&stream);
}
}

View File

@ -12,6 +12,11 @@
namespace DB
{
/// Error code used when reporting bzip2 encoder failures.
/// Only declared here (`extern`); the definition lives outside this view.
namespace ErrorCodes
{
extern const int BZIP2_STREAM_ENCODER_FAILED;
}
class Bzip2WriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
@ -23,9 +28,18 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), bz(std::make_unique<Bzip2StateWrapper>(compression_level))
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, compress_empty(compress_empty_)
{
memset(&stream, 0, sizeof(stream));
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
~Bzip2WriteBuffer() override;
@ -34,17 +48,9 @@ private:
void nextImpl() override;
void finalizeBefore() override;
void finalizeAfter() override;
class Bzip2StateWrapper
{
public:
explicit Bzip2StateWrapper(int compression_level);
~Bzip2StateWrapper();
bz_stream stream;
};
std::unique_ptr<Bzip2StateWrapper> bz;
bz_stream stream;
bool compress_empty = true;
UInt64 total_in = 0;
};