diff --git a/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp b/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp index 81be8d8ce4d..5c4ee6203b3 100644 --- a/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp +++ b/src/IO/ZstdDeflatingAppendableWriteBuffer.cpp @@ -42,50 +42,13 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl() if (!offset()) return; - input.src = reinterpret_cast(working_buffer.begin()); - input.size = offset(); - input.pos = 0; - if (first_write && append_to_existing_file && isNeedToAddEmptyBlock()) { addEmptyBlock(); first_write = false; } - try - { - bool ended = false; - do - { - out->nextIfAtEnd(); - - output.dst = reinterpret_cast(out->buffer().begin()); - output.size = out->buffer().size(); - output.pos = out->offset(); - - size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_flush); - if (ZSTD_isError(compression_result)) - throw Exception( - ErrorCodes::ZSTD_ENCODER_FAILED, - "ZSTD stream decoding failed: error code: {}; ZSTD version: {}", - ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING); - - first_write = false; - out->position() = out->buffer().begin() + output.pos; - - bool everything_was_compressed = (input.pos == input.size); - bool everything_was_flushed = compression_result == 0; - - ended = everything_was_compressed && everything_was_flushed; - } while (!ended); - } - catch (...) - { - /// Do not try to write next time after exception. - out->position() = out->buffer().begin(); - throw; - } - + flush(ZSTD_e_flush); } ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer() @@ -103,58 +66,58 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl() } else { - try - { - finalizeBefore(); - out->finalize(); - finalizeAfter(); - } - catch (...) - { - /// Do not try to flush next time after exception. - out->position() = out->buffer().begin(); - throw; - } + finalizeBefore(); + out->finalize(); + finalizeAfter(); } } void ZstdDeflatingAppendableWriteBuffer::finalizeBefore() { - next(); - - out->nextIfAtEnd(); - - input.src = reinterpret_cast(working_buffer.begin()); - input.size = offset(); - input.pos = 0; - - output.dst = reinterpret_cast(out->buffer().begin()); - output.size = out->buffer().size(); - output.pos = out->offset(); - /// Actually we can use ZSTD_e_flush here and add empty termination /// block on each new buffer creation for non-empty file unconditionally (without isNeedToAddEmptyBlock). /// However ZSTD_decompressStream is able to read non-terminated frame (we use it in reader buffer), /// but console zstd utility cannot. - size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end); - while (remaining != 0) + flush(ZSTD_e_end); +} + +void ZstdDeflatingAppendableWriteBuffer::flush(ZSTD_EndDirective mode) +{ + input.src = reinterpret_cast(working_buffer.begin()); + input.size = offset(); + input.pos = 0; + + try { - if (ZSTD_isError(remaining)) - throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, - "ZSTD stream encoder end failed: error: '{}' ZSTD version: {}", - ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING); - - remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end); - - out->position() = out->buffer().begin() + output.pos; - - if (!out->hasPendingData()) + bool ended = false; + do { - out->next(); + out->nextIfAtEnd(); + output.dst = reinterpret_cast(out->buffer().begin()); output.size = out->buffer().size(); output.pos = out->offset(); - } + + size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode); + if (ZSTD_isError(compression_result)) + throw Exception( + ErrorCodes::ZSTD_ENCODER_FAILED, + "ZSTD stream decoding failed: error code: {}; ZSTD version: {}", + ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING); + + out->position() = out->buffer().begin() + output.pos; + + bool everything_was_compressed = (input.pos == input.size); + bool everything_was_flushed = compression_result == 0; + + ended = everything_was_compressed && everything_was_flushed; + } while (!ended); + } + catch (...) + { + /// Do not try to write next time after exception. + out->position() = out->buffer().begin(); + throw; } } diff --git a/src/IO/ZstdDeflatingAppendableWriteBuffer.h b/src/IO/ZstdDeflatingAppendableWriteBuffer.h index d9c4f32d6da..d082178142b 100644 --- a/src/IO/ZstdDeflatingAppendableWriteBuffer.h +++ b/src/IO/ZstdDeflatingAppendableWriteBuffer.h @@ -52,6 +52,8 @@ private: /// NOTE: will fill compressed data to the out.working_buffer, but will not call out.next method until the buffer is full void nextImpl() override; + void flush(ZSTD_EndDirective mode); + /// Write terminating ZSTD_e_end: empty block + frame epilogue. BTW it /// should be almost noop, because frame epilogue contains only checksums, /// and they are disabled for this buffer. diff --git a/src/IO/ZstdDeflatingWriteBuffer.cpp b/src/IO/ZstdDeflatingWriteBuffer.cpp index 8fec5c5fadb..83d8487e3e7 100644 --- a/src/IO/ZstdDeflatingWriteBuffer.cpp +++ b/src/IO/ZstdDeflatingWriteBuffer.cpp @@ -32,13 +32,8 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer( ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer() = default; -void ZstdDeflatingWriteBuffer::nextImpl() +void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode) { - if (!offset()) - return; - - ZSTD_EndDirective mode = ZSTD_e_flush; - input.src = reinterpret_cast(working_buffer.begin()); input.size = offset(); input.pos = 0; @@ -54,7 +49,6 @@ void ZstdDeflatingWriteBuffer::nextImpl() output.size = out->buffer().size(); output.pos = out->offset(); - size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode); if (ZSTD_isError(compression_result)) throw Exception( @@ -78,24 +72,15 @@ void ZstdDeflatingWriteBuffer::nextImpl() } } +void ZstdDeflatingWriteBuffer::nextImpl() +{ + if (offset()) + flush(ZSTD_e_flush); +} + void ZstdDeflatingWriteBuffer::finalizeBefore() { - next(); - - out->nextIfAtEnd(); - - input.src = reinterpret_cast(working_buffer.begin()); - input.size = offset(); - input.pos = 0; - - output.dst = reinterpret_cast(out->buffer().begin()); - output.size = out->buffer().size(); - output.pos = out->offset(); - - size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end); - if (ZSTD_isError(remaining)) - throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: zstd version: {}", ZSTD_VERSION_STRING); - out->position() = out->buffer().begin() + output.pos; + flush(ZSTD_e_end); } void ZstdDeflatingWriteBuffer::finalizeAfter() diff --git a/src/IO/ZstdDeflatingWriteBuffer.h b/src/IO/ZstdDeflatingWriteBuffer.h index ba83c18d354..a66d6085a74 100644 --- a/src/IO/ZstdDeflatingWriteBuffer.h +++ b/src/IO/ZstdDeflatingWriteBuffer.h @@ -37,6 +37,8 @@ private: void finalizeBefore() override; void finalizeAfter() override; + void flush(ZSTD_EndDirective mode); + ZSTD_CCtx * cctx; ZSTD_inBuffer input; ZSTD_outBuffer output; diff --git a/tests/queries/0_stateless/02735_parquet_encoder.reference b/tests/queries/0_stateless/02735_parquet_encoder.reference index c7d79392d85..155699329c1 100644 --- a/tests/queries/0_stateless/02735_parquet_encoder.reference +++ b/tests/queries/0_stateless/02735_parquet_encoder.reference @@ -43,6 +43,7 @@ ipv6 Nullable(FixedString(16)) [(2,0,NULL,'','[]')] 1 1 0 1 +5090915589685802007 16159458007063698496 16159458007063698496 BYTE_ARRAY String diff --git a/tests/queries/0_stateless/02735_parquet_encoder.sql b/tests/queries/0_stateless/02735_parquet_encoder.sql index 3701c685120..c8f6d8983a5 100644 --- a/tests/queries/0_stateless/02735_parquet_encoder.sql +++ b/tests/queries/0_stateless/02735_parquet_encoder.sql @@ -147,6 +147,8 @@ insert into function file(compressed_02735.parquet) select concat('aaaaaaaaaaaaa select total_compressed_size < 10000, total_uncompressed_size > 15000 from file(compressed_02735.parquet, ParquetMetadata); insert into function file(compressed_02735.parquet) select concat('aaaaaaaaaaaaaaaa', toString(number)) as s from numbers(1000) settings output_format_parquet_row_group_size = 10000, output_format_parquet_compression_method='none'; select total_compressed_size < 10000, total_uncompressed_size > 15000 from file(compressed_02735.parquet, ParquetMetadata); +insert into function file(compressed_02735.parquet) select if(number%3==1, NULL, 42) as x from numbers(70) settings output_format_parquet_compression_method='zstd'; +select sum(cityHash64(*)) from file(compressed_02735.parquet); -- Single-threaded encoding and Arrow encoder. drop table if exists other_encoders_02735;