From f5cbc6da433384493787fcfb4351d045df28fd08 Mon Sep 17 00:00:00 2001 From: Andre Marianiello Date: Thu, 27 Jan 2022 11:43:19 -0500 Subject: [PATCH] Propagate trace context from GRPC calls --- src/Server/GRPCServer.cpp | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index 589bdd63f41..85b1d345c6e 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -64,6 +64,7 @@ namespace ErrorCodes extern const int NETWORK_ERROR; extern const int NO_DATA_TO_INSERT; extern const int SUPPORT_IS_DISABLED; + extern const int BAD_REQUEST_PARAMETER; } namespace @@ -295,6 +296,8 @@ namespace setResultCompression(convertCompressionAlgorithm(compression.algorithm()), convertCompressionLevel(compression.level())); } + grpc::ServerContext grpc_context; + protected: CompletionCallback * getCallbackPtr(const CompletionCallback & callback) { @@ -317,8 +320,6 @@ namespace return &callback_in_map; } - grpc::ServerContext grpc_context; - private: grpc::ServerAsyncReaderWriter reader_writer{&grpc_context}; std::unordered_map callbacks; @@ -751,6 +752,35 @@ namespace session->authenticate(user, password, user_address); session->getClientInfo().quota_key = quota_key; + // Parse the OpenTelemetry traceparent header. + ClientInfo client_info = session->getClientInfo(); + auto & client_metadata = responder->grpc_context.client_metadata(); + auto traceparent = client_metadata.find("traceparent"); + if (traceparent != client_metadata.end()) + { + grpc::string_ref parent_ref = traceparent->second; + std::string opentelemetry_traceparent(parent_ref.data(), parent_ref.length()); + std::string error; + if (!client_info.client_trace_context.parseTraceparentHeader( + opentelemetry_traceparent, error)) + { + throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, + "Failed to parse OpenTelemetry traceparent header '{}': {}", + opentelemetry_traceparent, error); + } + auto tracestate = client_metadata.find("tracestate"); + if (tracestate != client_metadata.end()) + { + grpc::string_ref state_ref = tracestate->second; + client_info.client_trace_context.tracestate = + std::string(state_ref.data(), state_ref.length()); + } + else + { + client_info.client_trace_context.tracestate = ""; + } + } + /// The user could specify session identifier and session timeout. /// It allows to modify settings, create temporary tables and reuse them in subsequent requests. if (!query_info.session_id().empty()) @@ -759,7 +789,7 @@ namespace query_info.session_id(), getSessionTimeout(query_info, iserver.config()), query_info.session_check()); } - query_context = session->makeQueryContext(); + query_context = session->makeQueryContext(std::move(client_info)); /// Prepare settings. SettingsChanges settings_changes;