Fix ZstdDeflatingWriteBuffer truncating the output sometimes

This commit is contained in:
Michael Kolupaev 2023-08-04 20:42:02 +00:00
parent 166ac7a8be
commit 9a128fa554
6 changed files with 54 additions and 99 deletions

View File

@ -42,50 +42,13 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
if (!offset())
return;
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
{
addEmptyBlock();
first_write = false;
}
try
{
bool ended = false;
do
{
out->nextIfAtEnd();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_flush);
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"ZSTD stream decoding failed: error code: {}; ZSTD version: {}",
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
first_write = false;
out->position() = out->buffer().begin() + output.pos;
bool everything_was_compressed = (input.pos == input.size);
bool everything_was_flushed = compression_result == 0;
ended = everything_was_compressed && everything_was_flushed;
} while (!ended);
}
catch (...)
{
/// Do not try to write next time after exception.
out->position() = out->buffer().begin();
throw;
}
flush(ZSTD_e_flush);
}
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
@ -103,58 +66,58 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
}
else
{
try
{
finalizeBefore();
out->finalize();
finalizeAfter();
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
throw;
}
finalizeBefore();
out->finalize();
finalizeAfter();
}
}
void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
{
next();
out->nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
/// We could use ZSTD_e_flush here instead and unconditionally add an empty termination
/// block whenever a new buffer is created for a non-empty file (without isNeedToAddEmptyBlock).
/// However, while ZSTD_decompressStream (which we use in the reader buffer) can read a non-terminated frame,
/// the console zstd utility cannot.
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
while (remaining != 0)
flush(ZSTD_e_end);
}
void ZstdDeflatingAppendableWriteBuffer::flush(ZSTD_EndDirective mode)
{
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
try
{
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"ZSTD stream encoder end failed: error: '{}' ZSTD version: {}",
ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
out->position() = out->buffer().begin() + output.pos;
if (!out->hasPendingData())
bool ended = false;
do
{
out->next();
out->nextIfAtEnd();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
}
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"ZSTD stream decoding failed: error code: {}; ZSTD version: {}",
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
out->position() = out->buffer().begin() + output.pos;
bool everything_was_compressed = (input.pos == input.size);
bool everything_was_flushed = compression_result == 0;
ended = everything_was_compressed && everything_was_flushed;
} while (!ended);
}
catch (...)
{
/// Do not try to write next time after exception.
out->position() = out->buffer().begin();
throw;
}
}

View File

@ -52,6 +52,8 @@ private:
/// NOTE: writes compressed data into out.working_buffer, but does not call out.next() until the buffer is full
void nextImpl() override;
void flush(ZSTD_EndDirective mode);
/// Write the terminating ZSTD_e_end: an empty block plus the frame epilogue.
/// This should be almost a no-op, because the frame epilogue contains only
/// checksums, and they are disabled for this buffer.

View File

@ -32,13 +32,8 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer() = default;
void ZstdDeflatingWriteBuffer::nextImpl()
void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode)
{
if (!offset())
return;
ZSTD_EndDirective mode = ZSTD_e_flush;
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
@ -54,7 +49,6 @@ void ZstdDeflatingWriteBuffer::nextImpl()
output.size = out->buffer().size();
output.pos = out->offset();
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
if (ZSTD_isError(compression_result))
throw Exception(
@ -78,24 +72,15 @@ void ZstdDeflatingWriteBuffer::nextImpl()
}
}
/// Runs flush(ZSTD_e_flush) when offset() is non-zero; otherwise returns
/// without doing anything.
void ZstdDeflatingWriteBuffer::nextImpl()
{
    if (!offset())
        return;

    flush(ZSTD_e_flush);
}
void ZstdDeflatingWriteBuffer::finalizeBefore()
{
next();
out->nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: zstd version: {}", ZSTD_VERSION_STRING);
out->position() = out->buffer().begin() + output.pos;
flush(ZSTD_e_end);
}
void ZstdDeflatingWriteBuffer::finalizeAfter()

View File

@ -37,6 +37,8 @@ private:
void finalizeBefore() override;
void finalizeAfter() override;
void flush(ZSTD_EndDirective mode);
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;

View File

@ -43,6 +43,7 @@ ipv6 Nullable(FixedString(16))
[(2,0,NULL,'','[]')]
1 1
0 1
5090915589685802007
16159458007063698496
16159458007063698496
BYTE_ARRAY String

View File

@ -147,6 +147,8 @@ insert into function file(compressed_02735.parquet) select concat('aaaaaaaaaaaaa
select total_compressed_size < 10000, total_uncompressed_size > 15000 from file(compressed_02735.parquet, ParquetMetadata);
insert into function file(compressed_02735.parquet) select concat('aaaaaaaaaaaaaaaa', toString(number)) as s from numbers(1000) settings output_format_parquet_row_group_size = 10000, output_format_parquet_compression_method='none';
select total_compressed_size < 10000, total_uncompressed_size > 15000 from file(compressed_02735.parquet, ParquetMetadata);
insert into function file(compressed_02735.parquet) select if(number%3==1, NULL, 42) as x from numbers(70) settings output_format_parquet_compression_method='zstd';
select sum(cityHash64(*)) from file(compressed_02735.parquet);
-- Single-threaded encoding and Arrow encoder.
drop table if exists other_encoders_02735;