This commit is contained in:
Yakov Olkhovskiy 2023-10-30 05:37:47 +00:00
parent 9c34ac5a6f
commit e1d0994c0a
15 changed files with 55 additions and 31 deletions

View File

@ -74,7 +74,7 @@ void BrotliWriteBuffer::finalizeBefore()
next(); next();
/// Don't write out if no data was ever compressed /// Don't write out if no data was ever compressed
if (total_out == 0) if (!compress_empty && total_out == 0)
return; return;
while (true) while (true)

View File

@ -22,13 +22,15 @@ public:
int compression_level, int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0) size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) : WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>()) , brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0) , in_available(0)
, in_data(nullptr) , in_data(nullptr)
, out_capacity(0) , out_capacity(0)
, out_data(nullptr) , out_data(nullptr)
, compress_empty(compress_empty_)
{ {
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level)); BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81) // Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
@ -62,6 +64,7 @@ private:
protected: protected:
size_t total_out = 0; size_t total_out = 0;
bool compress_empty = true;
}; };
} }

View File

@ -79,7 +79,7 @@ void Bzip2WriteBuffer::finalizeBefore()
next(); next();
/// Don't write out if no data was ever compressed /// Don't write out if no data was ever compressed
if (bz->stream.total_out_hi32 == 0 && bz->stream.total_out_lo32 == 0) if (!compress_empty && bz->stream.total_out_hi32 == 0 && bz->stream.total_out_lo32 == 0)
return; return;
out->nextIfAtEnd(); out->nextIfAtEnd();

View File

@ -21,9 +21,10 @@ public:
int compression_level, int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0) size_t alignment = 0,
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) bool compress_empty_ = true)
, bz(std::make_unique<Bzip2StateWrapper>(compression_level)) : WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), bz(std::make_unique<Bzip2StateWrapper>(compression_level))
, compress_empty(compress_empty_)
{ {
} }
@ -44,6 +45,7 @@ private:
}; };
std::unique_ptr<Bzip2StateWrapper> bz; std::unique_ptr<Bzip2StateWrapper> bz;
bool compress_empty = true;
}; };
} }

View File

@ -172,27 +172,27 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
template<typename WriteBufferT> template<typename WriteBufferT>
std::unique_ptr<WriteBuffer> createWriteCompressedWrapper( std::unique_ptr<WriteBuffer> createWriteCompressedWrapper(
WriteBufferT && nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment) WriteBufferT && nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment, bool compress_empty)
{ {
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib) if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), method, level, buf_size, existing_memory, alignment); return std::make_unique<ZlibDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), method, level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BROTLI #if USE_BROTLI
if (method == DB::CompressionMethod::Brotli) if (method == DB::CompressionMethod::Brotli)
return std::make_unique<BrotliWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment); return std::make_unique<BrotliWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif #endif
if (method == CompressionMethod::Xz) if (method == CompressionMethod::Xz)
return std::make_unique<LZMADeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment); return std::make_unique<LZMADeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Zstd) if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment); return std::make_unique<ZstdDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Lz4) if (method == CompressionMethod::Lz4)
return std::make_unique<Lz4DeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment); return std::make_unique<Lz4DeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BZIP2 #if USE_BZIP2
if (method == CompressionMethod::Bzip2) if (method == CompressionMethod::Bzip2)
return std::make_unique<Bzip2WriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment); return std::make_unique<Bzip2WriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif #endif
#if USE_SNAPPY #if USE_SNAPPY
if (method == CompressionMethod::Snappy) if (method == CompressionMethod::Snappy)
@ -209,11 +209,12 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
int level, int level,
size_t buf_size, size_t buf_size,
char * existing_memory, char * existing_memory,
size_t alignment) size_t alignment,
bool compress_empty)
{ {
if (method == CompressionMethod::None) if (method == CompressionMethod::None)
return nested; return nested;
return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment); return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment, compress_empty);
} }
@ -223,10 +224,11 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
int level, int level,
size_t buf_size, size_t buf_size,
char * existing_memory, char * existing_memory,
size_t alignment) size_t alignment,
bool compress_empty)
{ {
assert(method != CompressionMethod::None); assert(method != CompressionMethod::None);
return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment); return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment, compress_empty);
} }
} }

View File

@ -79,7 +79,8 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
int level, int level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0); size_t alignment = 0,
bool compress_empty = true);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod( std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
WriteBuffer * nested, WriteBuffer * nested,
@ -87,6 +88,7 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
int level, int level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0); size_t alignment = 0,
bool compress_empty = true);
} }

View File

@ -93,7 +93,7 @@ void LZMADeflatingWriteBuffer::finalizeBefore()
next(); next();
/// Don't write out if no data was ever compressed /// Don't write out if no data was ever compressed
if (lstr.total_out) if (!compress_empty && lstr.total_out == 0)
return; return;
do do

View File

@ -20,8 +20,9 @@ public:
int compression_level, int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0) size_t alignment = 0,
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{ {
initialize(compression_level); initialize(compression_level);
} }
@ -37,6 +38,8 @@ private:
void finalizeAfter() override; void finalizeAfter() override;
lzma_stream lstr; lzma_stream lstr;
bool compress_empty = true;
}; };
} }

View File

@ -128,7 +128,7 @@ void Lz4DeflatingWriteBuffer::finalizeBefore()
next(); next();
/// Don't write out if no data was ever compressed /// Don't write out if no data was ever compressed
if (first_time) if (!compress_empty && first_time)
return; return;
out_capacity = out->buffer().end() - out->position(); out_capacity = out->buffer().end() - out->position();

View File

@ -20,12 +20,14 @@ public:
int compression_level, int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0) size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) : WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, in_data(nullptr) , in_data(nullptr)
, out_data(nullptr) , out_data(nullptr)
, in_capacity(0) , in_capacity(0)
, out_capacity(0) , out_capacity(0)
, compress_empty(compress_empty_)
{ {
initialize(compression_level); initialize(compression_level);
} }
@ -50,5 +52,6 @@ private:
size_t out_capacity; size_t out_capacity;
bool first_time = true; bool first_time = true;
bool compress_empty = true;
}; };
} }

View File

@ -53,7 +53,7 @@ void ZlibDeflatingWriteBuffer::finalizeBefore()
next(); next();
/// Don't write out if no data was ever compressed /// Don't write out if no data was ever compressed
if (zstr.total_out == 0) if (!compress_empty && zstr.total_out == 0)
return; return;
/// https://github.com/zlib-ng/zlib-ng/issues/494 /// https://github.com/zlib-ng/zlib-ng/issues/494

View File

@ -28,8 +28,9 @@ public:
int compression_level, int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0) size_t alignment = 0,
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{ {
zstr.zalloc = nullptr; zstr.zalloc = nullptr;
zstr.zfree = nullptr; zstr.zfree = nullptr;
@ -63,6 +64,7 @@ private:
virtual void finalizeAfter() override; virtual void finalizeAfter() override;
z_stream zstr; z_stream zstr;
bool compress_empty = true;
}; };
} }

View File

@ -86,7 +86,7 @@ void ZstdDeflatingWriteBuffer::nextImpl()
void ZstdDeflatingWriteBuffer::finalizeBefore() void ZstdDeflatingWriteBuffer::finalizeBefore()
{ {
/// Don't write out if no data was ever compressed /// Don't write out if no data was ever compressed
if (total_out == 0) if (!compress_empty && total_out == 0)
return; return;
flush(ZSTD_e_end); flush(ZSTD_e_end);
} }

View File

@ -20,8 +20,9 @@ public:
int compression_level, int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0) size_t alignment = 0,
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment) bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{ {
initialize(compression_level); initialize(compression_level);
} }
@ -51,6 +52,7 @@ private:
ZSTD_outBuffer output; ZSTD_outBuffer output;
size_t total_out = 0; size_t total_out = 0;
bool compress_empty = true;
}; };
} }

View File

@ -630,7 +630,12 @@ void HTTPHandler::processQuery(
if (client_supports_http_compression && enable_http_compression) if (client_supports_http_compression && enable_http_compression)
{ {
used_output.out_holder->setCompressionMethodHeader(http_response_compression_method); used_output.out_holder->setCompressionMethodHeader(http_response_compression_method);
used_output.wrap_compressed_holder = wrapWriteBufferWithCompressionMethod(used_output.out_holder.get(), http_response_compression_method, static_cast<int>(http_zlib_compression_level)); used_output.wrap_compressed_holder =
wrapWriteBufferWithCompressionMethod(
used_output.out_holder.get(),
http_response_compression_method,
static_cast<int>(http_zlib_compression_level),
DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0, false);
used_output.out = used_output.wrap_compressed_holder; used_output.out = used_output.wrap_compressed_holder;
} }
else else