diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 5348a9e36c5..e4858eeda8b 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -2515,7 +2515,7 @@ public: { std::string traceparent = options["opentelemetry-traceparent"].as(); std::string error; - if (!context.getClientInfo().parseTraceparentHeader( + if (!context.getClientInfo().client_trace_context.parseTraceparentHeader( traceparent, error)) { throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -2526,7 +2526,7 @@ public: if (options.count("opentelemetry-tracestate")) { - context.getClientInfo().opentelemetry_tracestate = + context.getClientInfo().client_trace_context.tracestate = options["opentelemetry-tracestate"].as(); } diff --git a/src/Common/OpenTelemetryTraceContext.h b/src/Common/OpenTelemetryTraceContext.h new file mode 100644 index 00000000000..1024230703d --- /dev/null +++ b/src/Common/OpenTelemetryTraceContext.h @@ -0,0 +1,20 @@ +#pragma once + +namespace DB { + +// The runtime info we need to create new OpenTelemetry spans. +struct OpenTelemetryTraceContext +{ + __uint128_t trace_id = 0; + UInt64 span_id = 0; + // The incoming tracestate header and the trace flags, we just pass them + // downstream. See https://www.w3.org/TR/trace-context/ + String tracestate; + __uint8_t trace_flags = 0; + + // Parse/compose OpenTelemetry traceparent header. + bool parseTraceparentHeader(const std::string & traceparent, std::string & error); + std::string composeTraceparentHeader() const; +}; + +} diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 0162a6946c6..4f6422ab151 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -108,8 +109,11 @@ public: using Deleter = std::function; Deleter deleter; - __uint128_t opentelemetry_trace_id; - UInt64 opentelemetry_current_span_id; + // This is the current most-derived OpenTelemetry span for this thread. 
It + // can be changed throughout the query execution, whenever we enter a new + // span or exit it. See OpenTelemetrySpanHolder that is normally responsible + // for these changes. + OpenTelemetryTraceContext thread_trace_context; protected: ThreadGroupStatusPtr thread_group; diff --git a/src/DataStreams/RemoteBlockOutputStream.cpp b/src/DataStreams/RemoteBlockOutputStream.cpp index 327e0204892..1611ada732c 100644 --- a/src/DataStreams/RemoteBlockOutputStream.cpp +++ b/src/DataStreams/RemoteBlockOutputStream.cpp @@ -27,6 +27,10 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, { ClientInfo modified_client_info = client_info_; modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + if (CurrentThread::isInitialized()) + { + modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context; + } /** Send query and receive "header", that describes table structure. * Header is needed to know, what structure is required for blocks to be passed to 'write' method. 
diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 38486aa6368..c6ad88a99eb 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -156,6 +156,10 @@ void RemoteQueryExecutor::sendQuery() auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context.getClientInfo(); modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + if (CurrentThread::isInitialized()) + { + modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context; + } multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true); diff --git a/src/Interpreters/ClientInfo.cpp b/src/Interpreters/ClientInfo.cpp index 37eb403ddab..f021d989158 100644 --- a/src/Interpreters/ClientInfo.cpp +++ b/src/Interpreters/ClientInfo.cpp @@ -62,16 +62,16 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision) if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY) { - if (opentelemetry_trace_id) + if (client_trace_context.trace_id) { // Have OpenTelemetry header. writeBinary(uint8_t(1), out); // No point writing these numbers with variable length, because they // are random and will probably require the full length anyway. 
- writeBinary(opentelemetry_trace_id, out); - writeBinary(opentelemetry_span_id, out); - writeBinary(opentelemetry_tracestate, out); - writeBinary(opentelemetry_trace_flags, out); + writeBinary(client_trace_context.trace_id, out); + writeBinary(client_trace_context.span_id, out); + writeBinary(client_trace_context.tracestate, out); + writeBinary(client_trace_context.trace_flags, out); } else { @@ -139,10 +139,10 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision) readBinary(have_trace_id, in); if (have_trace_id) { - readBinary(opentelemetry_trace_id, in); - readBinary(opentelemetry_span_id, in); - readBinary(opentelemetry_tracestate, in); - readBinary(opentelemetry_trace_flags, in); + readBinary(client_trace_context.trace_id, in); + readBinary(client_trace_context.span_id, in); + readBinary(client_trace_context.tracestate, in); + readBinary(client_trace_context.trace_flags, in); } } } @@ -155,14 +155,14 @@ void ClientInfo::setInitialQuery() client_name = (DBMS_NAME " ") + client_name; } -bool ClientInfo::parseTraceparentHeader(const std::string & traceparent, +bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent, std::string & error) { + trace_id = 0; + uint8_t version = -1; uint64_t trace_id_high = 0; uint64_t trace_id_low = 0; - uint64_t trace_parent = 0; - uint8_t trace_flags = 0; // Version 00, which is the only one we can parse, is fixed width. Use this // fact for an additional sanity check. 
@@ -183,7 +183,7 @@ bool ClientInfo::parseTraceparentHeader(const std::string & traceparent, // NOLINTNEXTLINE(cert-err34-c) int result = sscanf(&traceparent[0], "%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8, - &version, &trace_id_high, &trace_id_low, &trace_parent, &trace_flags); + &version, &trace_id_high, &trace_id_low, &span_id, &trace_flags); if (result == EOF) { @@ -205,23 +205,21 @@ bool ClientInfo::parseTraceparentHeader(const std::string & traceparent, return false; } - opentelemetry_trace_id = static_cast<__uint128_t>(trace_id_high) << 64 + trace_id = static_cast<__uint128_t>(trace_id_high) << 64 | trace_id_low; - opentelemetry_span_id = trace_parent; - opentelemetry_trace_flags = trace_flags; return true; } -std::string ClientInfo::composeTraceparentHeader() const +std::string OpenTelemetryTraceContext::composeTraceparentHeader() const { // This span is a parent for its children, so we specify this span_id as a // parent id. - return fmt::format("00-{:032x}-{:016x}-{:02x}", opentelemetry_trace_id, - opentelemetry_span_id, + return fmt::format("00-{:032x}-{:016x}-{:02x}", trace_id, + span_id, // This cast is needed because fmt is being weird and complaining that // "mixing character types is not allowed". - static_cast(opentelemetry_trace_flags)); + static_cast(trace_flags)); } void ClientInfo::fillOSUserHostNameAndVersionInfo() diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h index 2edf47684d3..c280ee42224 100644 --- a/src/Interpreters/ClientInfo.h +++ b/src/Interpreters/ClientInfo.h @@ -3,7 +3,7 @@ #include #include #include - +#include namespace DB { @@ -59,16 +59,9 @@ public: String initial_query_id; Poco::Net::SocketAddress initial_address; - // OpenTelemetry trace information. - __uint128_t opentelemetry_trace_id = 0; - // The span id we get the in the incoming client info becomes our parent span - // id, and the span id we send becomes downstream parent span id. 
- UInt64 opentelemetry_span_id = 0; - UInt64 opentelemetry_parent_span_id = 0; - // The incoming tracestate header and the trace flags, we just pass them downstream. - // They are described at https://www.w3.org/TR/trace-context/ - String opentelemetry_tracestate; - UInt8 opentelemetry_trace_flags = 0; + // OpenTelemetry trace context we received from client, or which we are going + // to send to server. + OpenTelemetryTraceContext client_trace_context; /// All below are parameters related to initial query. @@ -102,16 +95,6 @@ public: /// Initialize parameters on client initiating query. void setInitialQuery(); - // Parse/compose OpenTelemetry traceparent header. - // Note that these functions use span_id field, not parent_span_id, same as - // in native protocol. The incoming traceparent corresponds to the upstream - // trace span, and the outgoing traceparent corresponds to our current span. - // We use the same ClientInfo structure first for incoming span, and then - // for our span: when we switch, we use old span_id as parent_span_id, and - // generate a new span_id (currently this happens in Context::setQueryId()). - bool parseTraceparentHeader(const std::string & traceparent, std::string & error); - std::string composeTraceparentHeader() const; - private: void fillOSUserHostNameAndVersionInfo(); }; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 54ee7713e95..ad550657e54 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1127,8 +1127,14 @@ void Context::setCurrentQueryId(const String & query_id) random.words.a = thread_local_rng(); //-V656 random.words.b = thread_local_rng(); //-V656 - if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY - && client_info.opentelemetry_trace_id == 0) + if (client_info.client_trace_context.trace_id != 0) + { + // Use the OpenTelemetry trace context we received from the client, and + // create a new span for the query. 
+ query_trace_context = client_info.client_trace_context; + query_trace_context.span_id = thread_local_rng(); + } + else if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) { // If this is an initial query without any parent OpenTelemetry trace, we // might start the trace ourselves, with some configurable probability. @@ -1138,20 +1144,12 @@ void Context::setCurrentQueryId(const String & query_id) if (should_start_trace(thread_local_rng)) { // Use the randomly generated default query id as the new trace id. - client_info.opentelemetry_trace_id = random.uuid; - client_info.opentelemetry_parent_span_id = 0; - client_info.opentelemetry_span_id = thread_local_rng(); + query_trace_context.trace_id = random.uuid; + query_trace_context.span_id = thread_local_rng(); // Mark this trace as sampled in the flags. - client_info.opentelemetry_trace_flags = 1; + query_trace_context.trace_flags = 1; } } - else - { - // The incoming request has an OpenTelemtry trace context. Its span id - // becomes our parent span id. - client_info.opentelemetry_parent_span_id = client_info.opentelemetry_span_id; - client_info.opentelemetry_span_id = thread_local_rng(); - } String query_id_to_set = query_id; if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves. diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 02a57b5d966..66b99581bf7 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -198,6 +199,12 @@ private: Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this. Context * global_context = nullptr; /// Global context. Could be equal to this. +public: + // Top-level OpenTelemetry trace context for the query. Makes sense only for + // a query context. 
+ OpenTelemetryTraceContext query_trace_context; + +private: friend class NamedSessions; using SampleBlockCache = std::unordered_map; diff --git a/src/Interpreters/OpenTelemetrySpanLog.cpp b/src/Interpreters/OpenTelemetrySpanLog.cpp index b28047e05c8..853428cf7b9 100644 --- a/src/Interpreters/OpenTelemetrySpanLog.cpp +++ b/src/Interpreters/OpenTelemetrySpanLog.cpp @@ -54,15 +54,25 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name) { + trace_id = 0; + + if (!CurrentThread::isInitialized()) + { + // There may be no thread context if we're running inside the + // clickhouse-client, e.g. reading an external table provided with the + // `--external` option. + return; + } + auto & thread = CurrentThread::get(); - trace_id = thread.opentelemetry_trace_id; + trace_id = thread.thread_trace_context.trace_id; if (!trace_id) { return; } - parent_span_id = thread.opentelemetry_current_span_id; + parent_span_id = thread.thread_trace_context.span_id; span_id = thread_local_rng(); operation_name = _operation_name; start_time_us = std::chrono::duration_cast( @@ -72,7 +82,7 @@ OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_ attribute_names.push_back("clickhouse.start.stacktrace"); attribute_values.push_back(StackTrace().toString()); - thread.opentelemetry_current_span_id = span_id; + thread.thread_trace_context.span_id = span_id; } OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder() @@ -86,8 +96,8 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder() // First of all, return old value of current span. auto & thread = CurrentThread::get(); - assert(thread.opentelemetry_current_span_id = span_id); - thread.opentelemetry_current_span_id = parent_span_id; + assert(thread.thread_trace_context.span_id == span_id); + thread.thread_trace_context.span_id = parent_span_id; // Not sure what's the best way to access the log from here. 
auto * thread_group = CurrentThread::getGroup().get(); diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 3d56182a2f7..c9742615e03 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -74,6 +74,15 @@ void ThreadStatus::attachQueryContext(Context & query_context_) thread_group->global_context = global_context; } + // Generate new span for thread manually here, because we can't depend + // on OpenTelemetrySpanHolder due to link order issues. + // FIXME why and how is this different from setupState()? + thread_trace_context = query_context->query_trace_context; + if (thread_trace_context.trace_id) + { + thread_trace_context.span_id = thread_local_rng(); + } + applyQuerySettings(); } @@ -112,20 +121,17 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_) { applyQuerySettings(); - opentelemetry_trace_id = query_context->getClientInfo().opentelemetry_trace_id; - if (opentelemetry_trace_id) + // Generate new span for thread manually here, because we can't depend + // on OpenTelemetrySpanHolder due to link order issues. 
+ thread_trace_context = query_context->query_trace_context; + if (thread_trace_context.trace_id) { - opentelemetry_current_span_id = thread_local_rng(); - } - else - { - opentelemetry_current_span_id = 0; + thread_trace_context.span_id = thread_local_rng(); } } else { - opentelemetry_trace_id = 0; - opentelemetry_current_span_id = 0; + thread_trace_context.trace_id = 0; } initPerformanceCounters(); @@ -319,7 +325,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__); std::shared_ptr opentelemetry_span_log; - if (opentelemetry_trace_id && query_context) + if (thread_trace_context.trace_id && query_context) { opentelemetry_span_log = query_context->getOpenTelemetrySpanLog(); } @@ -334,11 +340,11 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) // destructor, which is in another library. OpenTelemetrySpanLogElement span; - span.trace_id = opentelemetry_trace_id; + span.trace_id = thread_trace_context.trace_id; // Might be problematic if some span holder isn't finished by the time // we detach this thread... - span.span_id = opentelemetry_current_span_id; - span.parent_span_id = query_context->getClientInfo().opentelemetry_span_id; + span.span_id = thread_trace_context.span_id; + span.parent_span_id = query_context->query_trace_context.span_id; span.operation_name = getThreadName(); span.start_time_us = query_start_time_microseconds; span.finish_time_us = @@ -364,8 +370,8 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) query_id.clear(); query_context = nullptr; - opentelemetry_trace_id = 0; - opentelemetry_current_span_id = 0; + thread_trace_context.trace_id = 0; + thread_trace_context.span_id = 0; thread_group.reset(); thread_state = thread_exits ? 
ThreadState::Died : ThreadState::DetachedFromQuery; diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 4c31d22529a..b0ffea59037 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -153,13 +153,11 @@ static void logQuery(const String & query, const Context & context, bool interna (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()), joinLines(query)); - if (client_info.opentelemetry_trace_id) + if (client_info.client_trace_context.trace_id) { LOG_TRACE(&Poco::Logger::get("executeQuery"), - "OpenTelemetry trace id {:x}, span id {}, parent span id {}", - client_info.opentelemetry_trace_id, - client_info.opentelemetry_span_id, - client_info.opentelemetry_parent_span_id); + "OpenTelemetry traceparent '{}'", + client_info.client_trace_context.composeTraceparentHeader()); } } } @@ -247,13 +245,13 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c query_log->add(elem); if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog(); - context.getClientInfo().opentelemetry_trace_id + context.query_trace_context.trace_id && opentelemetry_span_log) { OpenTelemetrySpanLogElement span; - span.trace_id = context.getClientInfo().opentelemetry_trace_id; - span.span_id = context.getClientInfo().opentelemetry_span_id; - span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id; + span.trace_id = context.query_trace_context.trace_id; + span.span_id = context.query_trace_context.span_id; + span.parent_span_id = context.getClientInfo().client_trace_context.span_id; span.operation_name = "query"; span.start_time_us = current_time_us; span.finish_time_us = current_time_us; @@ -269,11 +267,11 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c span.attribute_names.push_back("clickhouse.query_id"); 
span.attribute_values.push_back(elem.client_info.current_query_id); - if (!context.getClientInfo().opentelemetry_tracestate.empty()) + if (!context.query_trace_context.tracestate.empty()) { span.attribute_names.push_back("clickhouse.tracestate"); span.attribute_values.push_back( - context.getClientInfo().opentelemetry_tracestate); + context.query_trace_context.tracestate); } opentelemetry_span_log->add(span); @@ -689,13 +687,13 @@ static std::tuple executeQueryImpl( } if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog(); - context.getClientInfo().opentelemetry_trace_id + context.query_trace_context.trace_id && opentelemetry_span_log) { OpenTelemetrySpanLogElement span; - span.trace_id = context.getClientInfo().opentelemetry_trace_id; - span.span_id = context.getClientInfo().opentelemetry_span_id; - span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id; + span.trace_id = context.query_trace_context.trace_id; + span.span_id = context.query_trace_context.span_id; + span.parent_span_id = context.getClientInfo().client_trace_context.span_id; span.operation_name = "query"; span.start_time_us = elem.query_start_time_microseconds; span.finish_time_us = time_in_microseconds(finish_time); @@ -710,11 +708,11 @@ static std::tuple executeQueryImpl( span.attribute_names.push_back("clickhouse.query_id"); span.attribute_values.push_back(elem.client_info.current_query_id); - if (!context.getClientInfo().opentelemetry_tracestate.empty()) + if (!context.query_trace_context.tracestate.empty()) { span.attribute_names.push_back("clickhouse.tracestate"); span.attribute_values.push_back( - context.getClientInfo().opentelemetry_tracestate); + context.query_trace_context.tracestate); } opentelemetry_span_log->add(span); diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index 94d66d44af0..e7cdcd62bfb 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -318,7 +318,7 @@ void HTTPHandler::processQuery( { std::string 
opentelemetry_traceparent = request.get("traceparent"); std::string error; - if (!context.getClientInfo().parseTraceparentHeader( + if (!context.getClientInfo().client_trace_context.parseTraceparentHeader( opentelemetry_traceparent, error)) { throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, @@ -326,7 +326,7 @@ void HTTPHandler::processQuery( opentelemetry_traceparent, error); } - context.getClientInfo().opentelemetry_tracestate = request.get("tracestate", ""); + context.getClientInfo().client_trace_context.tracestate = request.get("tracestate", ""); } #endif diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index ceef755234f..ea41ee37203 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -74,16 +74,19 @@ namespace ReadWriteBufferFromHTTP::HTTPHeaderEntries header; // Propagate OpenTelemetry trace context, if any, downstream. - const auto & client_info = context.getClientInfo(); - if (client_info.opentelemetry_trace_id) + if (CurrentThread::isInitialized()) { - header.emplace_back("traceparent", - client_info.composeTraceparentHeader()); - - if (!client_info.opentelemetry_tracestate.empty()) + const auto & thread_trace_context = CurrentThread::get().thread_trace_context; + if (thread_trace_context.trace_id) { - header.emplace_back("tracestate", - client_info.opentelemetry_tracestate); + header.emplace_back("traceparent", + thread_trace_context.composeTraceparentHeader()); + + if (!thread_trace_context.tracestate.empty()) + { + header.emplace_back("tracestate", + thread_trace_context.tracestate); + } } } diff --git a/tests/queries/0_stateless/01455_opentelemetry_distributed.reference b/tests/queries/0_stateless/01455_opentelemetry_distributed.reference index 420bb17ae8b..b40e4f87c13 100644 --- a/tests/queries/0_stateless/01455_opentelemetry_distributed.reference +++ b/tests/queries/0_stateless/01455_opentelemetry_distributed.reference @@ -1,5 +1,5 @@ ===http=== -{"total spans":"4","unique spans":"4","unique non-zero parent 
spans":"2"} +{"total spans":"4","unique spans":"4","unique non-zero parent spans":"3"} {"initial query spans with proper parent":"1"} {"unique non-empty tracestate values":"1"} ===native===