This commit is contained in:
Antonio Andelic 2023-08-07 12:21:50 +00:00
parent c7996d5453
commit f5597b1f8e
2 changed files with 76 additions and 41 deletions

View File

@ -42,13 +42,50 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
if (!offset())
return;
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
{
addEmptyBlock();
first_write = false;
}
flush(ZSTD_e_flush);
try
{
bool ended = false;
do
{
out->nextIfAtEnd();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_flush);
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"ZSTD stream decoding failed: error code: {}; ZSTD version: {}",
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
first_write = false;
out->position() = out->buffer().begin() + output.pos;
bool everything_was_compressed = (input.pos == input.size);
bool everything_was_flushed = compression_result == 0;
ended = everything_was_compressed && everything_was_flushed;
} while (!ended);
}
catch (...)
{
/// Do not try to write next time after exception.
out->position() = out->buffer().begin();
throw;
}
}
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
@ -66,58 +103,58 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
}
else
{
finalizeBefore();
out->finalize();
finalizeAfter();
try
{
finalizeBefore();
out->finalize();
finalizeAfter();
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
throw;
}
}
}
void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
{
/// Actually we can use ZSTD_e_flush here and add empty termination
/// block on each new buffer creation for non-empty file unconditionally (without isNeedToAddEmptyBlock).
/// However ZSTD_decompressStream is able to read non-terminated frame (we use it in reader buffer),
/// but console zstd utility cannot.
flush(ZSTD_e_end);
}
next();
out->nextIfAtEnd();
void ZstdDeflatingAppendableWriteBuffer::flush(ZSTD_EndDirective mode)
{
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
try
{
bool ended = false;
do
{
out->nextIfAtEnd();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
/// Actually we can use ZSTD_e_flush here and add empty termination
/// block on each new buffer creation for non-empty file unconditionally (without isNeedToAddEmptyBlock).
/// However ZSTD_decompressStream is able to read non-terminated frame (we use it in reader buffer),
/// but console zstd utility cannot.
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
while (remaining != 0)
{
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"ZSTD stream encoder end failed: error: '{}' ZSTD version: {}",
ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
out->position() = out->buffer().begin() + output.pos;
if (!out->hasPendingData())
{
out->next();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"ZSTD stream decoding failed: error code: {}; ZSTD version: {}",
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
out->position() = out->buffer().begin() + output.pos;
bool everything_was_compressed = (input.pos == input.size);
bool everything_was_flushed = compression_result == 0;
ended = everything_was_compressed && everything_was_flushed;
} while (!ended);
}
catch (...)
{
/// Do not try to write next time after exception.
out->position() = out->buffer().begin();
throw;
}
}
}

View File

@ -52,8 +52,6 @@ private:
/// NOTE: will fill compressed data to the out.working_buffer, but will not call out.next method until the buffer is full
void nextImpl() override;
void flush(ZSTD_EndDirective mode);
/// Write terminating ZSTD_e_end: empty block + frame epilogue. BTW it
/// should be almost noop, because frame epilogue contains only checksums,
/// and they are disabled for this buffer.