mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
[#METR-23582]
* introduce ZlibDeflatingWriteBuffer and ZlibInflatingReadBuffer * use them instead of Poco streams * seamlessly uncompress multiple concatenated gzip streams
This commit is contained in:
parent
690ab98b91
commit
600bb5f29a
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,6 +1,7 @@
|
||||
# emacs files
|
||||
*~
|
||||
*\#
|
||||
.tramp_history
|
||||
|
||||
# auto generated files
|
||||
*.logrt
|
||||
|
@ -3,12 +3,13 @@
|
||||
#include <experimental/optional>
|
||||
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
#include <Poco/DeflatingStream.h>
|
||||
|
||||
#include <DB/Common/Exception.h>
|
||||
|
||||
#include <DB/IO/WriteBuffer.h>
|
||||
#include <DB/IO/BufferWithOwnMemory.h>
|
||||
#include <DB/IO/WriteBufferFromOStream.h>
|
||||
#include <DB/IO/ZlibDeflatingWriteBuffer.h>
|
||||
#include <DB/Common/NetException.h>
|
||||
|
||||
|
||||
@ -17,60 +18,62 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_WRITE_TO_OSTREAM;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
/** Отличается от WriteBufferFromOStream тем, что инициализируется не std::ostream, а Poco::Net::HTTPServerResponse.
|
||||
* При первом сбросе данных, получает из него std::ostream (с помощью метода send).
|
||||
* Это нужно в HTTP серверах, чтобы после передачи в какой-нибудь метод WriteBuffer-а,
|
||||
* но до вывода первых данных клиенту, можно было изменить какие-нибудь HTTP заголовки (например, код ответа).
|
||||
* (После вызова Poco::Net::HTTPServerResponse::send() изменить заголовки уже нельзя.)
|
||||
* То есть, суть в том, чтобы вызывать метод Poco::Net::HTTPServerResponse::send() не сразу.
|
||||
*
|
||||
* Дополнительно, позволяет сжимать тело HTTP-ответа, выставив соответствующий заголовок Content-Encoding.
|
||||
*/
|
||||
/// The difference from WriteBufferFromOStream is that this buffer gets the underlying std::ostream
|
||||
/// (using response.send()) only after data is flushed for the first time. This is needed in HTTP
|
||||
/// servers to change some HTTP headers (e.g. response code) before any data is sent to the client
|
||||
/// (headers can't be changed after response.send() is called).
|
||||
///
|
||||
/// In short, it allows delaying the call to response.send().
|
||||
///
|
||||
/// Additionally, supports HTTP response compression (in this case corresponding Content-Encoding
|
||||
/// header will be set).
|
||||
class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory<WriteBuffer>
|
||||
{
|
||||
private:
|
||||
Poco::Net::HTTPServerResponse & response;
|
||||
|
||||
bool add_cors_header;
|
||||
bool compress;
|
||||
Poco::DeflatingStreamBuf::StreamType compression_method;
|
||||
ZlibCompressionMethod compression_method;
|
||||
int compression_level = Z_DEFAULT_COMPRESSION;
|
||||
|
||||
std::ostream * response_ostr = nullptr; /// Сюда записывается тело HTTP ответа, возможно, сжатое.
|
||||
std::experimental::optional<Poco::DeflatingOutputStream> deflating_stream;
|
||||
std::ostream * ostr = nullptr; /// Куда записывать несжатое тело HTTP ответа. Указывает туда же, куда response_ostr или на deflating_stream.
|
||||
std::unique_ptr<WriteBufferFromOStream> out_raw;
|
||||
std::experimental::optional<ZlibDeflatingWriteBuffer> deflating_buf;
|
||||
|
||||
WriteBuffer * out = nullptr; /// Uncompressed HTTP body is written to this buffer. Points to out_raw or possibly to deflating_buf.
|
||||
|
||||
void sendHeaders()
|
||||
{
|
||||
if (!ostr)
|
||||
if (!out)
|
||||
{
|
||||
if (add_cors_header)
|
||||
{
|
||||
response.set("Access-Control-Allow-Origin","*");
|
||||
}
|
||||
|
||||
if (compress && offset()) /// Пустой ответ сжимать не нужно.
|
||||
if (compress && offset()) /// Empty response need not be compressed.
|
||||
{
|
||||
if (compression_method == Poco::DeflatingStreamBuf::STREAM_GZIP)
|
||||
if (compression_method == ZlibCompressionMethod::Gzip)
|
||||
response.set("Content-Encoding", "gzip");
|
||||
else if (compression_method == Poco::DeflatingStreamBuf::STREAM_ZLIB)
|
||||
else if (compression_method == ZlibCompressionMethod::Zlib)
|
||||
response.set("Content-Encoding", "deflate");
|
||||
else
|
||||
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
response_ostr = &response.send();
|
||||
deflating_stream.emplace(*response_ostr, compression_method, compression_level);
|
||||
ostr = &deflating_stream.value();
|
||||
out_raw = std::make_unique<WriteBufferFromOStream>(response.send());
|
||||
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
|
||||
deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
|
||||
out = &deflating_buf.value();
|
||||
}
|
||||
else
|
||||
{
|
||||
response_ostr = &response.send();
|
||||
ostr = response_ostr;
|
||||
out_raw = std::make_unique<WriteBufferFromOStream>(response.send(), working_buffer.size(), working_buffer.begin());
|
||||
out = out_raw.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -82,53 +85,43 @@ private:
|
||||
|
||||
sendHeaders();
|
||||
|
||||
ostr->write(working_buffer.begin(), offset());
|
||||
ostr->flush();
|
||||
|
||||
if (!ostr->good())
|
||||
throw NetException("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM);
|
||||
out->position() = position();
|
||||
out->next();
|
||||
}
|
||||
|
||||
public:
|
||||
WriteBufferFromHTTPServerResponse(
|
||||
Poco::Net::HTTPServerResponse & response_,
|
||||
bool compress_ = false, /// Если true - выставить заголовок Content-Encoding и сжимать результат.
|
||||
Poco::DeflatingStreamBuf::StreamType compression_method_ = Poco::DeflatingStreamBuf::STREAM_GZIP, /// Как сжимать результат (gzip, deflate).
|
||||
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
|
||||
ZlibCompressionMethod compression_method_ = ZlibCompressionMethod::Gzip,
|
||||
size_t size = DBMS_DEFAULT_BUFFER_SIZE)
|
||||
: BufferWithOwnMemory<WriteBuffer>(size), response(response_),
|
||||
compress(compress_), compression_method(compression_method_) {}
|
||||
|
||||
/** Если данные ещё не были отправлены - отправить хотя бы HTTP заголовки.
|
||||
* Используйте эту функцию после того, как данные, возможно, были отправлены,
|
||||
* и не было ошибок (вы не планируете поменять код ответа).
|
||||
*/
|
||||
/// Send at least HTTP headers if no data has been sent yet.
|
||||
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
|
||||
/// to change response HTTP code.
|
||||
void finalize()
|
||||
{
|
||||
sendHeaders();
|
||||
}
|
||||
|
||||
/** Включить или отключить сжатие.
|
||||
* Работает только перед тем, как были отправлены HTTP заголовки.
|
||||
* Иначе - не имеет эффекта.
|
||||
*/
|
||||
/// Turn compression on or off.
|
||||
/// The setting has any effect only if HTTP headers haven't been sent yet.
|
||||
void setCompression(bool enable_compression)
|
||||
{
|
||||
compress = enable_compression;
|
||||
}
|
||||
|
||||
/** Установить уровень сжатия, если данные будут сжиматься.
|
||||
* Работает только перед тем, как были отправлены HTTP заголовки.
|
||||
* Иначе - не имеет эффекта.
|
||||
*/
|
||||
/// Set compression level if the compression is turned on.
|
||||
/// The setting has any effect only if HTTP headers haven't been sent yet.
|
||||
void setCompressionLevel(int level)
|
||||
{
|
||||
compression_level = level;
|
||||
}
|
||||
|
||||
/** Включить или отключить CORS.
|
||||
* Работает только перед тем, как были отправлены HTTP заголовки.
|
||||
* Иначе - не имеет эффекта.
|
||||
*/
|
||||
/// Turn CORS on or off.
|
||||
/// The setting has any effect only if HTTP headers haven't been sent yet.
|
||||
void addHeaderCORS(bool enable_cors)
|
||||
{
|
||||
add_cors_header = enable_cors;
|
||||
@ -142,9 +135,6 @@ public:
|
||||
try
|
||||
{
|
||||
next();
|
||||
|
||||
if (deflating_stream)
|
||||
deflating_stream->close();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -34,8 +34,12 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
WriteBufferFromOStream(std::ostream & ostr_, size_t size = DBMS_DEFAULT_BUFFER_SIZE)
|
||||
: BufferWithOwnMemory<WriteBuffer>(size), ostr(ostr_) {}
|
||||
WriteBufferFromOStream(
|
||||
std::ostream & ostr_,
|
||||
size_t size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
char * existing_memory = nullptr,
|
||||
size_t alignment = 0)
|
||||
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment), ostr(ostr_) {}
|
||||
|
||||
~WriteBufferFromOStream() override
|
||||
{
|
||||
|
16
dbms/include/DB/IO/ZlibCompressionMethod.h
Normal file
16
dbms/include/DB/IO/ZlibCompressionMethod.h
Normal file
@ -0,0 +1,16 @@
|
||||
#pragma once
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
enum class ZlibCompressionMethod
|
||||
{
|
||||
/// DEFLATE compression with gzip header and CRC32 checksum.
|
||||
/// This option corresponds to files produced by gzip(1) or HTTP Content-Encoding: gzip.
|
||||
Gzip,
|
||||
/// DEFLATE compression with zlib header and Adler32 checksum.
|
||||
/// This option corresponds to HTTP Content-Encoding: deflate.
|
||||
Zlib,
|
||||
};
|
||||
|
||||
}
|
45
dbms/include/DB/IO/ZlibDeflatingWriteBuffer.h
Normal file
45
dbms/include/DB/IO/ZlibDeflatingWriteBuffer.h
Normal file
@ -0,0 +1,45 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/IO/WriteBuffer.h>
|
||||
#include <DB/IO/BufferWithOwnMemory.h>
|
||||
#include <DB/IO/ZlibCompressionMethod.h>
|
||||
|
||||
#include <zlib.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ZLIB_DEFLATE_FAILED;
|
||||
}
|
||||
|
||||
/// Performs compression using zlib library and writes compressed data to out_ WriteBuffer.
|
||||
class ZlibDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
|
||||
{
|
||||
public:
|
||||
ZlibDeflatingWriteBuffer(
|
||||
WriteBuffer & out_,
|
||||
ZlibCompressionMethod compression_method,
|
||||
int compression_level,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
char * existing_memory = nullptr,
|
||||
size_t alignment = 0);
|
||||
|
||||
/// Flush all pending data and write zlib footer to the underlying buffer.
|
||||
/// After the first call to this function, subsequent calls will have no effect and
|
||||
/// an attempt to write to this buffer will result in exception.
|
||||
void finish();
|
||||
|
||||
~ZlibDeflatingWriteBuffer() override;
|
||||
|
||||
private:
|
||||
void nextImpl() override;
|
||||
|
||||
WriteBuffer & out;
|
||||
z_stream zstr;
|
||||
bool finished = false;
|
||||
};
|
||||
|
||||
}
|
40
dbms/include/DB/IO/ZlibInflatingReadBuffer.h
Normal file
40
dbms/include/DB/IO/ZlibInflatingReadBuffer.h
Normal file
@ -0,0 +1,40 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/IO/ReadBuffer.h>
|
||||
#include <DB/IO/BufferWithOwnMemory.h>
|
||||
#include <DB/IO/ZlibCompressionMethod.h>
|
||||
|
||||
#include <zlib.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ZLIB_INFLATE_FAILED;
|
||||
}
|
||||
|
||||
/// Reads compressed data from ReadBuffer in_ and performs decompression using zlib library.
|
||||
/// This buffer is able to seamlessly decompress multiple concatenated zlib streams.
|
||||
class ZlibInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
|
||||
{
|
||||
public:
|
||||
ZlibInflatingReadBuffer(
|
||||
ReadBuffer & in_,
|
||||
ZlibCompressionMethod compression_method,
|
||||
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
char * existing_memory = nullptr,
|
||||
size_t alignment = 0);
|
||||
|
||||
~ZlibInflatingReadBuffer() override;
|
||||
|
||||
private:
|
||||
bool nextImpl() override;
|
||||
|
||||
ReadBuffer & in;
|
||||
z_stream zstr;
|
||||
bool eof;
|
||||
};
|
||||
|
||||
}
|
@ -352,6 +352,8 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_LOAD_CONFIG = 347;
|
||||
extern const int RESHARDING_NULLABLE_SHARDING_KEY = 348;
|
||||
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN = 349;
|
||||
extern const int ZLIB_INFLATE_FAILED = 350;
|
||||
extern const int ZLIB_DEFLATE_FAILED = 351;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
104
dbms/src/IO/ZlibDeflatingWriteBuffer.cpp
Normal file
104
dbms/src/IO/ZlibDeflatingWriteBuffer.cpp
Normal file
@ -0,0 +1,104 @@
|
||||
#include <DB/IO/ZlibDeflatingWriteBuffer.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
|
||||
WriteBuffer & out_,
|
||||
ZlibCompressionMethod compression_method,
|
||||
int compression_level,
|
||||
size_t buf_size,
|
||||
char * existing_memory,
|
||||
size_t alignment)
|
||||
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
|
||||
, out(out_)
|
||||
{
|
||||
zstr.zalloc = Z_NULL;
|
||||
zstr.zfree = Z_NULL;
|
||||
zstr.opaque = Z_NULL;
|
||||
zstr.next_in = 0;
|
||||
zstr.avail_in = 0;
|
||||
zstr.next_out = 0;
|
||||
zstr.avail_out = 0;
|
||||
|
||||
int window_bits = 15;
|
||||
if (compression_method == ZlibCompressionMethod::Gzip)
|
||||
{
|
||||
window_bits += 16;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wold-style-cast"
|
||||
int rc = deflateInit2(&zstr, compression_level, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
if (rc != Z_OK)
|
||||
throw Exception(std::string("deflateInit2 failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
|
||||
}
|
||||
|
||||
ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
|
||||
{
|
||||
try
|
||||
{
|
||||
finish();
|
||||
|
||||
int rc = deflateEnd(&zstr);
|
||||
if (rc != Z_OK)
|
||||
throw Exception(std::string("deflateEnd failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
void ZlibDeflatingWriteBuffer::nextImpl()
|
||||
{
|
||||
if (!offset())
|
||||
return;
|
||||
|
||||
zstr.next_in = reinterpret_cast<unsigned char *>(working_buffer.begin());
|
||||
zstr.avail_in = offset();
|
||||
|
||||
do
|
||||
{
|
||||
out.nextIfAtEnd();
|
||||
zstr.next_out = reinterpret_cast<unsigned char *>(out.position());
|
||||
zstr.avail_out = out.buffer().end() - out.position();
|
||||
|
||||
int rc = deflate(&zstr, Z_NO_FLUSH);
|
||||
out.position() = out.buffer().end() - zstr.avail_out;
|
||||
|
||||
if (rc != Z_OK)
|
||||
throw Exception(std::string("deflate failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
|
||||
}
|
||||
while (zstr.avail_in > 0 || zstr.avail_out == 0);
|
||||
}
|
||||
|
||||
void ZlibDeflatingWriteBuffer::finish()
|
||||
{
|
||||
if (finished)
|
||||
return;
|
||||
|
||||
next();
|
||||
|
||||
while (true)
|
||||
{
|
||||
out.nextIfAtEnd();
|
||||
zstr.next_out = reinterpret_cast<unsigned char *>(out.position());
|
||||
zstr.avail_out = out.buffer().end() - out.position();
|
||||
|
||||
int rc = deflate(&zstr, Z_FINISH);
|
||||
out.position() = out.buffer().end() - zstr.avail_out;
|
||||
|
||||
if (rc == Z_STREAM_END)
|
||||
return;
|
||||
if (rc != Z_OK)
|
||||
throw Exception(std::string("deflate finish failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
|
||||
}
|
||||
|
||||
finished = true;
|
||||
}
|
||||
|
||||
}
|
85
dbms/src/IO/ZlibInflatingReadBuffer.cpp
Normal file
85
dbms/src/IO/ZlibInflatingReadBuffer.cpp
Normal file
@ -0,0 +1,85 @@
|
||||
#include <DB/IO/ZlibInflatingReadBuffer.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
|
||||
ReadBuffer & in_,
|
||||
ZlibCompressionMethod compression_method,
|
||||
size_t buf_size,
|
||||
char * existing_memory,
|
||||
size_t alignment)
|
||||
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
|
||||
, in(in_)
|
||||
, eof(false)
|
||||
{
|
||||
zstr.zalloc = Z_NULL;
|
||||
zstr.zfree = Z_NULL;
|
||||
zstr.opaque = Z_NULL;
|
||||
zstr.next_in = 0;
|
||||
zstr.avail_in = 0;
|
||||
zstr.next_out = 0;
|
||||
zstr.avail_out = 0;
|
||||
|
||||
int window_bits = 15;
|
||||
if (compression_method == ZlibCompressionMethod::Gzip)
|
||||
{
|
||||
window_bits += 16;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wold-style-cast"
|
||||
int rc = inflateInit2(&zstr, window_bits);
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
if (rc != Z_OK)
|
||||
throw Exception(std::string("inflateInit2 failed: ") + zError(rc), ErrorCodes::ZLIB_INFLATE_FAILED);
|
||||
}
|
||||
|
||||
ZlibInflatingReadBuffer::~ZlibInflatingReadBuffer()
|
||||
{
|
||||
inflateEnd(&zstr);
|
||||
}
|
||||
|
||||
bool ZlibInflatingReadBuffer::nextImpl()
|
||||
{
|
||||
if (eof)
|
||||
return false;
|
||||
|
||||
if (!zstr.avail_in)
|
||||
{
|
||||
in.nextIfAtEnd();
|
||||
zstr.next_in = reinterpret_cast<unsigned char *>(in.position());
|
||||
zstr.avail_in = in.buffer().end() - in.position();
|
||||
}
|
||||
zstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
|
||||
zstr.avail_out = internal_buffer.size();
|
||||
|
||||
int rc = inflate(&zstr, Z_NO_FLUSH);
|
||||
|
||||
in.position() = in.buffer().end() - zstr.avail_in;
|
||||
working_buffer.resize(internal_buffer.size() - zstr.avail_out);
|
||||
|
||||
if (rc == Z_STREAM_END)
|
||||
{
|
||||
if (in.eof())
|
||||
{
|
||||
eof = true;
|
||||
return working_buffer.size() != 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
int rc = inflateReset(&zstr);
|
||||
if (rc != Z_OK)
|
||||
throw Exception(std::string("inflateReset failed: ") + zError(rc), ErrorCodes::ZLIB_INFLATE_FAILED);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (rc != Z_OK)
|
||||
throw Exception(std::string("inflate failed: ") + zError(rc), ErrorCodes::ZLIB_INFLATE_FAILED);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -70,3 +70,6 @@ if (NOT APPLE)
|
||||
add_executable(read_buffer_aio read_buffer_aio.cpp)
|
||||
target_link_libraries (read_buffer_aio dbms ${Boost_FILESYSTEM_LIBRARY})
|
||||
endif ()
|
||||
|
||||
add_executable (zlib_buffers zlib_buffers.cpp ${SRCS})
|
||||
target_link_libraries (zlib_buffers dbms)
|
||||
|
70
dbms/src/IO/tests/zlib_buffers.cpp
Normal file
70
dbms/src/IO/tests/zlib_buffers.cpp
Normal file
@ -0,0 +1,70 @@
|
||||
#include <string>
|
||||
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
|
||||
#include <DB/Core/Types.h>
|
||||
#include <DB/Common/Stopwatch.h>
|
||||
#include <DB/IO/WriteBufferFromFile.h>
|
||||
#include <DB/IO/ReadBufferFromFile.h>
|
||||
#include <DB/IO/ZlibDeflatingWriteBuffer.h>
|
||||
#include <DB/IO/ZlibInflatingReadBuffer.h>
|
||||
#include <DB/IO/WriteHelpers.h>
|
||||
#include <DB/IO/ReadHelpers.h>
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
try
|
||||
{
|
||||
std::cout << std::fixed << std::setprecision(2);
|
||||
|
||||
size_t n = 100000000;
|
||||
Stopwatch stopwatch;
|
||||
|
||||
{
|
||||
DB::WriteBufferFromFile buf("test_zlib_buffers.gz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
|
||||
DB::ZlibDeflatingWriteBuffer deflating_buf(buf, DB::ZlibCompressionMethod::Gzip, /* compression_level = */ 3);
|
||||
|
||||
stopwatch.restart();
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
{
|
||||
DB::writeIntText(i, deflating_buf);
|
||||
DB::writeChar('\t', deflating_buf);
|
||||
}
|
||||
deflating_buf.finish();
|
||||
|
||||
stopwatch.stop();
|
||||
std::cout << "Writing done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
|
||||
<< ", " << (deflating_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s"
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
{
|
||||
DB::ReadBufferFromFile buf("test_zlib_buffers.gz");
|
||||
DB::ZlibInflatingReadBuffer inflating_buf(buf, DB::ZlibCompressionMethod::Gzip);
|
||||
|
||||
stopwatch.restart();
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
{
|
||||
size_t x;
|
||||
DB::readIntText(x, inflating_buf);
|
||||
inflating_buf.ignore();
|
||||
|
||||
if (x != i)
|
||||
throw DB::Exception("Failed!, read: " + std::to_string(x) + ", expected: " + std::to_string(i));
|
||||
}
|
||||
stopwatch.stop();
|
||||
std::cout << "Reading done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
|
||||
<< ", " << (inflating_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s"
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
std::cerr << e.what() << ", " << e.displayText() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
@ -1,13 +1,12 @@
|
||||
#include <iomanip>
|
||||
|
||||
#include <Poco/InflatingStream.h>
|
||||
|
||||
#include <Poco/Net/HTTPBasicCredentials.h>
|
||||
|
||||
#include <DB/Common/Stopwatch.h>
|
||||
#include <DB/Common/StringUtils.h>
|
||||
|
||||
#include <DB/IO/ReadBufferFromIStream.h>
|
||||
#include <DB/IO/ZlibInflatingReadBuffer.h>
|
||||
#include <DB/IO/ReadBufferFromString.h>
|
||||
#include <DB/IO/ConcatReadBuffer.h>
|
||||
#include <DB/IO/CompressedReadBuffer.h>
|
||||
@ -48,49 +47,46 @@ void HTTPHandler::processQuery(
|
||||
|
||||
BlockInputStreamPtr query_plan;
|
||||
|
||||
/** Часть запроса может быть передана в параметре query, а часть - POST-ом
|
||||
* (точнее - в теле запроса, а метод не обязательно должен быть POST).
|
||||
* В таком случае, считается, что запрос - параметр query, затем перевод строки, а затем - данные POST-а.
|
||||
*/
|
||||
/// Part of the query can be passed in the 'query' parameter and the rest in the request body
|
||||
/// (http method need not necessarily be POST). In this case the entire query consists of the
|
||||
/// contents of the 'query' parameter, a line break and the request body.
|
||||
std::string query_param = params.get("query", "");
|
||||
if (!query_param.empty())
|
||||
query_param += '\n';
|
||||
|
||||
/** Клиент может указать поддерживаемый метод сжатия (gzip или deflate) в HTTP-заголовке.
|
||||
*/
|
||||
|
||||
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
|
||||
String http_response_compression_methods = request.get("Accept-Encoding", "");
|
||||
bool client_supports_http_compression = false;
|
||||
Poco::DeflatingStreamBuf::StreamType http_response_compression_method {};
|
||||
ZlibCompressionMethod http_response_compression_method {};
|
||||
|
||||
if (!http_response_compression_methods.empty())
|
||||
{
|
||||
/// Мы поддерживаем gzip или deflate. Если клиент поддерживает оба, то предпочитается gzip.
|
||||
/// NOTE Парсинг списка методов слегка некорректный.
|
||||
|
||||
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
|
||||
/// NOTE parsing of the list of methods is slightly incorrect.
|
||||
if (std::string::npos != http_response_compression_methods.find("gzip"))
|
||||
{
|
||||
client_supports_http_compression = true;
|
||||
http_response_compression_method = Poco::DeflatingStreamBuf::STREAM_GZIP;
|
||||
http_response_compression_method = ZlibCompressionMethod::Gzip;
|
||||
}
|
||||
else if (std::string::npos != http_response_compression_methods.find("deflate"))
|
||||
{
|
||||
client_supports_http_compression = true;
|
||||
http_response_compression_method = Poco::DeflatingStreamBuf::STREAM_ZLIB;
|
||||
http_response_compression_method = ZlibCompressionMethod::Zlib;
|
||||
}
|
||||
}
|
||||
|
||||
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
|
||||
response, client_supports_http_compression, http_response_compression_method);
|
||||
|
||||
/** Клиент может указать compress в query string.
|
||||
* В этом случае, результат сжимается несовместимым алгоритмом для внутреннего использования и этот факт не отражается в HTTP заголовках.
|
||||
*/
|
||||
/// Client can pass a 'compress' flag in the query string. In this case the query result is
|
||||
/// compressed using internal algorithm. This is not reflected in HTTP headers.
|
||||
if (parse<bool>(params.get("compress", "0")))
|
||||
used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out);
|
||||
else
|
||||
used_output.out_maybe_compressed = used_output.out;
|
||||
|
||||
/// Имя пользователя и пароль могут быть заданы как в параметрах URL, так и с помощью HTTP Basic authentification (и то, и другое не секъюрно).
|
||||
/// User name and password can be passed using query parameters or using HTTP Basic auth (both methods are insecure).
|
||||
/// The user and password can be passed by headers (similar to X-Auth-*), which is used by load balancers to pass authentication information
|
||||
std::string user = request.get("X-ClickHouse-User", params.get("user", "default"));
|
||||
std::string password = request.get("X-ClickHouse-Key", params.get("password", ""));
|
||||
@ -114,43 +110,34 @@ void HTTPHandler::processQuery(
|
||||
|
||||
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param);
|
||||
|
||||
/// Данные POST-а могут быть сжаты алгоритмом, указанным в Content-Encoding заголовке.
|
||||
String http_request_compression_method_str = request.get("Content-Encoding", "");
|
||||
bool http_request_decompress = false;
|
||||
Poco::InflatingStreamBuf::StreamType http_request_compression_method {};
|
||||
std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr);
|
||||
|
||||
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
|
||||
std::unique_ptr<ReadBuffer> in_post;
|
||||
String http_request_compression_method_str = request.get("Content-Encoding", "");
|
||||
if (!http_request_compression_method_str.empty())
|
||||
{
|
||||
ZlibCompressionMethod method;
|
||||
if (http_request_compression_method_str == "gzip")
|
||||
{
|
||||
http_request_decompress = true;
|
||||
http_request_compression_method = Poco::InflatingStreamBuf::STREAM_GZIP;
|
||||
method = ZlibCompressionMethod::Gzip;
|
||||
}
|
||||
else if (http_request_compression_method_str == "deflate")
|
||||
{
|
||||
http_request_decompress = true;
|
||||
http_request_compression_method = Poco::InflatingStreamBuf::STREAM_ZLIB;
|
||||
method = ZlibCompressionMethod::Zlib;
|
||||
}
|
||||
else
|
||||
throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str,
|
||||
ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
|
||||
}
|
||||
|
||||
std::experimental::optional<Poco::InflatingInputStream> decompressing_stream;
|
||||
std::unique_ptr<ReadBuffer> in_post;
|
||||
|
||||
if (http_request_decompress)
|
||||
{
|
||||
decompressing_stream.emplace(istr, http_request_compression_method);
|
||||
in_post = std::make_unique<ReadBufferFromIStream>(decompressing_stream.value());
|
||||
in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, method);
|
||||
}
|
||||
else
|
||||
in_post = std::make_unique<ReadBufferFromIStream>(istr);
|
||||
in_post = std::move(in_post_raw);
|
||||
|
||||
/// Также данные могут быть сжаты несовместимым алгоритмом для внутреннего использования - это определяется параметром query_string.
|
||||
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
|
||||
/// 'decompress' query parameter.
|
||||
std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
|
||||
bool in_post_compressed = false;
|
||||
|
||||
if (parse<bool>(params.get("decompress", "0")))
|
||||
{
|
||||
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
|
||||
@ -161,7 +148,7 @@ void HTTPHandler::processQuery(
|
||||
|
||||
std::unique_ptr<ReadBuffer> in;
|
||||
|
||||
/// Поддержка "внешних данных для обработки запроса".
|
||||
/// Support for "external data for query processing".
|
||||
if (startsWith(request.getContentType().data(), "multipart/form-data"))
|
||||
{
|
||||
in = std::move(in_param);
|
||||
@ -169,7 +156,8 @@ void HTTPHandler::processQuery(
|
||||
|
||||
params.load(request, istr, handler);
|
||||
|
||||
/// Удаляем уже нененужные параметры из хранилища, чтобы впоследствии не перепутать их с наcтройками контекста и параметрами запроса.
|
||||
/// Erase unneeded parameters to avoid confusing them later with context settings or query
|
||||
/// parameters.
|
||||
for (const auto & it : handler.names)
|
||||
{
|
||||
params.erase(it + "_format");
|
||||
@ -180,21 +168,20 @@ void HTTPHandler::processQuery(
|
||||
else
|
||||
in = std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
|
||||
|
||||
/** Настройки могут быть переопределены в запросе.
|
||||
* Некоторые параметры (database, default_format, и все что использовались выше),
|
||||
* не относятся к обычным настройкам (Settings).
|
||||
*
|
||||
* Среди настроек есть также readonly.
|
||||
* readonly = 0 - можно выполнять любые запросы и изменять любые настройки
|
||||
* readonly = 1 - можно выполнять только запросы на чтение, нельзя изменять настройки
|
||||
* readonly = 2 - можно выполнять только запросы на чтение, можно изменять настройки кроме настройки readonly
|
||||
*
|
||||
* Заметим, что в запросе, если до этого readonly было равно 0,
|
||||
* пользователь может изменить любые настройки и одновременно выставить readonly в другое значение.
|
||||
*/
|
||||
/// Settings can be overridden in the query.
|
||||
/// Some parameters (database, default_format, everything used in the code above) do not
|
||||
/// belong to the Settings class.
|
||||
|
||||
/// 'readonly' setting values mean:
|
||||
/// readonly = 0 - any query is allowed, client can change any setting.
|
||||
/// readonly = 1 - only readonly queries are allowed, client can't change settings.
|
||||
/// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
|
||||
|
||||
/// In theory if initially readonly = 0, the client can change any setting and then set readonly
|
||||
/// to some other value.
|
||||
auto & limits = context.getSettingsRef().limits;
|
||||
|
||||
/// Если метод GET, то это эквивалентно настройке readonly, выставленной в ненулевое значение.
|
||||
/// Only readonly queries are allowed for HTTP GET requests.
|
||||
if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
|
||||
{
|
||||
if (limits.readonly == 0)
|
||||
@ -225,7 +212,7 @@ void HTTPHandler::processQuery(
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Все остальные параметры запроса рассматриваются, как настройки.
|
||||
/// All other query parameters are treated as settings.
|
||||
|
||||
if (readonly_before_query == 1)
|
||||
throw Exception("Cannot override setting (" + it->first + ") in readonly mode", ErrorCodes::READONLY);
|
||||
@ -237,17 +224,19 @@ void HTTPHandler::processQuery(
|
||||
}
|
||||
}
|
||||
|
||||
/// Сжатие ответа (Content-Encoding) включается только если клиент сказал, что он это понимает (Accept-Encoding)
|
||||
/// и выставлена настройка, разрешающая сжатие.
|
||||
/// HTTP response compression is turned on only if the client signalled that they support it
|
||||
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
|
||||
used_output.out->setCompression(client_supports_http_compression && context.getSettingsRef().enable_http_compression);
|
||||
if (client_supports_http_compression)
|
||||
used_output.out->setCompressionLevel(context.getSettingsRef().http_zlib_compression_level);
|
||||
|
||||
/// Возможно, что выставлена настройка - не проверять чексуммы при разжатии данных от клиента, сжатых родным форматом.
|
||||
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
|
||||
/// checksums of client data compressed with internal algorithm are not checked.
|
||||
if (in_post_compressed && context.getSettingsRef().http_native_compression_disable_checksumming_on_decompress)
|
||||
static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
|
||||
|
||||
/// Добавить CORS header выставлена настройка, и если клиент передал заголовок Origin
|
||||
/// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed
|
||||
/// Origin header.
|
||||
used_output.out->addHeaderCORS( context.getSettingsRef().add_http_cors_header && !request.get("Origin", "").empty() );
|
||||
|
||||
ClientInfo & client_info = context.getClientInfo();
|
||||
@ -271,7 +260,8 @@ void HTTPHandler::processQuery(
|
||||
executeQuery(*in, *used_output.out_maybe_compressed, context, query_plan,
|
||||
[&response] (const String & content_type) { response.setContentType(content_type); });
|
||||
|
||||
/// Если не было эксепшена и данные ещё не отправлены - отправляются HTTP заголовки с кодом 200.
|
||||
/// Send HTTP headers with code 200 if no exception happened and the data is still not sent to
|
||||
/// the client.
|
||||
used_output.out->finalize();
|
||||
}
|
||||
|
||||
@ -282,9 +272,8 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s,
|
||||
{
|
||||
try
|
||||
{
|
||||
/** Если POST и Keep-Alive, прочитаем тело до конца.
|
||||
* Иначе вместо следующего запроса, будет прочитан кусок этого тела.
|
||||
*/
|
||||
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
|
||||
/// to avoid reading part of the current request body in the next request.
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
|
||||
&& response.getKeepAlive()
|
||||
&& !request.stream().eof())
|
||||
@ -296,15 +285,14 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s,
|
||||
|
||||
if (!response.sent() && !used_output.out_maybe_compressed)
|
||||
{
|
||||
/// Ещё ничего не отправляли, и даже не знаем, нужно ли сжимать ответ.
|
||||
/// If nothing was sent yet and we don't even know if we must compress the response.
|
||||
response.send() << s << std::endl;
|
||||
}
|
||||
else if (used_output.out_maybe_compressed)
|
||||
{
|
||||
/** Отправим в использованный (возможно сжатый) поток сообщение об ошибке.
|
||||
* Сообщение об ошибке может идти невпопад - после каких-то данных.
|
||||
* Также стоит иметь ввиду, что мы могли уже отправить код 200.
|
||||
*/
|
||||
/// Send the error message into already used (and possibly compressed) stream.
|
||||
/// Note that the error message will possibly be sent after some data.
|
||||
/// Also HTTP code 200 could have already been sent.
|
||||
|
||||
/** If buffer has data, and that data wasn't sent yet, then no need to send that data */
|
||||
if (used_output.out->count() - used_output.out->offset() == 0)
|
||||
|
@ -56,3 +56,4 @@
|
||||
Hello, world
|
||||
Hello, world
|
||||
0
|
||||
Part1 Part2
|
||||
|
@ -19,3 +19,6 @@ echo "'Hello, world'" | curl -sS --data-binary @- 'http://localhost:8123/?query=
|
||||
echo "'Hello, world'" | gzip -c | curl -sS --data-binary @- -H 'Content-Encoding: gzip' 'http://localhost:8123/?query=SELECT';
|
||||
|
||||
curl -sS 'http://localhost:8123/?enable_http_compression=1' -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 0' | wc -c;
|
||||
|
||||
# POST multiple concatenated gzip streams.
|
||||
(echo -n "SELECT 'Part1" | gzip -c; echo " Part2'" | gzip -c) | curl -sS -H 'Content-Encoding: gzip' 'http://localhost:8123/?' --data-binary @-
|
||||
|
Loading…
Reference in New Issue
Block a user