Merge pull request #34341 from vitlibar/add-test-for-propagating-opentelemetry-context-grpc

Add test for propagating OpenTelemetry context via gRPC protocol
This commit is contained in:
Vitaly Baranov 2022-02-07 12:29:05 +07:00 committed by GitHub
commit 7b91a66c39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 23 deletions

View File

@ -285,6 +285,15 @@ namespace
return Poco::Net::SocketAddress{peer.substr(peer.find(':') + 1)}; return Poco::Net::SocketAddress{peer.substr(peer.find(':') + 1)};
} }
std::optional<String> getClientHeader(const String & key) const
{
const auto & client_metadata = grpc_context.client_metadata();
auto it = client_metadata.find(key);
if (it != client_metadata.end())
return String{it->second.data(), it->second.size()};
return std::nullopt;
}
void setResultCompression(grpc_compression_algorithm algorithm, grpc_compression_level level) void setResultCompression(grpc_compression_algorithm algorithm, grpc_compression_level level)
{ {
grpc_context.set_compression_algorithm(algorithm); grpc_context.set_compression_algorithm(algorithm);
@ -296,8 +305,6 @@ namespace
setResultCompression(convertCompressionAlgorithm(compression.algorithm()), convertCompressionLevel(compression.level())); setResultCompression(convertCompressionAlgorithm(compression.algorithm()), convertCompressionLevel(compression.level()));
} }
grpc::ServerContext grpc_context;
protected: protected:
CompletionCallback * getCallbackPtr(const CompletionCallback & callback) CompletionCallback * getCallbackPtr(const CompletionCallback & callback)
{ {
@ -320,6 +327,8 @@ namespace
return &callback_in_map; return &callback_in_map;
} }
grpc::ServerContext grpc_context;
private: private:
grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> reader_writer{&grpc_context}; grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> reader_writer{&grpc_context};
std::unordered_map<size_t, CompletionCallback> callbacks; std::unordered_map<size_t, CompletionCallback> callbacks;
@ -752,33 +761,21 @@ namespace
session->authenticate(user, password, user_address); session->authenticate(user, password, user_address);
session->getClientInfo().quota_key = quota_key; session->getClientInfo().quota_key = quota_key;
// Parse the OpenTelemetry traceparent header.
ClientInfo client_info = session->getClientInfo(); ClientInfo client_info = session->getClientInfo();
const auto & client_metadata = responder->grpc_context.client_metadata();
auto traceparent = client_metadata.find("traceparent"); /// Parse the OpenTelemetry traceparent header.
if (traceparent != client_metadata.end()) auto traceparent = responder->getClientHeader("traceparent");
if (traceparent)
{ {
grpc::string_ref parent_ref = traceparent->second; String error;
std::string opentelemetry_traceparent(parent_ref.data(), parent_ref.length()); if (!client_info.client_trace_context.parseTraceparentHeader(traceparent.value(), error))
std::string error;
if (!client_info.client_trace_context.parseTraceparentHeader(
opentelemetry_traceparent, error))
{ {
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
"Failed to parse OpenTelemetry traceparent header '{}': {}", "Failed to parse OpenTelemetry traceparent header '{}': {}",
opentelemetry_traceparent, error); traceparent.value(), error);
}
auto tracestate = client_metadata.find("tracestate");
if (tracestate != client_metadata.end())
{
grpc::string_ref state_ref = tracestate->second;
client_info.client_trace_context.tracestate =
std::string(state_ref.data(), state_ref.length());
}
else
{
client_info.client_trace_context.tracestate = "";
} }
auto tracestate = responder->getClientHeader("tracestate");
client_info.client_trace_context.tracestate = tracestate.value_or("");
} }
/// The user could specify session identifier and session timeout. /// The user could specify session identifier and session timeout.

View File

@ -431,3 +431,18 @@ def test_compressed_external_table():
b"3\tCarl\n"\ b"3\tCarl\n"\
b"4\tDaniel\n"\ b"4\tDaniel\n"\
b"5\tEthan\n" b"5\tEthan\n"
def test_opentelemetry_context_propagation():
trace_id = "80c190b5-9dc1-4eae-82b9-6c261438c817"
parent_span_id = 123
trace_state = "some custom state"
trace_id_hex = trace_id.replace("-", "")
parent_span_id_hex = f'{parent_span_id:0>16X}'
metadata = [("traceparent", f"00-{trace_id_hex}-{parent_span_id_hex}-01"), ("tracestate", trace_state)]
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 1")
result = stub.ExecuteQuery(query_info, metadata=metadata)
assert result.output == b"1\n"
node.query("SYSTEM FLUSH LOGS")
assert node.query(f"SELECT attribute['db.statement'], attribute['clickhouse.tracestate'] FROM system.opentelemetry_span_log "
f"WHERE trace_id='{trace_id}' AND parent_span_id={parent_span_id}") == "SELECT 1\tsome custom state\n"