Propagate trace context from GRPC calls

This commit is contained in:
Andre Marianiello 2022-01-27 11:43:19 -05:00
parent 8c9266b24c
commit f5cbc6da43

View File

@ -64,6 +64,7 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
extern const int NO_DATA_TO_INSERT;
extern const int SUPPORT_IS_DISABLED;
extern const int BAD_REQUEST_PARAMETER;
}
namespace
@ -295,6 +296,8 @@ namespace
setResultCompression(convertCompressionAlgorithm(compression.algorithm()), convertCompressionLevel(compression.level()));
}
grpc::ServerContext grpc_context;
protected:
CompletionCallback * getCallbackPtr(const CompletionCallback & callback)
{
@ -317,8 +320,6 @@ namespace
return &callback_in_map;
}
grpc::ServerContext grpc_context;
private:
grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> reader_writer{&grpc_context};
std::unordered_map<size_t, CompletionCallback> callbacks;
@ -751,6 +752,35 @@ namespace
session->authenticate(user, password, user_address);
session->getClientInfo().quota_key = quota_key;
// Parse the OpenTelemetry traceparent header.
ClientInfo client_info = session->getClientInfo();
auto & client_metadata = responder->grpc_context.client_metadata();
auto traceparent = client_metadata.find("traceparent");
if (traceparent != client_metadata.end())
{
grpc::string_ref parent_ref = traceparent->second;
std::string opentelemetry_traceparent(parent_ref.data(), parent_ref.length());
std::string error;
if (!client_info.client_trace_context.parseTraceparentHeader(
opentelemetry_traceparent, error))
{
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
"Failed to parse OpenTelemetry traceparent header '{}': {}",
opentelemetry_traceparent, error);
}
auto tracestate = client_metadata.find("tracestate");
if (tracestate != client_metadata.end())
{
grpc::string_ref state_ref = tracestate->second;
client_info.client_trace_context.tracestate =
std::string(state_ref.data(), state_ref.length());
}
else
{
client_info.client_trace_context.tracestate = "";
}
}
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
if (!query_info.session_id().empty())
@ -759,7 +789,7 @@ namespace
query_info.session_id(), getSessionTimeout(query_info, iserver.config()), query_info.session_check());
}
query_context = session->makeQueryContext();
query_context = session->makeQueryContext(std::move(client_info));
/// Prepare settings.
SettingsChanges settings_changes;