Move thread trace context out of ThreadStatus

This commit is contained in:
Frank Chen 2022-07-07 17:41:10 +08:00
parent 01bbfd86ad
commit 2e8c530bed
8 changed files with 393 additions and 110 deletions

View File

@ -1,11 +1,44 @@
#pragma once
#include <base/types.h>
#include <base/UUID.h>
#include <Core/Field.h>
namespace DB
{
struct OpenTelemetrySpan
{
UUID trace_id;
UInt64 span_id;
UInt64 parent_span_id;
std::string operation_name;
UInt64 start_time_us;
UInt64 finish_time_us;
Map attributes;
void addAttribute(const std::string& name, UInt64 value);
void addAttributeIfNotZero(const std::string& name, UInt64 value)
{
if (value != 0)
addAttribute(name, value);
}
void addAttribute(const std::string& name, const std::string& value);
void addAttribute(const Exception & e);
void addAttribute(std::exception_ptr e);
bool isTraceEnabled() const
{
return trace_id != UUID();
}
};
struct OpenTelemetrySpanHolder;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
class OpenTelemetrySpanLog;
// The runtime info we need to create new OpenTelemetry spans.
struct OpenTelemetryTraceContext
{
@ -19,6 +52,46 @@ struct OpenTelemetryTraceContext
// Parse/compose OpenTelemetry traceparent header.
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
bool isTraceEnabled() const
{
return trace_id != UUID();
}
};
// tracing context kept on thread local
struct OpenTelemetryThreadTraceContext : OpenTelemetryTraceContext
{
OpenTelemetryThreadTraceContext& operator =(const OpenTelemetryTraceContext& context)
{
*(static_cast<OpenTelemetryTraceContext*>(this)) = context;
return *this;
}
void reset();
static const OpenTelemetryThreadTraceContext& current();
std::weak_ptr<OpenTelemetrySpanLog> span_log;
};
struct OpenTelemetryThreadTraceContextScope
{
// forbidden copy ctor and assignment to make the destructor safe
OpenTelemetryThreadTraceContextScope(const OpenTelemetryThreadTraceContextScope& scope) = delete;
OpenTelemetryThreadTraceContextScope& operator =(const OpenTelemetryThreadTraceContextScope& scope) = delete;
OpenTelemetryThreadTraceContextScope(const std::string& _operation_name,
const OpenTelemetryTraceContext& _parent_trace_context,
const std::weak_ptr<OpenTelemetrySpanLog>& _log);
~OpenTelemetryThreadTraceContextScope();
OpenTelemetrySpan root_span;
};
using OpenTelemetryThreadTraceContextScopePtr = std::unique_ptr<OpenTelemetryThreadTraceContextScope>;
}

View File

@ -0,0 +1,263 @@
#include "Interpreters/OpenTelemetrySpanLog.h"
#include <random>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeUUID.h>
#include <Common/hex.h>
#include <Common/Exception.h>
#include <common/getThreadId.h>
namespace DB
{
thread_local OpenTelemetryThreadTraceContext current_thread_trace_context;
void OpenTelemetrySpan::addAttribute(const std::string& name, UInt64 value)
{
if (trace_id == UUID() || name.empty())
return;
this->attributes.push_back(Tuple{name, toString(value)});
}
void OpenTelemetrySpan::addAttribute(const std::string& name, const std::string& value)
{
if (trace_id == UUID() || name.empty() || value.empty())
return;
this->attributes.push_back(Tuple{name, value});
}
void OpenTelemetrySpan::addAttribute(const Exception & e)
{
if (trace_id == UUID())
return;
this->attributes.push_back(Tuple{"clickhouse.exception", getExceptionMessage(e, false)});
}
void OpenTelemetrySpan::addAttribute(std::exception_ptr e)
{
if (trace_id == UUID() || e == nullptr)
return;
this->attributes.push_back(Tuple{"clickhouse.exception", getExceptionMessage(e, false)});
}
OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name)
{
if (current_thread_trace_context.trace_id == UUID())
{
this->trace_id = 0;
this->span_id = 0;
this->parent_span_id = 0;
}
else
{
this->trace_id = current_thread_trace_context.trace_id;
this->parent_span_id = current_thread_trace_context.span_id;
this->span_id = thread_local_rng(); // create a new id for this span
this->operation_name = _operation_name;
this->start_time_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
// set current span id to this
current_thread_trace_context.span_id = this->span_id;
}
}
void OpenTelemetrySpanHolder::finish()
{
if (trace_id == UUID())
return;
// First of all, return old value of current span.
assert(current_thread_trace_context.span_id == span_id);
current_thread_trace_context.span_id = parent_span_id;
try
{
auto log = current_thread_trace_context.span_log.lock();
if (!log)
{
// The log might be disabled.
return;
}
this->finish_time_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
log->add(OpenTelemetrySpanLogElement(*this));
}
catch (...)
{
tryLogCurrentException(__FUNCTION__);
}
trace_id = UUID();
}
OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
{
finish();
}
template <typename T>
static T readHex(const char * data)
{
T x{};
const char * end = data + sizeof(T) * 2;
while (data < end)
{
x *= 16;
x += unhex(*data);
++data;
}
return x;
}
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
trace_id = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = strlen("xx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxx-xx");
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
const char * data = traceparent.data();
uint8_t version = unhex2(data);
data += 2;
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
if (*data != '-')
{
error = fmt::format("Malformed traceparant header: {}", traceparent);
return false;
}
++data;
UInt64 trace_id_higher_64 = unhexUInt<UInt64>(data);
UInt64 trace_id_lower_64 = unhexUInt<UInt64>(data + 16);
data += 32;
if (*data != '-')
{
error = fmt::format("Malformed traceparant header: {}", traceparent);
return false;
}
++data;
UInt64 span_id_64 = unhexUInt<UInt64>(data);
data += 16;
if (*data != '-')
{
error = fmt::format("Malformed traceparant header: {}", traceparent);
return false;
}
++data;
this->trace_flags = unhex2(data);
this->trace_id.toUnderType().items[0] = trace_id_higher_64;
this->trace_id.toUnderType().items[1] = trace_id_lower_64;
this->span_id = span_id_64;
return true;
}
std::string OpenTelemetryTraceContext::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:016x}{:016x}-{:016x}-{:02x}",
trace_id.toUnderType().items[0],
trace_id.toUnderType().items[1],
span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(trace_flags));
}
const OpenTelemetryThreadTraceContext& OpenTelemetryThreadTraceContext::current()
{
return current_thread_trace_context;
}
void OpenTelemetryThreadTraceContext::reset()
{
this->trace_id = 0;
this->span_id = 0;
this->trace_flags = 0;
this->tracestate = "";
this->span_log.reset();
}
OpenTelemetryThreadTraceContextScope::OpenTelemetryThreadTraceContextScope(const std::string& _operation_name,
const OpenTelemetryTraceContext& _parent_trace_context,
const std::weak_ptr<OpenTelemetrySpanLog>& _span_log)
{
if (_parent_trace_context.isTraceEnabled())
{
this->root_span.trace_id = _parent_trace_context.trace_id;
this->root_span.parent_span_id = _parent_trace_context.span_id;
this->root_span.span_id = thread_local_rng();
this->root_span.operation_name = _operation_name;
this->root_span.start_time_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
else
{
this->root_span.trace_id = 0;
this->root_span.span_id = 0;
}
// set trace context on the thread local
current_thread_trace_context = _parent_trace_context;
current_thread_trace_context.span_id = this->root_span.span_id;
current_thread_trace_context.span_log = _span_log;
}
OpenTelemetryThreadTraceContextScope::~OpenTelemetryThreadTraceContextScope()
{
if (this->root_span.trace_id != UUID())
{
auto shared_span_log = current_thread_trace_context.span_log.lock();
if (shared_span_log)
{
this->root_span.addAttribute("clickhouse.thread_id", getThreadId());
this->root_span.finish_time_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
shared_span_log->add(OpenTelemetrySpanLogElement(this->root_span));
}
this->root_span.trace_id = 0;
}
// restore thread local variables
current_thread_trace_context.reset();
}
}

View File

@ -3,7 +3,6 @@
#include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h>
#include <base/errnoToString.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>

View File

@ -135,12 +135,6 @@ public:
using Deleter = std::function<void()>;
Deleter deleter;
// This is the current most-derived OpenTelemetry span for this thread. It
// can be changed throughout the query execution, whenever we enter a new
// span or exit it. See OpenTelemetrySpanHolder that is normally responsible
// for these changes.
OpenTelemetryTraceContext thread_trace_context;
protected:
ThreadGroupStatusPtr thread_group;

View File

@ -134,7 +134,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int INVALID_SETTING_VALUE;
extern const int UNKNOWN_READ_METHOD;
extern const int NOT_IMPLEMENTED;
}
@ -1328,29 +1327,6 @@ void Context::setCurrentQueryId(const String & query_id)
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
if (client_info.client_trace_context.trace_id != UUID())
{
// Use the OpenTelemetry trace context we received from the client, and
// create a new span for the query.
query_trace_context = client_info.client_trace_context;
query_trace_context.span_id = thread_local_rng();
}
else if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
// If this is an initial query without any parent OpenTelemetry trace, we
// might start the trace ourselves, with some configurable probability.
std::bernoulli_distribution should_start_trace{
settings.opentelemetry_start_trace_probability};
if (should_start_trace(thread_local_rng))
{
// Use the randomly generated default query id as the new trace id.
query_trace_context.trace_id = random.uuid;
query_trace_context.span_id = thread_local_rng();
// Mark this trace as sampled in the flags.
query_trace_context.trace_flags = 1;
}
}
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.
@ -3449,4 +3425,52 @@ WriteSettings Context::getWriteSettings() const
return res;
}
OpenTelemetryThreadTraceContextScopePtr Context::startTracing(const std::string& name)
{
OpenTelemetryThreadTraceContextScopePtr trace_context;
if (this->client_info.client_trace_context.trace_id != UUID())
{
// Use the OpenTelemetry trace context we received from the client, and
// initialize the tracing context for this query on current thread
return std::make_unique<OpenTelemetryThreadTraceContextScope>(name,
this->client_info.client_trace_context,
this->getOpenTelemetrySpanLog());
}
// start the trace ourselves, with some configurable probability.
std::bernoulli_distribution should_start_trace{settings.opentelemetry_start_trace_probability};
if (!should_start_trace(thread_local_rng))
{
return trace_context;
}
/// Generate random UUID, but using lower quality RNG,
/// because Poco::UUIDGenerator::generateRandom method is using /dev/random, that is very expensive.
/// NOTE: Actually we don't need to use UUIDs for query identifiers.
/// We could use any suitable string instead.
union
{
char bytes[16];
struct
{
UInt64 a;
UInt64 b;
} words;
UUID uuid{};
} random;
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
OpenTelemetryTraceContext query_trace_context;
query_trace_context.trace_id = random.uuid;
query_trace_context.span_id = 0;
// Mark this trace as sampled in the flags.
query_trace_context.trace_flags = 1;
return std::make_unique<OpenTelemetryThreadTraceContextScope>(name,
query_trace_context,
this->getOpenTelemetrySpanLog());
}
}

View File

@ -361,8 +361,7 @@ private:
inline static ContextPtr global_context_instance;
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context;
OpenTelemetryThreadTraceContextScopePtr startTracing(const std::string& name);
private:
using SampleBlockCache = std::unordered_map<std::string, Block>;

View File

@ -1,29 +1,15 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Common/OpenTelemetryTraceContext.h>
namespace DB
{
struct OpenTelemetrySpan
{
UUID trace_id;
UInt64 span_id;
UInt64 parent_span_id;
std::string operation_name;
UInt64 start_time_us;
UInt64 finish_time_us;
Map attributes;
// I don't understand how Links work, namely, which direction should they
// point to, and how they are related with parent_span_id, so no Links for now.
};
struct OpenTelemetrySpanLogElement : public OpenTelemetrySpan
{
OpenTelemetrySpanLogElement() = default;
explicit OpenTelemetrySpanLogElement(const OpenTelemetrySpan & span)
OpenTelemetrySpanLogElement(const OpenTelemetrySpan & span)
: OpenTelemetrySpan(span) {}
static std::string name() { return "OpenTelemetrySpanLog"; }
@ -41,14 +27,18 @@ public:
using SystemLog<OpenTelemetrySpanLogElement>::SystemLog;
};
typedef std::shared_ptr<OpenTelemetrySpanLog> OpenTelemetrySpanLogPtr;
struct OpenTelemetrySpanHolder : public OpenTelemetrySpan
{
OpenTelemetrySpanHolder(const OpenTelemetryTraceContext& _trace_context, OpenTelemetrySpanLogPtr _span_log, const std::string & _operation_name);
explicit OpenTelemetrySpanHolder(const std::string & _operation_name);
void addAttribute(const std::string& name, UInt64 value);
void addAttribute(const std::string& name, const std::string& value);
void addAttribute(const Exception & e);
void addAttribute(std::exception_ptr e);
void finish();
~OpenTelemetrySpanHolder();
};

View File

@ -84,15 +84,6 @@ void ThreadStatus::attachQueryContext(ContextPtr query_context_)
thread_group->global_context = global_context;
}
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
// FIXME why and how is this different from setupState()?
thread_trace_context = query_context_->query_trace_context;
if (thread_trace_context.trace_id != UUID())
{
thread_trace_context.span_id = thread_local_rng();
}
applyQuerySettings();
}
@ -132,18 +123,6 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
if (auto query_context_ptr = query_context.lock())
{
applyQuerySettings();
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
thread_trace_context = query_context_ptr->query_trace_context;
if (thread_trace_context.trace_id != UUID())
{
thread_trace_context.span_id = thread_local_rng();
}
}
else
{
thread_trace_context.trace_id = 0;
}
initPerformanceCounters();
@ -353,42 +332,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
auto query_context_ptr = query_context.lock();
if (thread_trace_context.trace_id != UUID() && query_context_ptr)
{
opentelemetry_span_log = query_context_ptr->getOpenTelemetrySpanLog();
}
if (opentelemetry_span_log)
{
// Log the current thread span.
// We do this manually, because we can't use OpenTelemetrySpanHolder as a
// ThreadStatus member, because of linking issues. This file is linked
// separately, so we can reference OpenTelemetrySpanLog here, but if we had
// the span holder as a field, we would have to reference it in the
// destructor, which is in another library.
OpenTelemetrySpanLogElement span;
span.trace_id = thread_trace_context.trace_id;
// All child span holders should be finished by the time we detach this
// thread, so the current span id should be the thread span id. If not,
// an assertion for a proper parent span in ~OpenTelemetrySpanHolder()
// is going to fail, because we're going to reset it to zero later in
// this function.
span.span_id = thread_trace_context.span_id;
assert(query_context_ptr);
span.parent_span_id = query_context_ptr->query_trace_context.span_id;
span.operation_name = getThreadName();
span.start_time_us = query_start_time_microseconds;
span.finish_time_us =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
span.attributes.push_back(Tuple{"clickhouse.thread_id", toString(thread_id)});
opentelemetry_span_log->add(span);
}
finalizeQueryProfiler();
finalizePerformanceCounters();
@ -404,8 +347,6 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_id.clear();
query_context.reset();
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;
thread_group.reset();
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;