introduce zstd compression

a.palagashvili committed 2020-11-17 19:02:10 +03:00
parent 7d7b19a201
commit 6f5390cc70
15 changed files with 492 additions and 138 deletions

View File

@ -330,6 +330,8 @@ dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${MINISELECT_INCLUDE_DIR})
if (ZSTD_LIBRARY)
dbms_target_link_libraries(PRIVATE ${ZSTD_LIBRARY})
target_link_libraries (clickhouse_common_io PUBLIC ${ZSTD_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${ZSTD_INCLUDE_DIR})
if (NOT USE_INTERNAL_ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
dbms_target_include_directories(SYSTEM BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()

View File

@ -523,6 +523,8 @@
M(554, LZMA_STREAM_DECODER_FAILED) \
M(555, ROCKSDB_ERROR) \
M(556, SYNC_MYSQL_USER_ACCESS_ERROR) \
M(557, ZSTD_ENCODER_FAILED) \
M(558, ZSTD_DECODER_FAILED) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -8,6 +8,8 @@
#include <IO/WriteBuffer.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <IO/ZstdInflatingReadBuffer.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
@ -34,6 +36,8 @@ std::string toContentEncodingName(CompressionMethod method)
return "br";
case CompressionMethod::Xz:
return "xz";
case CompressionMethod::Zstd:
return "zstd";
case CompressionMethod::None:
return "";
}
@ -61,11 +65,13 @@ CompressionMethod chooseCompressionMethod(const std::string & path, const std::s
return CompressionMethod::Brotli;
if (*method_str == "LZMA" || *method_str == "xz")
return CompressionMethod::Xz;
if (*method_str == "zstd" || *method_str == "zst")
return CompressionMethod::Zstd;
if (hint.empty() || hint == "auto" || hint == "none")
return CompressionMethod::None;
throw Exception(
"Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br', 'xz' are supported as compression methods",
"Unknown compression method " + hint + ". Only 'auto', 'none', 'gzip', 'br', 'xz', 'zst' are supported as compression methods",
ErrorCodes::NOT_IMPLEMENTED);
}
@ -81,6 +87,8 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
#endif
if (method == CompressionMethod::Xz)
return std::make_unique<LZMAInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdInflatingReadBuffer>(std::move(nested), buf_size, existing_memory, alignment);
if (method == CompressionMethod::None)
return nested;
@ -102,6 +110,9 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
if (method == CompressionMethod::Xz)
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
if (method == CompressionMethod::None)
return nested;
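
A hedged usage sketch of the updated factory functions, assuming the signatures visible in the hunks above; the file name and payload are illustrative:

#include <memory>
#include <string>
#include <IO/CompressionMethod.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

void zstdRoundTrip()
{
    /// A ".zst" extension (or an explicit "zstd"/"zst" hint) now resolves to CompressionMethod::Zstd.
    auto method = DB::chooseCompressionMethod("data.tsv.zst", /* hint = */ "auto");

    {
        auto out = DB::wrapWriteBufferWithCompressionMethod(
            std::make_unique<DB::WriteBufferFromFile>("data.tsv.zst"), method, /* level = */ 3);
        DB::writeString("hello\tworld\n", *out);
        /// ZstdDeflatingWriteBuffer finishes the zstd frame in its destructor when the scope closes.
    }

    auto in = DB::wrapReadBufferWithCompressionMethod(
        std::make_unique<DB::ReadBufferFromFile>("data.tsv.zst"), method);
    std::string line;
    DB::readStringUntilEOF(line, *in);  /// "hello\tworld\n"
}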

View File

@ -28,6 +28,9 @@ enum class CompressionMethod
/// LZMA2-based content compression
/// This option corresponds to HTTP Content-Encoding: xz
Xz,
/// Zstd compressor
/// This option corresponds to HTTP Content-Encoding: zstd
Zstd,
Brotli
};

View File

@ -0,0 +1,92 @@
#include <IO/ZstdDeflatingWriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ZSTD_ENCODER_FAILED;
}
ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);
ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
}
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
{
try
{
finish();
ZSTD_freeCCtx(cctx);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ZstdDeflatingWriteBuffer::nextImpl()
{
if (!offset())
return;
bool last_chunk = hasPendingData();
ZSTD_EndDirective mode = last_chunk ? ZSTD_e_end : ZSTD_e_flush;
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
bool finished = false;
do
{
out->nextIfAtEnd();
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, mode);
out->position() = out->buffer().begin() + output.pos;
finished = last_chunk ? (remaining == 0) : (input.pos == input.size);
} while (!finished);
if (last_chunk)
flushed = true;
}
void ZstdDeflatingWriteBuffer::finish()
{
if (flushed)
return;
next();
out->nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
output.dst = reinterpret_cast<unsigned char *>(out->buffer().begin());
output.size = out->buffer().size();
output.pos = out->offset();
size_t remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: zstd version: {}", ZSTD_VERSION_STRING);
out->position() = out->buffer().begin() + output.pos;
flushed = true;
}
}
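
For reference, the same streaming pattern expressed with the bare zstd C API, independent of the WriteBuffer machinery (buffer sizes, names and error handling are illustrative):

#include <zstd.h>
#include <stdexcept>
#include <string>
#include <vector>

std::vector<char> compressWithZstd(const std::string & data, int level = 3)
{
    ZSTD_CCtx * cctx = ZSTD_createCCtx();
    if (!cctx)
        throw std::runtime_error("ZSTD_createCCtx failed");
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level);
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);

    std::vector<char> result(ZSTD_compressBound(data.size()));
    ZSTD_inBuffer input{data.data(), data.size(), 0};
    ZSTD_outBuffer output{result.data(), result.size(), 0};

    /// ZSTD_e_end flushes everything and writes the frame epilogue; a return value of 0
    /// means the frame is complete, a non-zero non-error value means "call again".
    size_t remaining;
    do
    {
        remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
        if (ZSTD_isError(remaining))
        {
            ZSTD_freeCCtx(cctx);
            throw std::runtime_error(ZSTD_getErrorName(remaining));
        }
    } while (remaining != 0);

    ZSTD_freeCCtx(cctx);
    result.resize(output.pos);
    return result;
}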

View File

@ -0,0 +1,39 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <zstd.h>
namespace DB
{
/// Performs compression using the zstd library and writes compressed data to the out_ WriteBuffer.
class ZstdDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
/// Flush all pending data and write the zstd frame epilogue to the underlying buffer.
/// After the first call to this function, subsequent calls have no effect and
/// an attempt to write to this buffer will result in an exception.
void finish();
~ZstdDeflatingWriteBuffer() override;
private:
void nextImpl() override;
std::unique_ptr<WriteBuffer> out;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
bool flushed = false;
};
}
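
A minimal direct-use sketch with an explicit finish(); it mirrors the zstd_buffers.cpp test added later in this commit, and the file name is illustrative:

#include <memory>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ZstdDeflatingWriteBuffer.h>

void writeZstdFile()
{
    auto file = std::make_unique<DB::WriteBufferFromFile>("numbers.tsv.zst");
    DB::ZstdDeflatingWriteBuffer zstd(std::move(file), /* compression_level = */ 3);
    for (size_t i = 0; i < 1000; ++i)
    {
        DB::writeIntText(i, zstd);
        DB::writeChar('\n', zstd);
    }
    zstd.finish();  /// flushes pending data and writes the zstd frame epilogue
}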

View File

@ -0,0 +1,64 @@
#include <IO/ZstdInflatingReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ZSTD_DECODER_FAILED;
}
ZstdInflatingReadBuffer::ZstdInflatingReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment), in(std::move(in_))
{
dctx = ZSTD_createDCtx();
input = {nullptr, 0, 0};
output = {nullptr, 0, 0};
if (dctx == nullptr)
{
throw Exception(ErrorCodes::ZSTD_DECODER_FAILED, "zstd stream decoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
}
}
ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer()
{
ZSTD_freeDCtx(dctx);
}
bool ZstdInflatingReadBuffer::nextImpl()
{
if (eof)
return false;
if (input.pos >= input.size)
{
in->nextIfAtEnd();
input.src = reinterpret_cast<unsigned char *>(in->position());
input.pos = 0;
input.size = in->buffer().end() - in->position();
}
output.dst = reinterpret_cast<unsigned char *>(internal_buffer.begin());
output.size = internal_buffer.size();
output.pos = 0;
size_t ret = ZSTD_decompressStream(dctx, &output, &input);
if (ZSTD_isError(ret))
throw Exception(
ErrorCodes::ZSTD_DECODER_FAILED, "zstd stream decoding failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING);
in->position() = in->buffer().begin() + input.pos;
working_buffer.resize(output.pos);
if (in->eof())
{
eof = true;
return working_buffer.size() != 0;
}
return true;
}
}
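
And the matching streaming decompression expressed with the bare zstd C API (illustrative only; the ReadBuffer above does the equivalent incrementally against its nested buffer):

#include <zstd.h>
#include <stdexcept>
#include <string>
#include <vector>

std::string decompressWithZstd(const std::vector<char> & compressed)
{
    ZSTD_DCtx * dctx = ZSTD_createDCtx();
    if (!dctx)
        throw std::runtime_error("ZSTD_createDCtx failed");

    std::string result;
    std::vector<char> chunk(ZSTD_DStreamOutSize());  /// recommended output chunk size

    ZSTD_inBuffer input{compressed.data(), compressed.size(), 0};
    while (input.pos < input.size)
    {
        ZSTD_outBuffer output{chunk.data(), chunk.size(), 0};
        size_t ret = ZSTD_decompressStream(dctx, &output, &input);
        if (ZSTD_isError(ret))
        {
            ZSTD_freeDCtx(dctx);
            throw std::runtime_error(ZSTD_getErrorName(ret));
        }
        result.append(chunk.data(), output.pos);
    }

    ZSTD_freeDCtx(dctx);
    return result;
}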

View File

@ -0,0 +1,37 @@
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <zstd.h>
namespace DB
{
namespace ErrorCodes
{
}
class ZstdInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
ZstdInflatingReadBuffer(
std::unique_ptr<ReadBuffer> in_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
~ZstdInflatingReadBuffer() override;
private:
bool nextImpl() override;
std::unique_ptr<ReadBuffer> in;
ZSTD_DCtx * dctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
bool eof = false;
};
}

View File

@ -82,3 +82,6 @@ target_link_libraries (zlib_ng_bug PRIVATE ${ZLIB_LIBRARIES})
add_executable (ryu_test ryu_test.cpp)
target_link_libraries (ryu_test PRIVATE ryu)
add_executable (zstd_buffers zstd_buffers.cpp)
target_link_libraries (zstd_buffers PRIVATE clickhouse_common_io)

View File

@ -0,0 +1,66 @@
#include <iomanip>
#include <iostream>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <IO/ZstdInflatingReadBuffer.h>
#include <Common/Stopwatch.h>
int main(int, char **)
try
{
std::cout << std::fixed << std::setprecision(2);
size_t n = 10000000;
Stopwatch stopwatch;
{
auto buf
= std::make_unique<DB::WriteBufferFromFile>("test_zstd_buffers.zst", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::ZstdDeflatingWriteBuffer zstd_buf(std::move(buf), /*compression level*/ 3);
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
DB::writeIntText(i, zstd_buf);
DB::writeChar('\t', zstd_buf);
}
zstd_buf.finish();
stopwatch.stop();
std::cout << "Writing done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
<< ", " << (zstd_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl;
}
{
auto buf = std::make_unique<DB::ReadBufferFromFile>("test_zstd_buffers.zst");
DB::ZstdInflatingReadBuffer zstd_buf(std::move(buf));
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
size_t x;
DB::readIntText(x, zstd_buf);
zstd_buf.ignore();
if (x != i)
throw DB::Exception("Failed!, read: " + std::to_string(x) + ", expected: " + std::to_string(i), 0);
}
stopwatch.stop();
std::cout << "Reading done. Elapsed: " << stopwatch.elapsedSeconds() << " s."
<< ", " << (zstd_buf.count() / stopwatch.elapsedSeconds() / 1000000) << " MB/s" << std::endl;
}
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}

View File

@ -5,39 +5,39 @@
#include <chrono>
#include <iomanip>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/ExternalTable.h>
#include <DataStreams/IBlockInputStream.h>
#include <Disks/StoragePolicy.h>
#include <IO/CascadeWriteBuffer.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPStream.h>
#include <Poco/Net/NetException.h>
#include <ext/scope_guard.h>
#include <Core/ExternalTable.h>
#include <Common/SettingsChanges.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <common/getFQDNOrHostName.h>
#include <Common/setThreadName.h>
#include <Common/SettingsChanges.h>
#include <Disks/StoragePolicy.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Common/typeid_cast.h>
#include <Poco/Net/HTTPStream.h>
#include <common/getFQDNOrHostName.h>
#include <ext/scope_guard.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
@ -46,10 +46,8 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_PARSE_TEXT;
extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
@ -109,36 +107,25 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
{
return HTTPResponse::HTTP_UNAUTHORIZED;
}
else if (exception_code == ErrorCodes::CANNOT_PARSE_TEXT ||
exception_code == ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE ||
exception_code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING ||
exception_code == ErrorCodes::CANNOT_PARSE_DATE ||
exception_code == ErrorCodes::CANNOT_PARSE_DATETIME ||
exception_code == ErrorCodes::CANNOT_PARSE_NUMBER ||
exception_code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED ||
exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST ||
exception_code == ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE ||
exception_code == ErrorCodes::TOO_DEEP_AST ||
exception_code == ErrorCodes::TOO_BIG_AST ||
exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE ||
exception_code == ErrorCodes::SYNTAX_ERROR ||
exception_code == ErrorCodes::INCORRECT_DATA ||
exception_code == ErrorCodes::TYPE_MISMATCH)
else if (
exception_code == ErrorCodes::CANNOT_PARSE_TEXT || exception_code == ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE
|| exception_code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING || exception_code == ErrorCodes::CANNOT_PARSE_DATE
|| exception_code == ErrorCodes::CANNOT_PARSE_DATETIME || exception_code == ErrorCodes::CANNOT_PARSE_NUMBER
|| exception_code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED || exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST
|| exception_code == ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE || exception_code == ErrorCodes::TOO_DEEP_AST
|| exception_code == ErrorCodes::TOO_BIG_AST || exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE
|| exception_code == ErrorCodes::SYNTAX_ERROR || exception_code == ErrorCodes::INCORRECT_DATA
|| exception_code == ErrorCodes::TYPE_MISMATCH)
{
return HTTPResponse::HTTP_BAD_REQUEST;
}
else if (exception_code == ErrorCodes::UNKNOWN_TABLE ||
exception_code == ErrorCodes::UNKNOWN_FUNCTION ||
exception_code == ErrorCodes::UNKNOWN_IDENTIFIER ||
exception_code == ErrorCodes::UNKNOWN_TYPE ||
exception_code == ErrorCodes::UNKNOWN_STORAGE ||
exception_code == ErrorCodes::UNKNOWN_DATABASE ||
exception_code == ErrorCodes::UNKNOWN_SETTING ||
exception_code == ErrorCodes::UNKNOWN_DIRECTION_OF_SORTING ||
exception_code == ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION ||
exception_code == ErrorCodes::UNKNOWN_FORMAT ||
exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE ||
exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY)
else if (
exception_code == ErrorCodes::UNKNOWN_TABLE || exception_code == ErrorCodes::UNKNOWN_FUNCTION
|| exception_code == ErrorCodes::UNKNOWN_IDENTIFIER || exception_code == ErrorCodes::UNKNOWN_TYPE
|| exception_code == ErrorCodes::UNKNOWN_STORAGE || exception_code == ErrorCodes::UNKNOWN_DATABASE
|| exception_code == ErrorCodes::UNKNOWN_SETTING || exception_code == ErrorCodes::UNKNOWN_DIRECTION_OF_SORTING
|| exception_code == ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION || exception_code == ErrorCodes::UNKNOWN_FORMAT
|| exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE || exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY)
{
return HTTPResponse::HTTP_NOT_FOUND;
}
@ -150,8 +137,7 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
{
return HTTPResponse::HTTP_NOT_IMPLEMENTED;
}
else if (exception_code == ErrorCodes::SOCKET_TIMEOUT ||
exception_code == ErrorCodes::CANNOT_OPEN_FILE)
else if (exception_code == ErrorCodes::SOCKET_TIMEOUT || exception_code == ErrorCodes::CANNOT_OPEN_FILE)
{
return HTTPResponse::HTTP_SERVICE_UNAVAILABLE;
}
@ -164,9 +150,7 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
}
static std::chrono::steady_clock::duration parseSessionTimeout(
const Poco::Util::AbstractConfiguration & config,
const HTMLForm & params)
static std::chrono::steady_clock::duration parseSessionTimeout(const Poco::Util::AbstractConfiguration & config, const HTMLForm & params)
{
unsigned session_timeout = config.getInt("default_session_timeout", 60);
@ -180,8 +164,9 @@ static std::chrono::steady_clock::duration parseSessionTimeout(
throw Exception("Invalid session timeout: '" + session_timeout_str + "'", ErrorCodes::INVALID_SESSION_TIMEOUT);
if (session_timeout > max_session_timeout)
throw Exception("Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout)
+ ". Maximum session timeout could be modified in configuration file.",
throw Exception(
"Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout)
+ ". Maximum session timeout could be modified in configuration file.",
ErrorCodes::INVALID_SESSION_TIMEOUT);
}
@ -209,8 +194,7 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
IReadableWriteBuffer * write_buf_concrete;
ReadBufferPtr reread_buf;
if (write_buf
&& (write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get()))
if (write_buf && (write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get()))
&& (reread_buf = write_buf_concrete->tryGetReadBuffer()))
{
read_buffers.emplace_back(reread_buf);
@ -223,9 +207,7 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
}
HTTPHandler::HTTPHandler(IServer & server_, const std::string & name)
: server(server_)
, log(&Poco::Logger::get(name))
HTTPHandler::HTTPHandler(IServer & server_, const std::string & name) : server(server_), log(&Poco::Logger::get(name))
{
server_display_name = server.config().getString("display_name", getFQDNOrHostName());
}
@ -271,12 +253,12 @@ void HTTPHandler::processQuery(
else
{
/// It is prohibited to mix different authorization schemes.
if (request.hasCredentials()
|| params.has("user")
|| params.has("password")
|| params.has("quota_key"))
if (request.hasCredentials() || params.has("user") || params.has("password") || params.has("quota_key"))
{
throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously", ErrorCodes::REQUIRED_PASSWORD);
throw Exception(
"Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods "
"simultaneously",
ErrorCodes::REQUIRED_PASSWORD);
}
}
@ -318,12 +300,13 @@ void HTTPHandler::processQuery(
{
std::string opentelemetry_traceparent = request.get("traceparent");
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
opentelemetry_traceparent, error))
if (!context.getClientInfo().parseTraceparentHeader(opentelemetry_traceparent, error))
{
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
throw Exception(
ErrorCodes::BAD_REQUEST_PARAMETER,
"Failed to parse OpenTelemetry traceparent header '{}': {}",
opentelemetry_traceparent, error);
opentelemetry_traceparent,
error);
}
context.getClientInfo().opentelemetry_tracestate = request.get("tracestate", "");
@ -332,8 +315,7 @@ void HTTPHandler::processQuery(
// Set the query id supplied by the user, if any, and also update the
// OpenTelemetry fields.
context.setCurrentQueryId(params.get("query_id",
request.get("X-ClickHouse-Query-Id", "")));
context.setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
@ -353,6 +335,8 @@ void HTTPHandler::processQuery(
http_response_compression_method = CompressionMethod::Zlib;
else if (std::string::npos != http_response_compression_methods.find("xz"))
http_response_compression_method = CompressionMethod::Xz;
else if (std::string::npos != http_response_compression_methods.find("zstd"))
http_response_compression_method = CompressionMethod::Zstd;
}
bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None;
@ -362,8 +346,8 @@ void HTTPHandler::processQuery(
bool internal_compression = params.getParsed<bool>("compress", false);
/// At least, we should postpone sending of first buffer_size result bytes
size_t buffer_size_total = std::max(
params.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE));
size_t buffer_size_total
= std::max(params.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE));
/// If it is specified, the whole result will be buffered.
/// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file.
@ -395,23 +379,20 @@ void HTTPHandler::processQuery(
const std::string tmp_path(context.getTemporaryVolume()->getDisk()->getPath());
const std::string tmp_path_template(tmp_path + "http_buffers/");
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
{
return WriteBufferFromTemporaryFile::create(tmp_path_template);
};
auto create_tmp_disk_buffer
= [tmp_path_template](const WriteBufferPtr &) { return WriteBufferFromTemporaryFile::create(tmp_path_template); };
cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer));
}
else
{
auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed] (const WriteBufferPtr & prev_buf)
{
auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed](const WriteBufferPtr & prev_buf) {
auto * prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
if (!prev_memory_buffer)
throw Exception("Expected MemoryWriteBuffer", ErrorCodes::LOGICAL_ERROR);
auto rdbuf = prev_memory_buffer->tryGetReadBuffer();
copyData(*rdbuf , *next_buffer);
copyData(*rdbuf, *next_buffer);
return next_buffer;
};
@ -419,8 +400,8 @@ void HTTPHandler::processQuery(
cascade_buffer2.emplace_back(push_memory_buffer_and_continue);
}
used_output.out_maybe_delayed_and_compressed = std::make_shared<CascadeWriteBuffer>(
std::move(cascade_buffer1), std::move(cascade_buffer2));
used_output.out_maybe_delayed_and_compressed
= std::make_shared<CascadeWriteBuffer>(std::move(cascade_buffer1), std::move(cascade_buffer2));
}
else
{
@ -446,13 +427,23 @@ void HTTPHandler::processQuery(
std::unique_ptr<ReadBuffer> in;
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"};
static const NameSet reserved_param_names{
"compress",
"decompress",
"user",
"password",
"quota_key",
"query_id",
"stacktrace",
"buffer_size",
"wait_end_of_query",
"session_id",
"session_timeout",
"session_check"};
Names reserved_param_suffixes;
auto param_could_be_skipped = [&] (const String & name)
{
auto param_could_be_skipped = [&](const String & name) {
/// Empty parameter appears when URL like ?&a=b or a=b&&c=d. Just skip them for user's convenience.
if (name.empty())
return true;
@ -577,12 +568,10 @@ void HTTPHandler::processQuery(
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
auto append_callback = [&context] (ProgressCallback callback)
{
auto append_callback = [&context](ProgressCallback callback) {
auto prev = context.getProgressCallback();
context.setProgressCallback([prev, callback] (const Progress & progress)
{
context.setProgressCallback([prev, callback](const Progress & progress) {
if (prev)
prev(progress);
@ -592,14 +581,13 @@ void HTTPHandler::processQuery(
/// While still no data has been sent, we will report about query execution progress by sending HTTP headers.
if (settings.send_progress_in_http_headers)
append_callback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
append_callback([&used_output](const Progress & progress) { used_output.out->onProgress(progress); });
if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
{
Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket();
append_callback([&context, &socket](const Progress &)
{
append_callback([&context, &socket](const Progress &) {
/// Assume that at the point this method is called no one is reading data from the socket any more.
/// True for read-only queries.
try
@ -623,15 +611,17 @@ void HTTPHandler::processQuery(
query_scope.emplace(context);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
{
executeQuery(
*in,
*used_output.out_maybe_delayed_and_compressed,
/* allow_into_outfile = */ false,
context,
[&response](const String & current_query_id, const String & content_type, const String & format, const String & timezone) {
response.setContentType(content_type);
response.add("X-ClickHouse-Query-Id", current_query_id);
response.add("X-ClickHouse-Format", format);
response.add("X-ClickHouse-Timezone", timezone);
}
);
});
if (used_output.hasDelayed())
{
@ -644,8 +634,11 @@ void HTTPHandler::processQuery(
used_output.out->finalize();
}
void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
void HTTPHandler::trySendExceptionToClient(
const std::string & s,
int exception_code,
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response,
Output & used_output)
{
try
@ -654,17 +647,14 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
/// to avoid reading part of the current request body in the next request.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
&& response.getKeepAlive()
&& !request.stream().eof()
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && response.getKeepAlive() && !request.stream().eof()
&& exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
{
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
}
bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER ||
exception_code == ErrorCodes::WRONG_PASSWORD ||
exception_code == ErrorCodes::REQUIRED_PASSWORD;
bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER || exception_code == ErrorCodes::WRONG_PASSWORD
|| exception_code == ErrorCodes::REQUIRED_PASSWORD;
if (auth_fail)
{
@ -742,10 +732,12 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
with_stacktrace = params.getParsed<bool>("stacktrace", false);
/// Workaround. Poco does not detect 411 Length Required case.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() &&
!request.hasContentLength())
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding()
&& !request.hasContentLength())
{
throw Exception("The Transfer-Encoding is not chunked and there is no Content-Length header for POST request", ErrorCodes::HTTP_LENGTH_REQUIRED);
throw Exception(
"The Transfer-Encoding is not chunked and there is no Content-Length header for POST request",
ErrorCodes::HTTP_LENGTH_REQUIRED);
}
processQuery(context, request, params, response, used_output, query_scope);
@ -773,7 +765,7 @@ DynamicQueryHandler::DynamicQueryHandler(IServer & server_, const std::string &
bool DynamicQueryHandler::customizeQueryParam(Context & context, const std::string & key, const std::string & value)
{
if (key == param_name)
return true; /// do nothing
return true; /// do nothing
if (startsWith(key, "param_"))
{
@ -788,7 +780,6 @@ bool DynamicQueryHandler::customizeQueryParam(Context & context, const std::stri
std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context)
{
if (likely(!startsWith(request.getContentType(), "multipart/form-data")))
{
/// Part of the query can be passed in the 'query' parameter and the rest in the request body
@ -813,10 +804,16 @@ std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request
}
PredefinedQueryHandler::PredefinedQueryHandler(
IServer & server_, const NameSet & receive_params_, const std::string & predefined_query_
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_)
: HTTPHandler(server_, "PredefinedQueryHandler"), receive_params(receive_params_), predefined_query(predefined_query_)
, url_regex(url_regex_), header_name_with_capture_regex(header_name_with_regex_)
IServer & server_,
const NameSet & receive_params_,
const std::string & predefined_query_,
const CompiledRegexPtr & url_regex_,
const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_)
: HTTPHandler(server_, "PredefinedQueryHandler")
, receive_params(receive_params_)
, predefined_query(predefined_query_)
, url_regex(url_regex_)
, header_name_with_capture_regex(header_name_with_regex_)
{
}
@ -836,8 +833,7 @@ void PredefinedQueryHandler::customizeContext(Poco::Net::HTTPServerRequest & req
/// If in the configuration file, the handler's header is regex and contains named capture group
/// We will extract regex named capture groups as query parameters
const auto & set_query_params = [&](const char * begin, const char * end, const CompiledRegexPtr & compiled_regex)
{
const auto & set_query_params = [&](const char * begin, const char * end, const CompiledRegexPtr & compiled_regex) {
int num_captures = compiled_regex->NumberOfCapturingGroups() + 1;
re2::StringPiece matches[num_captures];
@ -882,16 +878,16 @@ std::string PredefinedQueryHandler::getQuery(Poco::Net::HTTPServerRequest & requ
Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix)
{
std::string query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query");
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, std::move(query_param_name)), server.config(), config_prefix);
return addFiltersFromConfig(
new HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, std::move(query_param_name)), server.config(), config_prefix);
}
static inline bool capturingNamedQueryParam(NameSet receive_params, const CompiledRegexPtr & compiled_regex)
{
const auto & capturing_names = compiled_regex->NamedCapturingGroups();
return std::count_if(capturing_names.begin(), capturing_names.end(), [&](const auto & iterator)
{
return std::count_if(receive_params.begin(), receive_params.end(),
[&](const auto & param_name) { return param_name == iterator.first; });
return std::count_if(capturing_names.begin(), capturing_names.end(), [&](const auto & iterator) {
return std::count_if(
receive_params.begin(), receive_params.end(), [&](const auto & param_name) { return param_name == iterator.first; });
});
}
@ -900,8 +896,10 @@ static inline CompiledRegexPtr getCompiledRegex(const std::string & expression)
auto compiled_regex = std::make_shared<const re2::RE2>(expression);
if (!compiled_regex->ok())
throw Exception("Cannot compile re2: " + expression + " for http handling rule, error: " +
compiled_regex->error() + ". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
throw Exception(
"Cannot compile re2: " + expression + " for http handling rule, error: " + compiled_regex->error()
+ ". Look at https://github.com/google/re2/wiki/Syntax for reference.",
ErrorCodes::CANNOT_COMPILE_REGEXP);
return compiled_regex;
}
@ -911,7 +909,8 @@ Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer &
Poco::Util::AbstractConfiguration & configuration = server.config();
if (!configuration.has(config_prefix + ".handler.query"))
throw Exception("There is no path '" + config_prefix + ".handler.query" + "' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
throw Exception(
"There is no path '" + config_prefix + ".handler.query" + "' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
std::string predefined_query = configuration.getString(config_prefix + ".handler.query");
NameSet analyze_receive_params = analyzeReceiveQueryParams(predefined_query);
@ -942,14 +941,22 @@ Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer &
auto regex = getCompiledRegex(url_expression);
if (capturingNamedQueryParam(analyze_receive_params, regex))
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server, std::move(analyze_receive_params), std::move(predefined_query), std::move(regex),
std::move(headers_name_with_regex)), configuration, config_prefix);
return addFiltersFromConfig(
new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server,
std::move(analyze_receive_params),
std::move(predefined_query),
std::move(regex),
std::move(headers_name_with_regex)),
configuration,
config_prefix);
}
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{} ,std::move(headers_name_with_regex)),
configuration, config_prefix);
return addFiltersFromConfig(
new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{}, std::move(headers_name_with_regex)),
configuration,
config_prefix);
}
}
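
The functional part of this file's change is the extra Accept-Encoding branch; here is a hedged sketch of the negotiation, pulled out into a free function for clarity (the helper name and exact branch order are illustrative, and the real handler additionally guards brotli behind USE_BROTLI and only compresses when enable_http_compression is set):

#include <string>
#include <IO/CompressionMethod.h>

DB::CompressionMethod chooseHttpResponseCompression(const std::string & accept_encoding)
{
    using DB::CompressionMethod;
    if (accept_encoding.find("gzip") != std::string::npos)
        return CompressionMethod::Gzip;
    if (accept_encoding.find("deflate") != std::string::npos)
        return CompressionMethod::Zlib;
    if (accept_encoding.find("br") != std::string::npos)
        return CompressionMethod::Brotli;
    if (accept_encoding.find("xz") != std::string::npos)
        return CompressionMethod::Xz;
    if (accept_encoding.find("zstd") != std::string::npos)
        return CompressionMethod::Zstd;   /// newly recognized by this commit
    return CompressionMethod::None;
}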

View File

@ -68,15 +68,28 @@
7
8
9
0
1
2
3
4
5
6
7
8
9
< Content-Encoding: gzip
< Content-Encoding: deflate
< Content-Encoding: gzip
< Content-Encoding: br
< Content-Encoding: xz
< Content-Encoding: zstd
1
1
1
1
1
Hello, world
Hello, world
Hello, world
Hello, world

View File

@ -10,6 +10,7 @@ ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10';
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' | brotli -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: xz' -d 'SELECT number FROM system.numbers LIMIT 10' | xz -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zstd' -d 'SELECT number FROM system.numbers LIMIT 10' | zstd -d;
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
@ -18,16 +19,19 @@ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: xz' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zstd' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
echo "SELECT 1" | ${CLICKHOUSE_CURL} -sS --data-binary @- "${CLICKHOUSE_URL}";
echo "SELECT 1" | gzip -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: gzip' "${CLICKHOUSE_URL}";
echo "SELECT 1" | brotli | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: br' "${CLICKHOUSE_URL}";
echo "SELECT 1" | xz -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: xz' "${CLICKHOUSE_URL}";
echo "SELECT 1" | zstd -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: zstd' "${CLICKHOUSE_URL}";
echo "'Hello, world'" | ${CLICKHOUSE_CURL} -sS --data-binary @- "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | gzip -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: gzip' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | brotli | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: br' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | xz -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: xz' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | zstd -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: zstd' "${CLICKHOUSE_URL}&query=SELECT";
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 0' | wc -c;

View File

@ -1,7 +1,9 @@
1000000 999999
1000000 999999
1000000 999999
3000000 999999
1000000 999999
4000000 999999
1 255
1 255
1 255
1 255

View File

@ -23,9 +23,18 @@ SELECT count(), max(x) FROM file;
DROP TABLE file;
SELECT count(), max(x) FROM file('data{1,2,3}.tsv.{gz,br,xz}', TSV, 'x UInt64');
CREATE TABLE file (x UInt64) ENGINE = File(TSV, 'data4.tsv.zst');
TRUNCATE TABLE file;
INSERT INTO file SELECT * FROM numbers(1000000);
SELECT count(), max(x) FROM file;
DROP TABLE file;
SELECT count(), max(x) FROM file('data{1,2,3,4}.tsv.{gz,br,xz,zst}', TSV, 'x UInt64');
-- check that they are compressed
SELECT count() < 1000000, max(x) FROM file('data1.tsv.br', RowBinary, 'x UInt8', 'none');
SELECT count() < 3000000, max(x) FROM file('data2.tsv.gz', RowBinary, 'x UInt8', 'none');
SELECT count() < 1000000, max(x) FROM file('data3.tsv.xz', RowBinary, 'x UInt8', 'none');
SELECT count() < 1000000, max(x) FROM file('data4.tsv.zst', RowBinary, 'x UInt8', 'none');