diff --git a/src/IO/Lz4DeflatingWriteBuffer.cpp b/src/IO/Lz4DeflatingWriteBuffer.cpp index aab8dacef38..076b8c44f91 100644 --- a/src/IO/Lz4DeflatingWriteBuffer.cpp +++ b/src/IO/Lz4DeflatingWriteBuffer.cpp @@ -2,6 +2,59 @@ #include +namespace +{ + using namespace DB; + + class SinkToOut + { + public: + SinkToOut(WriteBuffer * out_, Memory<> & mem_, size_t guaranteed_capacity) + : sink(out_) + , tmp_out(mem_) + , cur_out(sink) + { + chassert(sink); + + if (sink->available() < guaranteed_capacity) + { + mem_.resize(guaranteed_capacity); + cur_out = &tmp_out; + } + } + + size_t getCapacity() + { + return cur_out->available(); + } + + BufferBase::Position getPosition() + { + return cur_out->position(); + } + + void advancePosition(size_t size) + { + chassert(size <= cur_out->available()); + cur_out->position() += size; + } + + ~SinkToOut() noexcept(false) + { + if (cur_out == sink) + return; + + sink->write(tmp_out.buffer().begin(), tmp_out.count()); + } + + private: + WriteBuffer * sink; + BufferWithOutsideMemory tmp_out; + WriteBuffer * cur_out; + }; +} + + namespace DB { namespace ErrorCodes @@ -13,9 +66,9 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer( std::unique_ptr out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment) : WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) , in_data(nullptr) - , out_data(nullptr) , in_capacity(0) - , out_capacity(0) + , tmp_memory(buf_size) + { kPrefs = { {LZ4F_max256KB, @@ -36,8 +89,8 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer( if (LZ4F_isError(ret)) throw Exception( ErrorCodes::LZ4_ENCODER_FAILED, - "creation of LZ4 compression context failed. LZ4F version: {}", - LZ4F_VERSION); + "creation of LZ4 compression context failed. LZ4F version: {}, error: {}", + LZ4F_VERSION, LZ4F_getErrorName(ret)); } Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer() @@ -54,107 +107,76 @@ void Lz4DeflatingWriteBuffer::nextImpl() in_data = reinterpret_cast(working_buffer.begin()); in_capacity = offset(); - out_capacity = out->buffer().end() - out->position(); - out_data = reinterpret_cast(out->position()); - - try + if (first_time) { - if (first_time) + auto sink = SinkToOut(out.get(), tmp_memory, LZ4F_HEADER_SIZE_MAX); + chassert(sink.getCapacity() >= LZ4F_HEADER_SIZE_MAX); + + /// write frame header and check for errors + size_t header_size = LZ4F_compressBegin( + ctx, sink.getPosition(), sink.getCapacity(), &kPrefs); + + if (LZ4F_isError(header_size)) + throw Exception( + ErrorCodes::LZ4_ENCODER_FAILED, + "LZ4 failed to start stream encoding. LZ4F version: {}, error: {}", + LZ4F_VERSION, LZ4F_getErrorName(header_size)); + + sink.advancePosition(header_size); + first_time = false; + } + + do + { + /// Ensure that there is enough space for compressed block of minimal size + size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs); + + auto sink = SinkToOut(out.get(), tmp_memory, min_compressed_block_size); + chassert(sink.getCapacity() >= min_compressed_block_size); + + /// LZ4F_compressUpdate compresses whole input buffer at once so we need to shink it manually + size_t cur_buffer_size = in_capacity; + if (sink.getCapacity() >= min_compressed_block_size) /// We cannot shrink the input buffer if it's already too small. { - if (out_capacity < LZ4F_HEADER_SIZE_MAX) - { - out->next(); - out_capacity = out->buffer().end() - out->position(); - out_data = reinterpret_cast(out->position()); - } - - /// write frame header and check for errors - size_t header_size = LZ4F_compressBegin(ctx, out_data, out_capacity, &kPrefs); - - if (LZ4F_isError(header_size)) - throw Exception( - ErrorCodes::LZ4_ENCODER_FAILED, - "LZ4 failed to start stream encoding. LZ4F version: {}", - LZ4F_VERSION); - - out_capacity -= header_size; - out->position() = out->buffer().end() - out_capacity; - out_data = reinterpret_cast(out->position()); - - first_time = false; + while (sink.getCapacity() < LZ4F_compressBound(cur_buffer_size, &kPrefs)) + cur_buffer_size /= 2; } - do - { - /// Ensure that there is enough space for compressed block of minimal size - size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs); - if (out_capacity < min_compressed_block_size) - { - out->next(); - out_capacity = out->buffer().end() - out->position(); - out_data = reinterpret_cast(out->position()); - } + size_t compressed_size = LZ4F_compressUpdate( + ctx, sink.getPosition(), sink.getCapacity(), in_data, cur_buffer_size, nullptr); - /// LZ4F_compressUpdate compresses whole input buffer at once so we need to shink it manually - size_t cur_buffer_size = in_capacity; - if (out_capacity >= min_compressed_block_size) /// We cannot shrink the input buffer if it's already too small. - { - while (out_capacity < LZ4F_compressBound(cur_buffer_size, &kPrefs)) - cur_buffer_size /= 2; - } + if (LZ4F_isError(compressed_size)) + throw Exception( + ErrorCodes::LZ4_ENCODER_FAILED, + "LZ4 failed to encode stream. LZ4F version: {}, error {}, out_capacity {}", + LZ4F_VERSION, LZ4F_getErrorName(compressed_size), sink.getCapacity()); - size_t compressed_size = LZ4F_compressUpdate(ctx, out_data, out_capacity, in_data, cur_buffer_size, nullptr); + in_capacity -= cur_buffer_size; + in_data = reinterpret_cast(working_buffer.end() - in_capacity); - if (LZ4F_isError(compressed_size)) - throw Exception( - ErrorCodes::LZ4_ENCODER_FAILED, - "LZ4 failed to encode stream. LZ4F version: {}", - LZ4F_VERSION); - - in_capacity -= cur_buffer_size; - in_data = reinterpret_cast(working_buffer.end() - in_capacity); - - out_capacity -= compressed_size; - out->position() = out->buffer().end() - out_capacity; - out_data = reinterpret_cast(out->position()); - } - while (in_capacity > 0); + sink.advancePosition(compressed_size); } - catch (...) - { - out->position() = out->buffer().begin(); - throw; - } - out->next(); - out_capacity = out->buffer().end() - out->position(); + while (in_capacity > 0); } void Lz4DeflatingWriteBuffer::finalizeBefore() { next(); - out_capacity = out->buffer().end() - out->position(); - out_data = reinterpret_cast(out->position()); - - if (out_capacity < LZ4F_compressBound(0, &kPrefs)) - { - out->next(); - out_capacity = out->buffer().end() - out->position(); - out_data = reinterpret_cast(out->position()); - } + auto suffix_size = LZ4F_compressBound(0, &kPrefs); + auto sink = SinkToOut(out.get(), tmp_memory, suffix_size); + chassert(sink.getCapacity() >= suffix_size); /// compression end - size_t end_size = LZ4F_compressEnd(ctx, out_data, out_capacity, nullptr); + size_t end_size = LZ4F_compressEnd(ctx, sink.getPosition(), sink.getCapacity(), nullptr); if (LZ4F_isError(end_size)) throw Exception( ErrorCodes::LZ4_ENCODER_FAILED, - "LZ4 failed to end stream encoding. LZ4F version: {}", - LZ4F_VERSION); + "LZ4 failed to end stream encoding. LZ4F version: {}, error {}, out_capacity {}", + LZ4F_VERSION, LZ4F_getErrorName(end_size), sink.getCapacity()); - out_capacity -= end_size; - out->position() = out->buffer().end() - out_capacity; - out_data = reinterpret_cast(out->position()); + sink.advancePosition(end_size); } void Lz4DeflatingWriteBuffer::finalizeAfter() diff --git a/src/IO/Lz4DeflatingWriteBuffer.h b/src/IO/Lz4DeflatingWriteBuffer.h index 68873b5f8ee..65f4f0c7349 100644 --- a/src/IO/Lz4DeflatingWriteBuffer.h +++ b/src/IO/Lz4DeflatingWriteBuffer.h @@ -33,10 +33,9 @@ private: LZ4F_compressionContext_t ctx; void * in_data; - void * out_data; - size_t in_capacity; - size_t out_capacity; + + Memory<> tmp_memory; bool first_time = true; }; diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index d6bcb3fb8f4..60642379366 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -97,9 +97,8 @@ def get_counters(node, query_id, log_type="ExceptionWhileProcessing"): ] -# Add "lz4" compression method in the list after https://github.com/ClickHouse/ClickHouse/issues/50975 is fixed @pytest.mark.parametrize( - "compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate"] + "compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate", "lz4"] ) def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression): node = cluster.instances["node"] @@ -137,9 +136,8 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression assert count_s3_errors == 1 -# Add "lz4" compression method in the list after https://github.com/ClickHouse/ClickHouse/issues/50975 is fixed @pytest.mark.parametrize( - "compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate"] + "compression", ["none", "gzip", "br", "xz", "zstd", "bz2", "deflate", "lz4"] ) def test_upload_s3_fail_upload_part_when_multi_part_upload( cluster, broken_s3, compression