diff --git a/src/Common/OpenTelemetryTraceContext.h b/src/Common/OpenTelemetryTraceContext.h index 4d2fc656100..55b9d5bb49c 100644 --- a/src/Common/OpenTelemetryTraceContext.h +++ b/src/Common/OpenTelemetryTraceContext.h @@ -1,11 +1,44 @@ #pragma once -#include -#include +#include namespace DB { +struct OpenTelemetrySpan +{ + UUID trace_id; + UInt64 span_id; + UInt64 parent_span_id; + std::string operation_name; + UInt64 start_time_us; + UInt64 finish_time_us; + Map attributes; + + void addAttribute(const std::string& name, UInt64 value); + void addAttributeIfNotZero(const std::string& name, UInt64 value) + { + if (value != 0) + addAttribute(name, value); + } + + void addAttribute(const std::string& name, const std::string& value); + void addAttribute(const Exception & e); + void addAttribute(std::exception_ptr e); + + bool isTraceEnabled() const + { + return trace_id != UUID(); + } +}; + +struct OpenTelemetrySpanHolder; + +class Context; +using ContextPtr = std::shared_ptr; + +class OpenTelemetrySpanLog; + // The runtime info we need to create new OpenTelemetry spans. struct OpenTelemetryTraceContext { @@ -19,6 +52,46 @@ struct OpenTelemetryTraceContext // Parse/compose OpenTelemetry traceparent header. bool parseTraceparentHeader(const std::string & traceparent, std::string & error); std::string composeTraceparentHeader() const; + + bool isTraceEnabled() const + { + return trace_id != UUID(); + } }; +// tracing context kept on thread local +struct OpenTelemetryThreadTraceContext : OpenTelemetryTraceContext +{ + OpenTelemetryThreadTraceContext& operator =(const OpenTelemetryTraceContext& context) + { + *(static_cast(this)) = context; + return *this; + } + + void reset(); + + static const OpenTelemetryThreadTraceContext& current(); + + std::weak_ptr span_log; +}; + +struct OpenTelemetryThreadTraceContextScope +{ + // forbidden copy ctor and assignment to make the destructor safe + OpenTelemetryThreadTraceContextScope(const OpenTelemetryThreadTraceContextScope& scope) = delete; + OpenTelemetryThreadTraceContextScope& operator =(const OpenTelemetryThreadTraceContextScope& scope) = delete; + + OpenTelemetryThreadTraceContextScope(const std::string& _operation_name, + const OpenTelemetryTraceContext& _parent_trace_context, + const std::weak_ptr& _log); + + ~OpenTelemetryThreadTraceContextScope(); + + + + OpenTelemetrySpan root_span; +}; + +using OpenTelemetryThreadTraceContextScopePtr = std::unique_ptr; + } diff --git a/src/Common/OpenTelemtryTraceContext.cpp b/src/Common/OpenTelemtryTraceContext.cpp new file mode 100644 index 00000000000..162a741f97f --- /dev/null +++ b/src/Common/OpenTelemtryTraceContext.cpp @@ -0,0 +1,263 @@ +#include "Interpreters/OpenTelemetrySpanLog.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +namespace DB +{ + +thread_local OpenTelemetryThreadTraceContext current_thread_trace_context; + +void OpenTelemetrySpan::addAttribute(const std::string& name, UInt64 value) +{ + if (trace_id == UUID() || name.empty()) + return; + + this->attributes.push_back(Tuple{name, toString(value)}); +} + +void OpenTelemetrySpan::addAttribute(const std::string& name, const std::string& value) +{ + if (trace_id == UUID() || name.empty() || value.empty()) + return; + + this->attributes.push_back(Tuple{name, value}); +} + +void OpenTelemetrySpan::addAttribute(const Exception & e) +{ + if (trace_id == UUID()) + return; + + this->attributes.push_back(Tuple{"clickhouse.exception", getExceptionMessage(e, false)}); +} + +void OpenTelemetrySpan::addAttribute(std::exception_ptr e) +{ + if (trace_id == UUID() || e == nullptr) + return; + + this->attributes.push_back(Tuple{"clickhouse.exception", getExceptionMessage(e, false)}); +} + +OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name) +{ + if (current_thread_trace_context.trace_id == UUID()) + { + this->trace_id = 0; + this->span_id = 0; + this->parent_span_id = 0; + } + else + { + this->trace_id = current_thread_trace_context.trace_id; + this->parent_span_id = current_thread_trace_context.span_id; + this->span_id = thread_local_rng(); // create a new id for this span + this->operation_name = _operation_name; + this->start_time_us = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + // set current span id to this + current_thread_trace_context.span_id = this->span_id; + } +} + +void OpenTelemetrySpanHolder::finish() +{ + if (trace_id == UUID()) + return; + + // First of all, return old value of current span. + assert(current_thread_trace_context.span_id == span_id); + current_thread_trace_context.span_id = parent_span_id; + + try + { + auto log = current_thread_trace_context.span_log.lock(); + if (!log) + { + // The log might be disabled. + return; + } + + this->finish_time_us = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + log->add(OpenTelemetrySpanLogElement(*this)); + } + catch (...) + { + tryLogCurrentException(__FUNCTION__); + } + + trace_id = UUID(); +} + +OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder() +{ + finish(); +} + + +template +static T readHex(const char * data) +{ + T x{}; + + const char * end = data + sizeof(T) * 2; + while (data < end) + { + x *= 16; + x += unhex(*data); + ++data; + } + + return x; +} + + +bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent, + std::string & error) +{ + trace_id = 0; + + // Version 00, which is the only one we can parse, is fixed width. Use this + // fact for an additional sanity check. + const int expected_length = strlen("xx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxx-xx"); + if (traceparent.length() != expected_length) + { + error = fmt::format("unexpected length {}, expected {}", + traceparent.length(), expected_length); + return false; + } + + const char * data = traceparent.data(); + + uint8_t version = unhex2(data); + data += 2; + + if (version != 0) + { + error = fmt::format("unexpected version {}, expected 00", version); + return false; + } + + if (*data != '-') + { + error = fmt::format("Malformed traceparant header: {}", traceparent); + return false; + } + + ++data; + UInt64 trace_id_higher_64 = unhexUInt(data); + UInt64 trace_id_lower_64 = unhexUInt(data + 16); + data += 32; + + if (*data != '-') + { + error = fmt::format("Malformed traceparant header: {}", traceparent); + return false; + } + + ++data; + UInt64 span_id_64 = unhexUInt(data); + data += 16; + + if (*data != '-') + { + error = fmt::format("Malformed traceparant header: {}", traceparent); + return false; + } + + ++data; + this->trace_flags = unhex2(data); + this->trace_id.toUnderType().items[0] = trace_id_higher_64; + this->trace_id.toUnderType().items[1] = trace_id_lower_64; + this->span_id = span_id_64; + return true; +} + + +std::string OpenTelemetryTraceContext::composeTraceparentHeader() const +{ + // This span is a parent for its children, so we specify this span_id as a + // parent id. + return fmt::format("00-{:016x}{:016x}-{:016x}-{:02x}", + trace_id.toUnderType().items[0], + trace_id.toUnderType().items[1], + span_id, + // This cast is needed because fmt is being weird and complaining that + // "mixing character types is not allowed". + static_cast(trace_flags)); +} + +const OpenTelemetryThreadTraceContext& OpenTelemetryThreadTraceContext::current() +{ + return current_thread_trace_context; +} + +void OpenTelemetryThreadTraceContext::reset() +{ + this->trace_id = 0; + this->span_id = 0; + this->trace_flags = 0; + this->tracestate = ""; + this->span_log.reset(); +} + +OpenTelemetryThreadTraceContextScope::OpenTelemetryThreadTraceContextScope(const std::string& _operation_name, + const OpenTelemetryTraceContext& _parent_trace_context, + const std::weak_ptr& _span_log) +{ + if (_parent_trace_context.isTraceEnabled()) + { + this->root_span.trace_id = _parent_trace_context.trace_id; + this->root_span.parent_span_id = _parent_trace_context.span_id; + this->root_span.span_id = thread_local_rng(); + this->root_span.operation_name = _operation_name; + this->root_span.start_time_us = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + } + else + { + this->root_span.trace_id = 0; + this->root_span.span_id = 0; + } + + // set trace context on the thread local + current_thread_trace_context = _parent_trace_context; + current_thread_trace_context.span_id = this->root_span.span_id; + current_thread_trace_context.span_log = _span_log; +} + +OpenTelemetryThreadTraceContextScope::~OpenTelemetryThreadTraceContextScope() +{ + if (this->root_span.trace_id != UUID()) + { + auto shared_span_log = current_thread_trace_context.span_log.lock(); + if (shared_span_log) + { + this->root_span.addAttribute("clickhouse.thread_id", getThreadId()); + this->root_span.finish_time_us = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + shared_span_log->add(OpenTelemetrySpanLogElement(this->root_span)); + } + + this->root_span.trace_id = 0; + } + + // restore thread local variables + current_thread_trace_context.reset(); +} + + +} + diff --git a/src/Common/ThreadStatus.cpp b/src/Common/ThreadStatus.cpp index a4f99c1be1a..077fbcb5bda 100644 --- a/src/Common/ThreadStatus.cpp +++ b/src/Common/ThreadStatus.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index c80150a8fe8..788b1cc7fcb 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -135,12 +135,6 @@ public: using Deleter = std::function; Deleter deleter; - // This is the current most-derived OpenTelemetry span for this thread. It - // can be changed throughout the query execution, whenever we enter a new - // span or exit it. See OpenTelemetrySpanHolder that is normally responsible - // for these changes. - OpenTelemetryTraceContext thread_trace_context; - protected: ThreadGroupStatusPtr thread_group; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index ca6ff02b994..13d78a690b5 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -134,7 +134,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int INVALID_SETTING_VALUE; extern const int UNKNOWN_READ_METHOD; - extern const int NOT_IMPLEMENTED; } @@ -1328,29 +1327,6 @@ void Context::setCurrentQueryId(const String & query_id) random.words.a = thread_local_rng(); //-V656 random.words.b = thread_local_rng(); //-V656 - if (client_info.client_trace_context.trace_id != UUID()) - { - // Use the OpenTelemetry trace context we received from the client, and - // create a new span for the query. - query_trace_context = client_info.client_trace_context; - query_trace_context.span_id = thread_local_rng(); - } - else if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) - { - // If this is an initial query without any parent OpenTelemetry trace, we - // might start the trace ourselves, with some configurable probability. - std::bernoulli_distribution should_start_trace{ - settings.opentelemetry_start_trace_probability}; - - if (should_start_trace(thread_local_rng)) - { - // Use the randomly generated default query id as the new trace id. - query_trace_context.trace_id = random.uuid; - query_trace_context.span_id = thread_local_rng(); - // Mark this trace as sampled in the flags. - query_trace_context.trace_flags = 1; - } - } String query_id_to_set = query_id; if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves. @@ -3449,4 +3425,52 @@ WriteSettings Context::getWriteSettings() const return res; } +OpenTelemetryThreadTraceContextScopePtr Context::startTracing(const std::string& name) +{ + OpenTelemetryThreadTraceContextScopePtr trace_context; + if (this->client_info.client_trace_context.trace_id != UUID()) + { + // Use the OpenTelemetry trace context we received from the client, and + // initialize the tracing context for this query on current thread + return std::make_unique(name, + this->client_info.client_trace_context, + this->getOpenTelemetrySpanLog()); + } + + // start the trace ourselves, with some configurable probability. + std::bernoulli_distribution should_start_trace{settings.opentelemetry_start_trace_probability}; + if (!should_start_trace(thread_local_rng)) + { + return trace_context; + } + + /// Generate random UUID, but using lower quality RNG, + /// because Poco::UUIDGenerator::generateRandom method is using /dev/random, that is very expensive. + /// NOTE: Actually we don't need to use UUIDs for query identifiers. + /// We could use any suitable string instead. + union + { + char bytes[16]; + struct + { + UInt64 a; + UInt64 b; + } words; + UUID uuid{}; + } random; + random.words.a = thread_local_rng(); //-V656 + random.words.b = thread_local_rng(); //-V656 + + OpenTelemetryTraceContext query_trace_context; + query_trace_context.trace_id = random.uuid; + query_trace_context.span_id = 0; + // Mark this trace as sampled in the flags. + query_trace_context.trace_flags = 1; + + return std::make_unique(name, + query_trace_context, + this->getOpenTelemetrySpanLog()); + +} + } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 83193dd589b..37b831ff18f 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -361,8 +361,7 @@ private: inline static ContextPtr global_context_instance; public: - // Top-level OpenTelemetry trace context for the query. Makes sense only for a query context. - OpenTelemetryTraceContext query_trace_context; + OpenTelemetryThreadTraceContextScopePtr startTracing(const std::string& name); private: using SampleBlockCache = std::unordered_map; diff --git a/src/Interpreters/OpenTelemetrySpanLog.h b/src/Interpreters/OpenTelemetrySpanLog.h index 34f4765c8c4..2f2cc7a1bea 100644 --- a/src/Interpreters/OpenTelemetrySpanLog.h +++ b/src/Interpreters/OpenTelemetrySpanLog.h @@ -1,29 +1,15 @@ #pragma once #include -#include -#include +#include namespace DB { -struct OpenTelemetrySpan -{ - UUID trace_id; - UInt64 span_id; - UInt64 parent_span_id; - std::string operation_name; - UInt64 start_time_us; - UInt64 finish_time_us; - Map attributes; - // I don't understand how Links work, namely, which direction should they - // point to, and how they are related with parent_span_id, so no Links for now. -}; - struct OpenTelemetrySpanLogElement : public OpenTelemetrySpan { OpenTelemetrySpanLogElement() = default; - explicit OpenTelemetrySpanLogElement(const OpenTelemetrySpan & span) + OpenTelemetrySpanLogElement(const OpenTelemetrySpan & span) : OpenTelemetrySpan(span) {} static std::string name() { return "OpenTelemetrySpanLog"; } @@ -41,14 +27,18 @@ public: using SystemLog::SystemLog; }; +typedef std::shared_ptr OpenTelemetrySpanLogPtr; + struct OpenTelemetrySpanHolder : public OpenTelemetrySpan { + OpenTelemetrySpanHolder(const OpenTelemetryTraceContext& _trace_context, OpenTelemetrySpanLogPtr _span_log, const std::string & _operation_name); explicit OpenTelemetrySpanHolder(const std::string & _operation_name); void addAttribute(const std::string& name, UInt64 value); void addAttribute(const std::string& name, const std::string& value); void addAttribute(const Exception & e); void addAttribute(std::exception_ptr e); + void finish(); ~OpenTelemetrySpanHolder(); }; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 53d7fd0457a..d2371a6b192 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -84,15 +84,6 @@ void ThreadStatus::attachQueryContext(ContextPtr query_context_) thread_group->global_context = global_context; } - // Generate new span for thread manually here, because we can't depend - // on OpenTelemetrySpanHolder due to link order issues. - // FIXME why and how is this different from setupState()? - thread_trace_context = query_context_->query_trace_context; - if (thread_trace_context.trace_id != UUID()) - { - thread_trace_context.span_id = thread_local_rng(); - } - applyQuerySettings(); } @@ -132,18 +123,6 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_) if (auto query_context_ptr = query_context.lock()) { applyQuerySettings(); - - // Generate new span for thread manually here, because we can't depend - // on OpenTelemetrySpanHolder due to link order issues. - thread_trace_context = query_context_ptr->query_trace_context; - if (thread_trace_context.trace_id != UUID()) - { - thread_trace_context.span_id = thread_local_rng(); - } - } - else - { - thread_trace_context.trace_id = 0; } initPerformanceCounters(); @@ -353,42 +332,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__); - std::shared_ptr opentelemetry_span_log; - auto query_context_ptr = query_context.lock(); - if (thread_trace_context.trace_id != UUID() && query_context_ptr) - { - opentelemetry_span_log = query_context_ptr->getOpenTelemetrySpanLog(); - } - - if (opentelemetry_span_log) - { - // Log the current thread span. - // We do this manually, because we can't use OpenTelemetrySpanHolder as a - // ThreadStatus member, because of linking issues. This file is linked - // separately, so we can reference OpenTelemetrySpanLog here, but if we had - // the span holder as a field, we would have to reference it in the - // destructor, which is in another library. - OpenTelemetrySpanLogElement span; - - span.trace_id = thread_trace_context.trace_id; - // All child span holders should be finished by the time we detach this - // thread, so the current span id should be the thread span id. If not, - // an assertion for a proper parent span in ~OpenTelemetrySpanHolder() - // is going to fail, because we're going to reset it to zero later in - // this function. - span.span_id = thread_trace_context.span_id; - assert(query_context_ptr); - span.parent_span_id = query_context_ptr->query_trace_context.span_id; - span.operation_name = getThreadName(); - span.start_time_us = query_start_time_microseconds; - span.finish_time_us = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count(); - span.attributes.push_back(Tuple{"clickhouse.thread_id", toString(thread_id)}); - - opentelemetry_span_log->add(span); - } - finalizeQueryProfiler(); finalizePerformanceCounters(); @@ -404,8 +347,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) query_id.clear(); query_context.reset(); - thread_trace_context.trace_id = 0; - thread_trace_context.span_id = 0; thread_group.reset(); thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;