Rename QueryInfo's field result_compression -> transport_compression_type and change its type for consistency with the other compression settings.

Make the previous field obsolete.
Vitaly Baranov 2022-02-07 01:33:31 +07:00
parent 1f6b65d39d
commit 1341b4b4de
4 changed files with 142 additions and 101 deletions
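
A minimal Python sketch of the renamed fields from the client's side, in the spirit of the integration test added below. It assumes the generated clickhouse_grpc_pb2 / clickhouse_grpc_pb2_grpc modules and a reachable server; "localhost:9100" is only a placeholder address.

import grpc
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc

# Placeholder address; the server must have its gRPC interface enabled.
channel = grpc.insecure_channel("localhost:9100")
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)

# Before this commit the client filled a nested message:
#   result_compression=Compression(algorithm=GZIP, level=COMPRESSION_HIGH)
# Now there are two plain fields instead:
query_info = clickhouse_grpc_pb2.QueryInfo(
    query="SELECT 1",
    transport_compression_type="gzip",  # none, deflate, gzip, stream_gzip
    transport_compression_level=3,      # 0..3
)
result = stub.ExecuteQuery(query_info)
print(result.output)  # already decompressed by the client-side gRPC library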


@@ -217,13 +217,12 @@
<!-- The following file is used only if ssl_require_client_auth=1 -->
<ssl_ca_cert_file>/path/to/ssl_ca_cert_file</ssl_ca_cert_file>
<!-- Default compression algorithm (applied if client doesn't specify another algorithm, see result_compression in QueryInfo).
<!-- Default transport compression type (can be overridden by client, see the transport_compression_type field in QueryInfo).
Supported algorithms: none, deflate, gzip, stream_gzip -->
<compression>deflate</compression>
<transport_compression_type>none</transport_compression_type>
<!-- Default compression level (applied if client doesn't specify another level, see result_compression in QueryInfo).
Supported levels: none, low, medium, high -->
<compression_level>medium</compression_level>
<!-- Default transport compression level. Supported levels: 0..3 -->
<transport_compression_level>0</transport_compression_level>
<!-- Send/receive message size limits in bytes. -1 means unlimited -->
<max_send_message_size>-1</max_send_message_size>
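
The legacy grpc.compression / grpc.compression_level keys keep working as a fallback when the new keys are absent (see TransportCompression::fromConfiguration below). From the client's side, a request that leaves the transport compression fields unset simply picks up the server's configured default; a hedged sketch, with the same assumed modules and placeholder address as above:

import grpc
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc

stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(grpc.insecure_channel("localhost:9100"))

# No transport compression fields set: the server default applies, i.e.
# <transport_compression_type>/<transport_compression_level> from the config,
# or the legacy <compression>/<compression_level> keys if the new ones are absent.
query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 1")
result = stub.ExecuteQuery(query_info)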


@@ -51,6 +51,7 @@ using GRPCQueryInfo = clickhouse::grpc::QueryInfo;
using GRPCResult = clickhouse::grpc::Result;
using GRPCException = clickhouse::grpc::Exception;
using GRPCProgress = clickhouse::grpc::Progress;
using GRPCObsoleteTransportCompression = clickhouse::grpc::ObsoleteTransportCompression;
namespace DB
{
@@ -101,62 +102,6 @@ namespace
});
}
grpc_compression_algorithm parseCompressionAlgorithm(const String & str)
{
if (str == "none")
return GRPC_COMPRESS_NONE;
else if (str == "deflate")
return GRPC_COMPRESS_DEFLATE;
else if (str == "gzip")
return GRPC_COMPRESS_GZIP;
else if (str == "stream_gzip")
return GRPC_COMPRESS_STREAM_GZIP;
else
throw Exception("Unknown compression algorithm: '" + str + "'", ErrorCodes::INVALID_CONFIG_PARAMETER);
}
grpc_compression_level parseCompressionLevel(const String & str)
{
if (str == "none")
return GRPC_COMPRESS_LEVEL_NONE;
else if (str == "low")
return GRPC_COMPRESS_LEVEL_LOW;
else if (str == "medium")
return GRPC_COMPRESS_LEVEL_MED;
else if (str == "high")
return GRPC_COMPRESS_LEVEL_HIGH;
else
throw Exception("Unknown compression level: '" + str + "'", ErrorCodes::INVALID_CONFIG_PARAMETER);
}
grpc_compression_algorithm convertCompressionAlgorithm(const ::clickhouse::grpc::CompressionAlgorithm & algorithm)
{
if (algorithm == ::clickhouse::grpc::NO_COMPRESSION)
return GRPC_COMPRESS_NONE;
else if (algorithm == ::clickhouse::grpc::DEFLATE)
return GRPC_COMPRESS_DEFLATE;
else if (algorithm == ::clickhouse::grpc::GZIP)
return GRPC_COMPRESS_GZIP;
else if (algorithm == ::clickhouse::grpc::STREAM_GZIP)
return GRPC_COMPRESS_STREAM_GZIP;
else
throw Exception("Unknown compression algorithm: '" + ::clickhouse::grpc::CompressionAlgorithm_Name(algorithm) + "'", ErrorCodes::INVALID_GRPC_QUERY_INFO);
}
grpc_compression_level convertCompressionLevel(const ::clickhouse::grpc::CompressionLevel & level)
{
if (level == ::clickhouse::grpc::COMPRESSION_NONE)
return GRPC_COMPRESS_LEVEL_NONE;
else if (level == ::clickhouse::grpc::COMPRESSION_LOW)
return GRPC_COMPRESS_LEVEL_LOW;
else if (level == ::clickhouse::grpc::COMPRESSION_MEDIUM)
return GRPC_COMPRESS_LEVEL_MED;
else if (level == ::clickhouse::grpc::COMPRESSION_HIGH)
return GRPC_COMPRESS_LEVEL_HIGH;
else
throw Exception("Unknown compression level: '" + ::clickhouse::grpc::CompressionLevel_Name(level) + "'", ErrorCodes::INVALID_GRPC_QUERY_INFO);
}
/// Gets file's contents as a string, throws an exception if failed.
String readFile(const String & filepath)
{
@@ -193,6 +138,102 @@ namespace
return grpc::InsecureServerCredentials();
}
/// Transport compression makes the gRPC library compress packed Result messages before sending them over the network.
struct TransportCompression
{
grpc_compression_algorithm algorithm;
grpc_compression_level level;
/// Extracts the settings of transport compression from a query info if possible.
static std::optional<TransportCompression> fromQueryInfo(const GRPCQueryInfo & query_info)
{
TransportCompression res;
if (!query_info.transport_compression_type().empty())
{
res.setAlgorithm(query_info.transport_compression_type(), ErrorCodes::INVALID_GRPC_QUERY_INFO);
res.setLevel(query_info.transport_compression_level(), ErrorCodes::INVALID_GRPC_QUERY_INFO);
return res;
}
if (query_info.has_obsolete_result_compression())
{
switch (query_info.obsolete_result_compression().algorithm())
{
case GRPCObsoleteTransportCompression::NO_COMPRESSION: res.algorithm = GRPC_COMPRESS_NONE; break;
case GRPCObsoleteTransportCompression::DEFLATE: res.algorithm = GRPC_COMPRESS_DEFLATE; break;
case GRPCObsoleteTransportCompression::GZIP: res.algorithm = GRPC_COMPRESS_GZIP; break;
case GRPCObsoleteTransportCompression::STREAM_GZIP: res.algorithm = GRPC_COMPRESS_STREAM_GZIP; break;
default: throw Exception(ErrorCodes::INVALID_GRPC_QUERY_INFO, "Unknown compression algorithm: {}", GRPCObsoleteTransportCompression::CompressionAlgorithm_Name(query_info.obsolete_result_compression().algorithm()));
}
switch (query_info.obsolete_result_compression().level())
{
case GRPCObsoleteTransportCompression::COMPRESSION_NONE: res.level = GRPC_COMPRESS_LEVEL_NONE; break;
case GRPCObsoleteTransportCompression::COMPRESSION_LOW: res.level = GRPC_COMPRESS_LEVEL_LOW; break;
case GRPCObsoleteTransportCompression::COMPRESSION_MEDIUM: res.level = GRPC_COMPRESS_LEVEL_MED; break;
case GRPCObsoleteTransportCompression::COMPRESSION_HIGH: res.level = GRPC_COMPRESS_LEVEL_HIGH; break;
default: throw Exception(ErrorCodes::INVALID_GRPC_QUERY_INFO, "Unknown compression level: {}", GRPCObsoleteTransportCompression::CompressionLevel_Name(query_info.obsolete_result_compression().level()));
}
return res;
}
return std::nullopt;
}
/// Extracts the settings of transport compression from the server configuration.
static TransportCompression fromConfiguration(const Poco::Util::AbstractConfiguration & config)
{
TransportCompression res;
if (config.has("grpc.transport_compression_type"))
{
res.setAlgorithm(config.getString("grpc.transport_compression_type"), ErrorCodes::INVALID_CONFIG_PARAMETER);
res.setLevel(config.getInt("grpc.transport_compression_level", 0), ErrorCodes::INVALID_CONFIG_PARAMETER);
}
else
{
res.setAlgorithm(config.getString("grpc.compression", "none"), ErrorCodes::INVALID_CONFIG_PARAMETER);
res.setLevel(config.getString("grpc.compression_level", "none"), ErrorCodes::INVALID_CONFIG_PARAMETER);
}
return res;
}
private:
void setAlgorithm(const String & str, int error_code)
{
if (str == "none")
algorithm = GRPC_COMPRESS_NONE;
else if (str == "deflate")
algorithm = GRPC_COMPRESS_DEFLATE;
else if (str == "gzip")
algorithm = GRPC_COMPRESS_GZIP;
else if (str == "stream_gzip")
algorithm = GRPC_COMPRESS_STREAM_GZIP;
else
throw Exception(error_code, "Unknown compression algorithm: '{}'", str);
}
void setLevel(const String & str, int error_code)
{
if (str == "none")
level = GRPC_COMPRESS_LEVEL_NONE;
else if (str == "low")
level = GRPC_COMPRESS_LEVEL_LOW;
else if (str == "medium")
level = GRPC_COMPRESS_LEVEL_MED;
else if (str == "high")
level = GRPC_COMPRESS_LEVEL_HIGH;
else
throw Exception(error_code, "Unknown compression level: '{}'", str);
}
void setLevel(int level_, int error_code)
{
if (0 <= level_ && level_ < GRPC_COMPRESS_LEVEL_COUNT)
level = static_cast<grpc_compression_level>(level_);
else
throw Exception(error_code, "Compression level {} is out of range 0..{}", level_, GRPC_COMPRESS_LEVEL_COUNT - 1);
}
};
/// Gets session's timeout from query info or from the server config.
std::chrono::steady_clock::duration getSessionTimeout(const GRPCQueryInfo & query_info, const Poco::Util::AbstractConfiguration & config)
@@ -293,15 +334,10 @@ namespace
return std::nullopt;
}
void setResultCompression(grpc_compression_algorithm algorithm, grpc_compression_level level)
void setTransportCompression(const TransportCompression & transport_compression)
{
grpc_context.set_compression_algorithm(algorithm);
grpc_context.set_compression_level(level);
}
void setResultCompression(const ::clickhouse::grpc::Compression & compression)
{
setResultCompression(convertCompressionAlgorithm(compression.algorithm()), convertCompressionLevel(compression.level()));
grpc_context.set_compression_algorithm(transport_compression.algorithm);
grpc_context.set_compression_level(transport_compression.level);
}
protected:
@@ -816,9 +852,9 @@ namespace
if (!query_info.database().empty())
query_context->setCurrentDatabase(query_info.database());
/// Apply compression settings for this call.
if (query_info.has_result_compression())
responder->setResultCompression(query_info.result_compression());
/// Apply transport compression for this call.
if (auto transport_compression = TransportCompression::fromQueryInfo(query_info))
responder->setTransportCompression(*transport_compression);
/// The interactive delay will be used to show progress.
interactive_delay = settings.interactive_delay;
@@ -1781,8 +1817,9 @@ void GRPCServer::start()
builder.RegisterService(&grpc_service);
builder.SetMaxSendMessageSize(iserver.config().getInt("grpc.max_send_message_size", -1));
builder.SetMaxReceiveMessageSize(iserver.config().getInt("grpc.max_receive_message_size", -1));
builder.SetDefaultCompressionAlgorithm(parseCompressionAlgorithm(iserver.config().getString("grpc.compression", "none")));
builder.SetDefaultCompressionLevel(parseCompressionLevel(iserver.config().getString("grpc.compression_level", "none")));
auto default_transport_compression = TransportCompression::fromConfiguration(iserver.config());
builder.SetDefaultCompressionAlgorithm(default_transport_compression.algorithm);
builder.SetDefaultCompressionLevel(default_transport_compression.level);
queue = builder.AddCompletionQueue();
grpc_server = builder.BuildAndStart();
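
For requests, TransportCompression::fromQueryInfo prefers the new transport_compression_type / transport_compression_level fields and falls back to the obsolete result_compression message, so old clients keep working (the wire field number is preserved, see the .proto changes below). A hedged sketch of such an old-style request, written against the regenerated proto, with the same assumed modules and placeholder address as above:

import grpc
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc

stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(grpc.insecure_channel("localhost:9100"))

# Old-style request: still accepted because the wire field number (17) is unchanged,
# but new code should set transport_compression_type / transport_compression_level instead.
query_info = clickhouse_grpc_pb2.QueryInfo(
    query="SELECT 1",
    obsolete_result_compression=clickhouse_grpc_pb2.ObsoleteTransportCompression(
        algorithm=clickhouse_grpc_pb2.ObsoleteTransportCompression.GZIP,
        level=clickhouse_grpc_pb2.ObsoleteTransportCompression.COMPRESSION_HIGH,
    ),
)
result = stub.ExecuteQuery(query_info)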


@@ -45,21 +45,19 @@ message ExternalTable {
map<string, string> settings = 5;
}
enum CompressionAlgorithm {
NO_COMPRESSION = 0;
DEFLATE = 1;
GZIP = 2;
STREAM_GZIP = 3;
}
enum CompressionLevel {
COMPRESSION_NONE = 0;
COMPRESSION_LOW = 1;
COMPRESSION_MEDIUM = 2;
COMPRESSION_HIGH = 3;
}
message Compression {
message ObsoleteTransportCompression {
enum CompressionAlgorithm {
NO_COMPRESSION = 0;
DEFLATE = 1;
GZIP = 2;
STREAM_GZIP = 3;
}
enum CompressionLevel {
COMPRESSION_NONE = 0;
COMPRESSION_LOW = 1;
COMPRESSION_MEDIUM = 2;
COMPRESSION_HIGH = 3;
}
CompressionAlgorithm algorithm = 1;
CompressionLevel level = 2;
}
@@ -102,10 +100,6 @@ message QueryInfo {
// `next_query_info` is allowed to be set only if a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used.
bool next_query_info = 16;
/// Controls how a ClickHouse server will compress query execution results before sending back to the client.
/// If not set the compression settings from the configuration file will be used.
Compression result_compression = 17;
// Compression type for `input_data`.
// Supported compression types: none, gzip(gz), deflate, brotli(br), lzma(xz), zstd(zst), lz4, bz2.
// The client is responsible to compress data before putting it into `input_data`.
@@ -129,7 +123,20 @@ message QueryInfo {
// bz2: 1..9
int32 output_compression_level = 19;
// Transport compression is an alternative way to make the server compress its response.
// Instead of compressing just `output`, the server compresses whole packed messages of the `Result` type,
// and the gRPC implementation on the client side decompresses those messages, so client code doesn't have to deal with decompression.
// This is the big difference from the compression enabled by `output_compression_type`:
// with transport compression the client code receives already decompressed data in `output`.
// If transport compression is not set here, it can still be enabled by the server configuration.
// Supported compression types: none, deflate, gzip, stream_gzip
// Supported compression levels: 0..3
// WARNING: Don't set `transport_compression_type` and `output_compression_type` at the same time because that will make the server compress its output twice!
string transport_compression_type = 22;
int32 transport_compression_level = 23;
/// Obsolete fields, should not be used in new code.
ObsoleteTransportCompression obsolete_result_compression = 17;
string obsolete_compression_type = 18;
}
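
The practical difference between the two kinds of compression, sketched below with the same assumed modules and placeholder address as above: with output_compression_type the bytes in `output` arrive compressed and the client decompresses them itself (here assuming the server's gzip output is a standard gzip stream), whereas with transport compression whole Result messages are compressed on the wire and the gRPC library hands back plain `output`.

import gzip
import grpc
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc

stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(grpc.insecure_channel("localhost:9100"))

# Output compression: `output` itself is compressed, the client must decompress it.
compressed = stub.ExecuteQuery(clickhouse_grpc_pb2.QueryInfo(
    query="SELECT 0 FROM numbers(1000)", output_compression_type="gzip"))
data = gzip.decompress(compressed.output)

# Transport compression: Result messages are compressed on the wire, the gRPC
# library decompresses them, so `output` already contains plain data.
plain = stub.ExecuteQuery(clickhouse_grpc_pb2.QueryInfo(
    query="SELECT 0 FROM numbers(1000)", transport_compression_type="gzip",
    transport_compression_level=3))
assert data == plain.output == b"0\n" * 1000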


@@ -373,14 +373,6 @@ def test_cancel_while_generating_output():
output += result.output
assert output == b'0\t0\n1\t0\n2\t0\n3\t0\n'
def test_result_compression():
query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(1000000)",
result_compression=clickhouse_grpc_pb2.Compression(algorithm=clickhouse_grpc_pb2.CompressionAlgorithm.GZIP,
level=clickhouse_grpc_pb2.CompressionLevel.COMPRESSION_HIGH))
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
result = stub.ExecuteQuery(query_info)
assert result.output == (b'0\n')*1000000
def test_compressed_output():
query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(1000)", output_compression_type="lz4")
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
@@ -445,6 +437,12 @@ def test_compressed_external_table():
b"4\tDaniel\n"\
b"5\tEthan\n"
def test_transport_compression():
query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(1000000)", transport_compression_type='gzip', transport_compression_level=3)
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
result = stub.ExecuteQuery(query_info)
assert result.output == (b'0\n')*1000000
def test_opentelemetry_context_propagation():
trace_id = "80c190b5-9dc1-4eae-82b9-6c261438c817"
parent_span_id = 123