gRPC: Allow change server-side compression from client.

This commit is contained in:
Vitaly Baranov 2021-09-13 00:43:04 +03:00
parent b40b4b0b4d
commit 113ddceabb
4 changed files with 80 additions and 3 deletions

View File

@ -193,11 +193,11 @@
<!-- The following file is used only if ssl_require_client_auth=1 -->
<ssl_ca_cert_file>/path/to/ssl_ca_cert_file</ssl_ca_cert_file>
<!-- Default compression algorithm (applied if client doesn't specify another algorithm).
<!-- Default compression algorithm (applied if client doesn't specify another algorithm, see result_compression in QueryInfo).
Supported algorithms: none, deflate, gzip, stream_gzip -->
<compression>deflate</compression>
<!-- Default compression level (applied if client doesn't specify another level).
<!-- Default compression level (applied if client doesn't specify another level, see result_compression in QueryInfo).
Supported levels: none, low, medium, high -->
<compression_level>medium</compression_level>

View File

@ -120,6 +120,33 @@ namespace
throw Exception("Unknown compression level: '" + str + "'", ErrorCodes::INVALID_CONFIG_PARAMETER);
}
grpc_compression_algorithm convertCompressionAlgorithm(const ::clickhouse::grpc::CompressionAlgorithm & algorithm)
{
if (algorithm == ::clickhouse::grpc::NO_COMPRESSION)
return GRPC_COMPRESS_NONE;
else if (algorithm == ::clickhouse::grpc::DEFLATE)
return GRPC_COMPRESS_DEFLATE;
else if (algorithm == ::clickhouse::grpc::GZIP)
return GRPC_COMPRESS_GZIP;
else if (algorithm == ::clickhouse::grpc::STREAM_GZIP)
return GRPC_COMPRESS_STREAM_GZIP;
else
throw Exception("Unknown compression algorithm: '" + ::clickhouse::grpc::CompressionAlgorithm_Name(algorithm) + "'", ErrorCodes::INVALID_GRPC_QUERY_INFO);
}
grpc_compression_level convertCompressionLevel(const ::clickhouse::grpc::CompressionLevel & level)
{
if (level == ::clickhouse::grpc::COMPRESSION_NONE)
return GRPC_COMPRESS_LEVEL_NONE;
else if (level == ::clickhouse::grpc::COMPRESSION_LOW)
return GRPC_COMPRESS_LEVEL_LOW;
else if (level == ::clickhouse::grpc::COMPRESSION_MEDIUM)
return GRPC_COMPRESS_LEVEL_MED;
else if (level == ::clickhouse::grpc::COMPRESSION_HIGH)
return GRPC_COMPRESS_LEVEL_HIGH;
else
throw Exception("Unknown compression level: '" + ::clickhouse::grpc::CompressionLevel_Name(level) + "'", ErrorCodes::INVALID_GRPC_QUERY_INFO);
}
/// Gets file's contents as a string, throws an exception if failed.
String readFile(const String & filepath)
@ -242,7 +269,22 @@ namespace
virtual void write(const GRPCResult & result, const CompletionCallback & callback) = 0;
virtual void writeAndFinish(const GRPCResult & result, const grpc::Status & status, const CompletionCallback & callback) = 0;
Poco::Net::SocketAddress getClientAddress() const { String peer = grpc_context.peer(); return Poco::Net::SocketAddress{peer.substr(peer.find(':') + 1)}; }
Poco::Net::SocketAddress getClientAddress() const
{
String peer = grpc_context.peer();
return Poco::Net::SocketAddress{peer.substr(peer.find(':') + 1)};
}
void setResultCompression(grpc_compression_algorithm algorithm, grpc_compression_level level)
{
grpc_context.set_compression_algorithm(algorithm);
grpc_context.set_compression_level(level);
}
void setResultCompression(const ::clickhouse::grpc::Compression & compression)
{
setResultCompression(convertCompressionAlgorithm(compression.algorithm()), convertCompressionLevel(compression.level()));
}
protected:
CompletionCallback * getCallbackPtr(const CompletionCallback & callback)
@ -732,6 +774,10 @@ namespace
if (!query_info.database().empty())
query_context->setCurrentDatabase(query_info.database());
/// Apply compression settings for this call.
if (query_info.has_result_compression())
responder->setResultCompression(query_info.result_compression());
/// The interactive delay will be used to show progress.
interactive_delay = settings.interactive_delay;
query_context->setProgressCallback([this](const Progress & value) { return progress.incrementPiecewiseAtomically(value); });

View File

@ -41,6 +41,25 @@ message ExternalTable {
map<string, string> settings = 5;
}
enum CompressionAlgorithm {
NO_COMPRESSION = 0;
DEFLATE = 1;
GZIP = 2;
STREAM_GZIP = 3;
}
enum CompressionLevel {
COMPRESSION_NONE = 0;
COMPRESSION_LOW = 1;
COMPRESSION_MEDIUM = 2;
COMPRESSION_HIGH = 3;
}
message Compression {
CompressionAlgorithm algorithm = 1;
CompressionLevel level = 2;
}
// Information about a query which a client sends to a ClickHouse server.
// The first QueryInfo can set any of the following fields. Extra QueryInfos only add extra data.
// In extra QueryInfos only `input_data`, `external_tables`, `next_query_info` and `cancel` fields can be set.
@ -78,6 +97,10 @@ message QueryInfo {
// If true there will be at least one more QueryInfo in the input stream.
// `next_query_info` is allowed to be set only if a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used.
bool next_query_info = 16;
/// Controls how a ClickHouse server will compress query execution results before sending back to the client.
/// If not set the compression settings from the configuration file will be used.
Compression result_compression = 17;
}
enum LogsLevel {

View File

@ -356,3 +356,11 @@ def test_cancel_while_generating_output():
for result in results:
output += result.output
assert output == b'0\t0\n1\t0\n2\t0\n3\t0\n'
def test_result_compression():
query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(1000000)",
result_compression=clickhouse_grpc_pb2.Compression(algorithm=clickhouse_grpc_pb2.CompressionAlgorithm.GZIP,
level=clickhouse_grpc_pb2.CompressionLevel.COMPRESSION_HIGH))
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
result = stub.ExecuteQuery(query_info)
assert result.output == (b'0\n')*1000000