diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index 03daaf8907b..447ff6fb880 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -284,6 +284,7 @@ endif ()
 
 if (USE_INTERNAL_BROTLI_LIBRARY)
     add_subdirectory(brotli-cmake)
+    target_compile_definitions(brotli PRIVATE BROTLI_BUILD_PORTABLE=1)
 endif ()
 
 if (USE_INTERNAL_PROTOBUF_LIBRARY)
diff --git a/dbms/programs/server/HTTPHandler.cpp b/dbms/programs/server/HTTPHandler.cpp
index 763a30c1928..061b7af574c 100644
--- a/dbms/programs/server/HTTPHandler.cpp
+++ b/dbms/programs/server/HTTPHandler.cpp
@@ -296,7 +296,7 @@ void HTTPHandler::processQuery(
     /// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
     String http_response_compression_methods = request.get("Accept-Encoding", "");
     bool client_supports_http_compression = false;
-    ZlibCompressionMethod http_response_compression_method {};
+    CompressionMethod http_response_compression_method {};
 
     if (!http_response_compression_methods.empty())
     {
@@ -305,12 +305,21 @@ void HTTPHandler::processQuery(
         if (std::string::npos != http_response_compression_methods.find("gzip"))
         {
             client_supports_http_compression = true;
-            http_response_compression_method = ZlibCompressionMethod::Gzip;
+            http_response_compression_method = CompressionMethod::Gzip;
         }
         else if (std::string::npos != http_response_compression_methods.find("deflate"))
         {
             client_supports_http_compression = true;
-            http_response_compression_method = ZlibCompressionMethod::Zlib;
+            http_response_compression_method = CompressionMethod::Zlib;
         }
+#if USE_BROTLI
+        /// Search the list for "br" the same way as for gzip/deflate above;
+        /// an exact comparison would miss headers such as "br, identity".
+        else if (std::string::npos != http_response_compression_methods.find("br"))
+        {
+            client_supports_http_compression = true;
+            http_response_compression_method = CompressionMethod::Brotli;
+        }
+#endif
     }
 
@@ -394,11 +403,11 @@ void HTTPHandler::processQuery(
     {
         if (http_request_compression_method_str == "gzip")
         {
-            in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, ZlibCompressionMethod::Gzip);
+            in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, CompressionMethod::Gzip);
         }
         else if (http_request_compression_method_str == "deflate")
         {
-            in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, ZlibCompressionMethod::Zlib);
+            in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, CompressionMethod::Zlib);
         }
 #if USE_BROTLI
         else if (http_request_compression_method_str == "br")
diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp
index 49e0937282e..bc53e39ec92 100644
--- a/dbms/src/Common/ErrorCodes.cpp
+++ b/dbms/src/Common/ErrorCodes.cpp
@@ -422,6 +422,8 @@ namespace ErrorCodes
     extern const int CANNOT_MPROTECT = 445;
     extern const int FUNCTION_NOT_ALLOWED = 446;
     extern const int HYPERSCAN_CANNOT_SCAN_TEXT = 447;
+    extern const int BROTLI_READ_FAILED = 448;
+    extern const int BROTLI_WRITE_FAILED = 449;
 
     extern const int KEEPER_EXCEPTION = 999;
     extern const int POCO_EXCEPTION = 1000;
diff --git a/dbms/src/IO/BrotliReadBuffer.cpp b/dbms/src/IO/BrotliReadBuffer.cpp
index d6e7aaf5613..08ef8a7008b 100644
--- a/dbms/src/IO/BrotliReadBuffer.cpp
+++ b/dbms/src/IO/BrotliReadBuffer.cpp
@@ -29,7 +29,7 @@ public:
 BrotliReadBuffer::BrotliReadBuffer(ReadBuffer &in_, size_t buf_size, char *existing_memory, size_t alignment)
         : BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
         , in(in_)
-        , brotli(new BrotliStateWrapper())
+        , brotli(std::make_unique<BrotliStateWrapper>())
         , in_available(0)
         , in_data(nullptr)
         , out_capacity(0)
@@ -56,7 +56,7 @@ bool BrotliReadBuffer::nextImpl()
 
     if (brotli->result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && (!in_available || in.eof()))
     {
-        throw Exception(std::string("brotli decode error"), ErrorCodes::CANNOT_READ_ALL_DATA);
+        throw Exception(std::string("brotli decode error"), ErrorCodes::BROTLI_READ_FAILED);
     }
 
     out_capacity = internal_buffer.size();
@@ -76,13 +76,13 @@ bool BrotliReadBuffer::nextImpl()
         }
         else
         {
-            throw Exception(std::string("brotli decode error"), ErrorCodes::CANNOT_READ_ALL_DATA);
+            throw Exception(std::string("brotli decode error"), ErrorCodes::BROTLI_READ_FAILED);
         }
     }
 
     if (brotli->result == BROTLI_DECODER_RESULT_ERROR)
     {
-        throw Exception(std::string("brotli decode error"), ErrorCodes::CANNOT_READ_ALL_DATA);
+        throw Exception(std::string("brotli decode error"), ErrorCodes::BROTLI_READ_FAILED);
     }
 
     return true;
diff --git a/dbms/src/IO/BrotliReadBuffer.h b/dbms/src/IO/BrotliReadBuffer.h
index d6f2b7712b3..b164f1cc345 100644
--- a/dbms/src/IO/BrotliReadBuffer.h
+++ b/dbms/src/IO/BrotliReadBuffer.h
@@ -7,6 +7,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int BROTLI_READ_FAILED;
+}
+
 class BrotliReadBuffer : public BufferWithOwnMemory<ReadBuffer>
 {
 public:
diff --git a/dbms/src/IO/BrotliWriteBuffer.cpp b/dbms/src/IO/BrotliWriteBuffer.cpp
new file mode 100644
index 00000000000..ed1c6d203bf
--- /dev/null
+++ b/dbms/src/IO/BrotliWriteBuffer.cpp
@@ -0,0 +1,141 @@
+#include <Common/config.h>
+#if USE_BROTLI
+
+#include <IO/BrotliWriteBuffer.h>
+#include <Common/Exception.h>
+#include <brotli/encode.h>
+
+namespace DB
+{
+
+/// Owns the brotli encoder instance: creates it in the constructor and destroys it in the destructor.
+class BrotliWriteBuffer::BrotliStateWrapper
+{
+public:
+    BrotliStateWrapper()
+        : state(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr))
+    {
+        /// BrotliEncoderCreateInstance returns nullptr if the instance cannot be allocated or initialized.
+        if (!state)
+            throw Exception(std::string("brotli encoder initialization failed"), ErrorCodes::BROTLI_WRITE_FAILED);
+    }
+
+    ~BrotliStateWrapper()
+    {
+        BrotliEncoderDestroyInstance(state);
+    }
+
+public:
+    BrotliEncoderState * state;
+};
+
+BrotliWriteBuffer::BrotliWriteBuffer(WriteBuffer & out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
+    : BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
+    , brotli(std::make_unique<BrotliStateWrapper>())
+    , in_available(0)
+    , in_data(nullptr)
+    , out_capacity(0)
+    , out_data(nullptr)
+    , out(out_)
+{
+    /// compression_level is used as the brotli quality parameter.
+    BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
+    // Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
+    BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_LGWIN, 24);
+}
+
+BrotliWriteBuffer::~BrotliWriteBuffer()
+{
+    /// finish() writes to the underlying buffer and can throw; a destructor must not let that escape.
+    try
+    {
+        finish();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+}
+
+/// Compresses the data accumulated in the working buffer and appends the result to out.
+void BrotliWriteBuffer::nextImpl()
+{
+    if (!offset())
+    {
+        return;
+    }
+
+    in_data = reinterpret_cast<const uint8_t *>(working_buffer.begin());
+    in_available = offset();
+
+    do
+    {
+        out.nextIfAtEnd();
+        out_data = reinterpret_cast<uint8_t *>(out.position());
+        out_capacity = out.buffer().end() - out.position();
+
+        /// Always BROTLI_OPERATION_PROCESS here: the stream is finalized only in finish().
+        /// Issuing BROTLI_OPERATION_FINISH once the input is exhausted would end the stream
+        /// and make the encoder refuse the data passed by later calls.
+        int result = BrotliEncoderCompressStream(
+                brotli->state,
+                BROTLI_OPERATION_PROCESS,
+                &in_available,
+                &in_data,
+                &out_capacity,
+                &out_data,
+                nullptr);
+
+        out.position() = out.buffer().end() - out_capacity;
+
+        if (result == 0)
+        {
+            throw Exception(std::string("brotli compress failed"), ErrorCodes::BROTLI_WRITE_FAILED);
+        }
+    }
+    /// Keep going while input remains or the encoder still holds output that did not fit into out.
+    while (in_available > 0 || BrotliEncoderHasMoreOutput(brotli->state));
+}
+
+/// Flushes buffered data, then runs BROTLI_OPERATION_FINISH until the encoder reports the stream finished.
+/// Does nothing if the stream has already been finished.
+void BrotliWriteBuffer::finish()
+{
+    if (finished)
+        return;
+
+    next();
+
+    while (true)
+    {
+        out.nextIfAtEnd();
+        out_data = reinterpret_cast<uint8_t *>(out.position());
+        out_capacity = out.buffer().end() - out.position();
+
+        int result = BrotliEncoderCompressStream(
+                brotli->state,
+                BROTLI_OPERATION_FINISH,
+                &in_available,
+                &in_data,
+                &out_capacity,
+                &out_data,
+                nullptr);
+
+        out.position() = out.buffer().end() - out_capacity;
+
+        if (BrotliEncoderIsFinished(brotli->state))
+        {
+            finished = true;
+            return;
+        }
+
+        if (result == 0)
+        {
+            throw Exception(std::string("brotli compress failed"), ErrorCodes::BROTLI_WRITE_FAILED);
+        }
+    }
+}
+
+}
+
+#endif
diff --git a/dbms/src/IO/BrotliWriteBuffer.h b/dbms/src/IO/BrotliWriteBuffer.h
new file mode 100644
index 00000000000..d1b2aaf4984
--- /dev/null
+++ b/dbms/src/IO/BrotliWriteBuffer.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <memory>
+
+#include <IO/WriteBuffer.h>
+#include <IO/BufferWithOwnMemory.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int BROTLI_WRITE_FAILED;
+}
+
+/// Write buffer that compresses everything written to it with brotli
+/// and passes the compressed stream to the underlying buffer out_.
+class BrotliWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
+{
+public:
+    /// compression_level is passed to the encoder as BROTLI_PARAM_QUALITY.
+    BrotliWriteBuffer(
+            WriteBuffer & out_,
+            int compression_level,
+            size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
+            char * existing_memory = nullptr,
+            size_t alignment = 0);
+
+    /// Finishes the stream if finish() has not been called yet; errors are logged, not thrown.
+    ~BrotliWriteBuffer() override;
+
+    /// Flushes buffered data and finalizes the brotli stream. Repeated calls do nothing.
+    void finish();
+
+private:
+    void nextImpl() override;
+
+    class BrotliStateWrapper;
+    std::unique_ptr<BrotliStateWrapper> brotli;
+
+    /// Input cursor handed to the encoder.
+    size_t in_available;
+    const uint8_t * in_data;
+
+    /// Output cursor inside the underlying buffer.
+    size_t out_capacity;
+    uint8_t * out_data;
+
+    WriteBuffer & out;
+
+    bool finished = false;
+};
+
+}
diff --git a/dbms/src/IO/ZlibCompressionMethod.h b/dbms/src/IO/CompressionMethod.h
similarity index 89%
rename from dbms/src/IO/ZlibCompressionMethod.h
rename to dbms/src/IO/CompressionMethod.h
index fcdbc475b5c..96b9d41305e 100644
--- a/dbms/src/IO/ZlibCompressionMethod.h
+++ b/dbms/src/IO/CompressionMethod.h
@@ -3,7 +3,7 @@
 namespace DB
 {
 
-enum class ZlibCompressionMethod
+enum class CompressionMethod
 {
     /// DEFLATE compression with gzip header and CRC32 checksum.
     /// This option corresponds to files produced by gzip(1) or HTTP Content-Encoding: gzip.
@@ -11,6 +11,8 @@ enum class ZlibCompressionMethod
     /// DEFLATE compression with zlib header and Adler32 checksum.
     /// This option corresponds to HTTP Content-Encoding: deflate.
     Zlib,
+    /// Brotli compression. This option corresponds to HTTP Content-Encoding: br.
+    Brotli,
 };
 
 }
diff --git a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp b/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
index 2343d08c265..e3faf942943 100644
--- a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
+++ b/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
@@ -76,34 +76,51 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
         {
             if (compress)
             {
-                if (compression_method == ZlibCompressionMethod::Gzip)
+                /// Every branch hands working_buffer to the compressing buffer, so the memory allocated
+                /// for the outer buffer is used in the buffer pointed to by out.
+                /// This avoids extra allocation and copy.
+                if (compression_method == CompressionMethod::Gzip)
                 {
 #if defined(POCO_CLICKHOUSE_PATCH)
                     *response_header_ostr << "Content-Encoding: gzip\r\n";
 #else
                     response.set("Content-Encoding", "gzip");
+                    response_body_ostr = &(response.send());
 #endif
+                    out_raw.emplace(*response_body_ostr);
+                    deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
+                    out = &*deflating_buf;
                 }
-                else if (compression_method == ZlibCompressionMethod::Zlib)
+                else if (compression_method == CompressionMethod::Zlib)
                 {
 #if defined(POCO_CLICKHOUSE_PATCH)
                     *response_header_ostr << "Content-Encoding: deflate\r\n";
 #else
                     response.set("Content-Encoding", "deflate");
+                    response_body_ostr = &(response.send());
 #endif
+                    out_raw.emplace(*response_body_ostr);
+                    deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
+                    out = &*deflating_buf;
                 }
+#if USE_BROTLI
+                /// BrotliWriteBuffer is compiled only when USE_BROTLI is set.
+                else if (compression_method == CompressionMethod::Brotli)
+                {
+#if defined(POCO_CLICKHOUSE_PATCH)
+                    *response_header_ostr << "Content-Encoding: br\r\n";
+#else
+                    response.set("Content-Encoding", "br");
+                    response_body_ostr = &(response.send());
+#endif
+                    out_raw.emplace(*response_body_ostr);
+                    brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin());
+                    out = &*brotli_buf;
+                }
+#endif
                 else
                     throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
                                     ErrorCodes::LOGICAL_ERROR);
-                /// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
-
-#if !defined(POCO_CLICKHOUSE_PATCH)
-                response_body_ostr = &(response.send());
-#endif
-
-                out_raw.emplace(*response_body_ostr);
-                deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
-                out = &*deflating_buf;
             }
             else
             {
@@ -133,7 +150,7 @@ WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
     Poco::Net::HTTPServerResponse & response_,
     unsigned keep_alive_timeout_,
     bool compress_,
-    ZlibCompressionMethod compression_method_,
+    CompressionMethod compression_method_,
     size_t size)
     : BufferWithOwnMemory<WriteBuffer>(size)
     , request(request_)
diff --git a/dbms/src/IO/WriteBufferFromHTTPServerResponse.h b/dbms/src/IO/WriteBufferFromHTTPServerResponse.h
index a6360340735..f82e5828694 100644
--- a/dbms/src/IO/WriteBufferFromHTTPServerResponse.h
+++ b/dbms/src/IO/WriteBufferFromHTTPServerResponse.h
@@ -9,6 +9,8 @@
 #include <IO/BufferWithOwnMemory.h>
 #include <IO/WriteBufferFromOStream.h>
 #include <IO/ZlibDeflatingWriteBuffer.h>
+#include <IO/BrotliWriteBuffer.h>
+#include <Common/config.h>
 #include <IO/HTTPCommon.h>
 #include <IO/Progress.h>
 #include <Common/NetException.h>
@@ -49,7 +51,7 @@ private:
     bool add_cors_header = false;
     unsigned keep_alive_timeout = 0;
     bool compress = false;
-    ZlibCompressionMethod compression_method;
+    CompressionMethod compression_method;
     int compression_level = Z_DEFAULT_COMPRESSION;
 
     std::ostream * response_body_ostr = nullptr;
@@ -60,6 +62,9 @@ private:
 
     std::optional<WriteBufferFromOStream> out_raw;
     std::optional<ZlibDeflatingWriteBuffer> deflating_buf;
+#if USE_BROTLI
+    std::optional<BrotliWriteBuffer> brotli_buf;
+#endif
 
     WriteBuffer * out = nullptr;     /// Uncompressed HTTP body is written to this buffer. Points to out_raw or possibly to deflating_buf.
 
@@ -89,7 +94,7 @@ public:
         Poco::Net::HTTPServerResponse & response_,
         unsigned keep_alive_timeout_,
         bool compress_ = false,     /// If true - set Content-Encoding header and compress the result.
-        ZlibCompressionMethod compression_method_ = ZlibCompressionMethod::Gzip,
+        CompressionMethod compression_method_ = CompressionMethod::Gzip,
         size_t size = DBMS_DEFAULT_BUFFER_SIZE);
 
     /// Writes progess in repeating HTTP headers.
diff --git a/dbms/src/IO/ZlibDeflatingWriteBuffer.cpp b/dbms/src/IO/ZlibDeflatingWriteBuffer.cpp index 0c994974ad1..d14eccfe42e 100644 --- a/dbms/src/IO/ZlibDeflatingWriteBuffer.cpp +++ b/dbms/src/IO/ZlibDeflatingWriteBuffer.cpp @@ -6,7 +6,7 @@ namespace DB ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer( WriteBuffer & out_, - ZlibCompressionMethod compression_method, + CompressionMethod compression_method, int compression_level, size_t buf_size, char * existing_memory, @@ -23,7 +23,7 @@ ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer( zstr.avail_out = 0; int window_bits = 15; - if (compression_method == ZlibCompressionMethod::Gzip) + if (compression_method == CompressionMethod::Gzip) { window_bits += 16; } diff --git a/dbms/src/IO/ZlibDeflatingWriteBuffer.h b/dbms/src/IO/ZlibDeflatingWriteBuffer.h index 7a8b388af01..19f2770ef05 100644 --- a/dbms/src/IO/ZlibDeflatingWriteBuffer.h +++ b/dbms/src/IO/ZlibDeflatingWriteBuffer.h @@ -2,7 +2,7 @@ #include #include -#include +#include #include @@ -21,7 +21,7 @@ class ZlibDeflatingWriteBuffer : public BufferWithOwnMemory public: ZlibDeflatingWriteBuffer( WriteBuffer & out_, - ZlibCompressionMethod compression_method, + CompressionMethod compression_method, int compression_level, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, diff --git a/dbms/src/IO/ZlibInflatingReadBuffer.cpp b/dbms/src/IO/ZlibInflatingReadBuffer.cpp index 5b0796134ac..efa06678424 100644 --- a/dbms/src/IO/ZlibInflatingReadBuffer.cpp +++ b/dbms/src/IO/ZlibInflatingReadBuffer.cpp @@ -6,7 +6,7 @@ namespace DB ZlibInflatingReadBuffer::ZlibInflatingReadBuffer( ReadBuffer & in_, - ZlibCompressionMethod compression_method, + CompressionMethod compression_method, size_t buf_size, char * existing_memory, size_t alignment) @@ -23,7 +23,7 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer( zstr.avail_out = 0; int window_bits = 15; - if (compression_method == ZlibCompressionMethod::Gzip) + if (compression_method == 
CompressionMethod::Gzip) { window_bits += 16; } diff --git a/dbms/src/IO/ZlibInflatingReadBuffer.h b/dbms/src/IO/ZlibInflatingReadBuffer.h index b65e5768a3c..02ed443aa60 100644 --- a/dbms/src/IO/ZlibInflatingReadBuffer.h +++ b/dbms/src/IO/ZlibInflatingReadBuffer.h @@ -2,7 +2,7 @@ #include #include -#include +#include #include @@ -22,7 +22,7 @@ class ZlibInflatingReadBuffer : public BufferWithOwnMemory public: ZlibInflatingReadBuffer( ReadBuffer & in_, - ZlibCompressionMethod compression_method, + CompressionMethod compression_method, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0); diff --git a/dbms/src/IO/tests/zlib_buffers.cpp b/dbms/src/IO/tests/zlib_buffers.cpp index 2c55509deb8..ff7aa8c5d26 100644 --- a/dbms/src/IO/tests/zlib_buffers.cpp +++ b/dbms/src/IO/tests/zlib_buffers.cpp @@ -23,7 +23,7 @@ try { DB::WriteBufferFromFile buf("test_zlib_buffers.gz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC); - DB::ZlibDeflatingWriteBuffer deflating_buf(buf, DB::ZlibCompressionMethod::Gzip, /* compression_level = */ 3); + DB::ZlibDeflatingWriteBuffer deflating_buf(buf, DB::CompressionMethod::Gzip, /* compression_level = */ 3); stopwatch.restart(); for (size_t i = 0; i < n; ++i) @@ -41,7 +41,7 @@ try { DB::ReadBufferFromFile buf("test_zlib_buffers.gz"); - DB::ZlibInflatingReadBuffer inflating_buf(buf, DB::ZlibCompressionMethod::Gzip); + DB::ZlibInflatingReadBuffer inflating_buf(buf, DB::CompressionMethod::Gzip); stopwatch.restart(); for (size_t i = 0; i < n; ++i) diff --git a/dbms/tests/queries/0_stateless/00302_http_compression.reference b/dbms/tests/queries/0_stateless/00302_http_compression.reference index a572b69a989..b2ab0bdb76c 100644 --- a/dbms/tests/queries/0_stateless/00302_http_compression.reference +++ b/dbms/tests/queries/0_stateless/00302_http_compression.reference @@ -48,9 +48,20 @@ 7 8 9 +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 < Content-Encoding: gzip < Content-Encoding: deflate < Content-Encoding: gzip +< 
Content-Encoding: br 1 1 1 diff --git a/dbms/tests/queries/0_stateless/00302_http_compression.sh b/dbms/tests/queries/0_stateless/00302_http_compression.sh index 1b47312ce5f..399fe27ea9b 100755 --- a/dbms/tests/queries/0_stateless/00302_http_compression.sh +++ b/dbms/tests/queries/0_stateless/00302_http_compression.sh @@ -12,12 +12,14 @@ ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?enable_http_compression=0" -H 'Accept- ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' | gzip -d; ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: gzip, deflate' -d 'SELECT number FROM system.numbers LIMIT 10' | gzip -d; ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10'; +${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' | brotli -d; ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?enable_http_compression=1" -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding'; ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding'; ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: deflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding'; ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: gzip, deflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding'; ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding'; +${CLICKHOUSE_CURL} -vsS 
"${CLICKHOUSE_URL}?enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding'; echo "SELECT 1" | ${CLICKHOUSE_CURL} -sS --data-binary @- ${CLICKHOUSE_URL}; echo "SELECT 1" | gzip -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: gzip' ${CLICKHOUSE_URL};