fixes for context hierarchy

This commit is contained in:
Alexander Kuzmenkov 2020-11-18 20:43:18 +03:00
parent b16c5a1748
commit 1570320e20
15 changed files with 141 additions and 106 deletions

View File

@ -2515,7 +2515,7 @@ public:
{
std::string traceparent = options["opentelemetry-traceparent"].as<std::string>();
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
if (!context.getClientInfo().client_trace_context.parseTraceparentHeader(
traceparent, error))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -2526,7 +2526,7 @@ public:
if (options.count("opentelemetry-tracestate"))
{
context.getClientInfo().opentelemetry_tracestate =
context.getClientInfo().client_trace_context.tracestate =
options["opentelemetry-tracestate"].as<std::string>();
}

View File

@ -0,0 +1,20 @@
#pragma once
namespace DB {
// The runtime info we need to create new OpenTelemetry spans.
struct OpenTelemetryTraceContext
{
__uint128_t trace_id = 0;
UInt64 span_id = 0;
// The incoming tracestate header and the trace flags, we just pass them
// downstream. See https://www.w3.org/TR/trace-context/
String tracestate;
__uint8_t trace_flags = 0;
// Parse/compose OpenTelemetry traceparent header.
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
};
}

View File

@ -3,6 +3,7 @@
#include <common/StringRef.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Core/SettingsEnums.h>
@ -108,8 +109,11 @@ public:
using Deleter = std::function<void()>;
Deleter deleter;
__uint128_t opentelemetry_trace_id;
UInt64 opentelemetry_current_span_id;
// This is the current most-derived OpenTelemetry span for this thread. It
// can be changed throughout the query execution, whenever we enter a new
// span or exit it. See OpenTelemetrySpanHolder that is normally responsible
// for these changes.
OpenTelemetryTraceContext thread_trace_context;
protected:
ThreadGroupStatusPtr thread_group;

View File

@ -27,6 +27,10 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
{
ClientInfo modified_client_info = client_info_;
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (CurrentThread::isInitialized())
{
modified_client_info.opentelemetry = CurrentThread::get().opentelemetry;
}
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.

View File

@ -156,6 +156,10 @@ void RemoteQueryExecutor::sendQuery()
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context.getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
}
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);

View File

@ -62,16 +62,16 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
{
if (opentelemetry_trace_id)
if (client_trace_context.trace_id)
{
// Have OpenTelemetry header.
writeBinary(uint8_t(1), out);
// No point writing these numbers with variable length, because they
// are random and will probably require the full length anyway.
writeBinary(opentelemetry_trace_id, out);
writeBinary(opentelemetry_span_id, out);
writeBinary(opentelemetry_tracestate, out);
writeBinary(opentelemetry_trace_flags, out);
writeBinary(client_trace_context.trace_id, out);
writeBinary(client_trace_context.span_id, out);
writeBinary(client_trace_context.tracestate, out);
writeBinary(client_trace_context.trace_flags, out);
}
else
{
@ -139,10 +139,10 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(have_trace_id, in);
if (have_trace_id)
{
readBinary(opentelemetry_trace_id, in);
readBinary(opentelemetry_span_id, in);
readBinary(opentelemetry_tracestate, in);
readBinary(opentelemetry_trace_flags, in);
readBinary(client_trace_context.trace_id, in);
readBinary(client_trace_context.span_id, in);
readBinary(client_trace_context.tracestate, in);
readBinary(client_trace_context.trace_flags, in);
}
}
}
@ -155,14 +155,14 @@ void ClientInfo::setInitialQuery()
client_name = (DBMS_NAME " ") + client_name;
}
bool ClientInfo::parseTraceparentHeader(const std::string & traceparent,
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
trace_id = 0;
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
uint64_t trace_parent = 0;
uint8_t trace_flags = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
@ -183,7 +183,7 @@ bool ClientInfo::parseTraceparentHeader(const std::string & traceparent,
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &trace_parent, &trace_flags);
&version, &trace_id_high, &trace_id_low, &span_id, &trace_flags);
if (result == EOF)
{
@ -205,23 +205,21 @@ bool ClientInfo::parseTraceparentHeader(const std::string & traceparent,
return false;
}
opentelemetry_trace_id = static_cast<__uint128_t>(trace_id_high) << 64
trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
opentelemetry_span_id = trace_parent;
opentelemetry_trace_flags = trace_flags;
return true;
}
std::string ClientInfo::composeTraceparentHeader() const
std::string OpenTelemetryTraceContext::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", opentelemetry_trace_id,
opentelemetry_span_id,
return fmt::format("00-{:032x}-{:016x}-{:02x}", trace_id,
span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(opentelemetry_trace_flags));
static_cast<uint8_t>(trace_flags));
}
void ClientInfo::fillOSUserHostNameAndVersionInfo()

View File

@ -3,7 +3,7 @@
#include <Poco/Net/SocketAddress.h>
#include <Common/UInt128.h>
#include <common/types.h>
#include <Common/OpenTelemetryTraceContext.h>
namespace DB
{
@ -59,16 +59,9 @@ public:
String initial_query_id;
Poco::Net::SocketAddress initial_address;
// OpenTelemetry trace information.
__uint128_t opentelemetry_trace_id = 0;
// The span id we get the in the incoming client info becomes our parent span
// id, and the span id we send becomes downstream parent span id.
UInt64 opentelemetry_span_id = 0;
UInt64 opentelemetry_parent_span_id = 0;
// The incoming tracestate header and the trace flags, we just pass them downstream.
// They are described at https://www.w3.org/TR/trace-context/
String opentelemetry_tracestate;
UInt8 opentelemetry_trace_flags = 0;
// OpenTelemetry trace context we received from client, or which we are going
// to send to server.
OpenTelemetryTraceContext client_trace_context;
/// All below are parameters related to initial query.
@ -102,16 +95,6 @@ public:
/// Initialize parameters on client initiating query.
void setInitialQuery();
// Parse/compose OpenTelemetry traceparent header.
// Note that these functions use span_id field, not parent_span_id, same as
// in native protocol. The incoming traceparent corresponds to the upstream
// trace span, and the outgoing traceparent corresponds to our current span.
// We use the same ClientInfo structure first for incoming span, and then
// for our span: when we switch, we use old span_id as parent_span_id, and
// generate a new span_id (currently this happens in Context::setQueryId()).
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
private:
void fillOSUserHostNameAndVersionInfo();
};

View File

@ -1127,8 +1127,14 @@ void Context::setCurrentQueryId(const String & query_id)
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
&& client_info.opentelemetry_trace_id == 0)
if (client_info.client_trace_context.trace_id != 0)
{
// Use the OpenTelemetry trace context we received from the client, and
// create a new span for the query.
query_trace_context = client_info.client_trace_context;
query_trace_context.span_id = thread_local_rng();
}
else if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
// If this is an initial query without any parent OpenTelemetry trace, we
// might start the trace ourselves, with some configurable probability.
@ -1138,20 +1144,12 @@ void Context::setCurrentQueryId(const String & query_id)
if (should_start_trace(thread_local_rng))
{
// Use the randomly generated default query id as the new trace id.
client_info.opentelemetry_trace_id = random.uuid;
client_info.opentelemetry_parent_span_id = 0;
client_info.opentelemetry_span_id = thread_local_rng();
query_trace_context.trace_id = random.uuid;
query_trace_context.span_id = thread_local_rng();
// Mark this trace as sampled in the flags.
client_info.opentelemetry_trace_flags = 1;
query_trace_context.trace_flags = 1;
}
}
else
{
// The incoming request has an OpenTelemtry trace context. Its span id
// becomes our parent span id.
client_info.opentelemetry_parent_span_id = client_info.opentelemetry_span_id;
client_info.opentelemetry_span_id = thread_local_rng();
}
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.

View File

@ -13,6 +13,7 @@
#include <Common/LRUCache.h>
#include <Common/MultiVersion.h>
#include <Common/ThreadPool.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Storages/IStorage_fwd.h>
#include <atomic>
#include <chrono>
@ -198,6 +199,12 @@ private:
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context. Could be equal to this.
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for
// a query context.
OpenTelemetryTraceContext query_trace_context;
private:
friend class NamedSessions;
using SampleBlockCache = std::unordered_map<std::string, Block>;

View File

@ -54,15 +54,25 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name)
{
trace_id = 0;
if (!CurrentThread::isInitialized())
{
// There may be no thread context if we're running inside the
// clickhouse-client, e.g. reading an external table provided with the
// `--external` option.
return;
}
auto & thread = CurrentThread::get();
trace_id = thread.opentelemetry_trace_id;
trace_id = thread.thread_trace_context.trace_id;
if (!trace_id)
{
return;
}
parent_span_id = thread.opentelemetry_current_span_id;
parent_span_id = thread.thread_trace_context.span_id;
span_id = thread_local_rng();
operation_name = _operation_name;
start_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
@ -72,7 +82,7 @@ OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_
attribute_names.push_back("clickhouse.start.stacktrace");
attribute_values.push_back(StackTrace().toString());
thread.opentelemetry_current_span_id = span_id;
thread.thread_trace_context.span_id = span_id;
}
OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
@ -86,8 +96,8 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
// First of all, return old value of current span.
auto & thread = CurrentThread::get();
assert(thread.opentelemetry_current_span_id = span_id);
thread.opentelemetry_current_span_id = parent_span_id;
assert(thread.thread_trace_context.span_id = span_id);
thread.thread_trace_context.span_id = parent_span_id;
// Not sure what's the best way to access the log from here.
auto * thread_group = CurrentThread::getGroup().get();

View File

@ -74,6 +74,15 @@ void ThreadStatus::attachQueryContext(Context & query_context_)
thread_group->global_context = global_context;
}
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
// FIXME why and how is this different from setupState()?
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
thread_trace_context.span_id = thread_local_rng();
}
applyQuerySettings();
}
@ -112,20 +121,17 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
{
applyQuerySettings();
opentelemetry_trace_id = query_context->getClientInfo().opentelemetry_trace_id;
if (opentelemetry_trace_id)
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
opentelemetry_current_span_id = thread_local_rng();
}
else
{
opentelemetry_current_span_id = 0;
thread_trace_context.span_id = thread_local_rng();
}
}
else
{
opentelemetry_trace_id = 0;
opentelemetry_current_span_id = 0;
thread_trace_context.trace_id = 0;
}
initPerformanceCounters();
@ -319,7 +325,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
if (opentelemetry_trace_id && query_context)
if (thread_trace_context.trace_id && query_context)
{
opentelemetry_span_log = query_context->getOpenTelemetrySpanLog();
}
@ -334,11 +340,11 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
// destructor, which is in another library.
OpenTelemetrySpanLogElement span;
span.trace_id = opentelemetry_trace_id;
span.trace_id = thread_trace_context.trace_id;
// Might be problematic if some span holder isn't finished by the time
// we detach this thread...
span.span_id = opentelemetry_current_span_id;
span.parent_span_id = query_context->getClientInfo().opentelemetry_span_id;
span.span_id = thread_trace_context.span_id;
span.parent_span_id = query_context->query_trace_context.span_id;
span.operation_name = getThreadName();
span.start_time_us = query_start_time_microseconds;
span.finish_time_us =
@ -364,8 +370,8 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_id.clear();
query_context = nullptr;
opentelemetry_trace_id = 0;
opentelemetry_current_span_id = 0;
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;
thread_group.reset();
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;

View File

@ -153,13 +153,11 @@ static void logQuery(const String & query, const Context & context, bool interna
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
joinLines(query));
if (client_info.opentelemetry_trace_id)
if (client_info.client_trace_context.trace_id)
{
LOG_TRACE(&Poco::Logger::get("executeQuery"),
"OpenTelemetry trace id {:x}, span id {}, parent span id {}",
client_info.opentelemetry_trace_id,
client_info.opentelemetry_span_id,
client_info.opentelemetry_parent_span_id);
"OpenTelemetry traceparent '{}'",
client_info.client_trace_context.composeTraceparentHeader());
}
}
}
@ -247,13 +245,13 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
query_log->add(elem);
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
context.query_trace_context.trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.trace_id = context.query_trace_context.trace_id;
span.span_id = context.query_trace_context.span_id;
span.parent_span_id = context.getClientInfo().client_trace_context.span_id;
span.operation_name = "query";
span.start_time_us = current_time_us;
span.finish_time_us = current_time_us;
@ -269,11 +267,11 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
if (!context.query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
context.query_trace_context.tracestate);
}
opentelemetry_span_log->add(span);
@ -689,13 +687,13 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
context.query_trace_context.trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.trace_id = context.query_trace_context.trace_id;
span.span_id = context.query_trace_context.span_id;
span.parent_span_id = context.getClientInfo().client_trace_context.span_id;
span.operation_name = "query";
span.start_time_us = elem.query_start_time_microseconds;
span.finish_time_us = time_in_microseconds(finish_time);
@ -710,11 +708,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
if (!context.query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
context.query_trace_context.tracestate);
}
opentelemetry_span_log->add(span);

View File

@ -318,7 +318,7 @@ void HTTPHandler::processQuery(
{
std::string opentelemetry_traceparent = request.get("traceparent");
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
if (!context.getClientInfo().client_trace_context.parseTraceparentHeader(
opentelemetry_traceparent, error))
{
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
@ -326,7 +326,7 @@ void HTTPHandler::processQuery(
opentelemetry_traceparent, error);
}
context.getClientInfo().opentelemetry_tracestate = request.get("tracestate", "");
context.getClientInfo().client_trace_context.tracestate = request.get("tracestate", "");
}
#endif

View File

@ -74,16 +74,19 @@ namespace
ReadWriteBufferFromHTTP::HTTPHeaderEntries header;
// Propagate OpenTelemetry trace context, if any, downstream.
const auto & client_info = context.getClientInfo();
if (client_info.opentelemetry_trace_id)
if (CurrentThread::isInitialized())
{
header.emplace_back("traceparent",
client_info.composeTraceparentHeader());
if (!client_info.opentelemetry_tracestate.empty())
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (opentelemetry.trace_id)
{
header.emplace_back("tracestate",
client_info.opentelemetry_tracestate);
header.emplace_back("traceparent",
thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty())
{
header.emplace_back("tracestate",
thread_trace_context.tracestate);
}
}
}

View File

@ -1,5 +1,5 @@
===http===
{"total spans":"4","unique spans":"4","unique non-zero parent spans":"2"}
{"total spans":"4","unique spans":"4","unique non-zero parent spans":"3"}
{"initial query spans with proper parent":"1"}
{"unique non-empty tracestate values":"1"}
===native===