Move classes into DB::OpenTelemetry namespace

This commit is contained in:
Frank Chen 2022-08-24 16:41:40 +08:00
parent efc6a60a60
commit cd19366b44
15 changed files with 110 additions and 95 deletions

View File

@ -484,13 +484,13 @@ void Connection::sendQuery(
bool with_pending_data, bool with_pending_data,
std::function<void(const Progress &)>) std::function<void(const Progress &)>)
{ {
OpenTelemetrySpanHolder span("Connection::sendQuery()"); SpanHolder span("Connection::sendQuery()");
span.addAttribute("clickhouse.query_id", query_id_); span.addAttribute("clickhouse.query_id", query_id_);
span.addAttribute("clickhouse.query", query); span.addAttribute("clickhouse.query", query);
span.addAttribute("target", [this] () { return this->getHost() + ":" + std::to_string(this->getPort()); }); span.addAttribute("target", [this] () { return this->getHost() + ":" + std::to_string(this->getPort()); });
ClientInfo new_client_info; ClientInfo new_client_info;
const auto &current_trace_context = OpenTelemetryThreadTraceContext::current(); const auto &current_trace_context = TracingContextOnThread::current();
if (client_info && current_trace_context.isTraceEnabled()) if (client_info && current_trace_context.isTraceEnabled())
{ {
// use current span as the parent of remote span // use current span as the parent of remote span

View File

@ -9,10 +9,12 @@
namespace DB namespace DB
{ {
namespace OpenTelemetry
{
thread_local OpenTelemetryThreadTraceContext current_thread_trace_context; thread_local TracingContextOnThread current_thread_trace_context;
void OpenTelemetrySpan::addAttribute(std::string_view name, UInt64 value) void Span::addAttribute(std::string_view name, UInt64 value)
{ {
if (!this->isTraceEnabled() || name.empty()) if (!this->isTraceEnabled() || name.empty())
return; return;
@ -20,13 +22,13 @@ void OpenTelemetrySpan::addAttribute(std::string_view name, UInt64 value)
this->attributes.push_back(Tuple{name, toString(value)}); this->attributes.push_back(Tuple{name, toString(value)});
} }
void OpenTelemetrySpan::addAttributeIfNotZero(std::string_view name, UInt64 value) void Span::addAttributeIfNotZero(std::string_view name, UInt64 value)
{ {
if (value != 0) if (value != 0)
addAttribute(name, value); addAttribute(name, value);
} }
void OpenTelemetrySpan::addAttribute(std::string_view name, std::string_view value) void Span::addAttribute(std::string_view name, std::string_view value)
{ {
if (!this->isTraceEnabled() || name.empty()) if (!this->isTraceEnabled() || name.empty())
return; return;
@ -34,7 +36,7 @@ void OpenTelemetrySpan::addAttribute(std::string_view name, std::string_view val
this->attributes.push_back(Tuple{name, value}); this->attributes.push_back(Tuple{name, value});
} }
void OpenTelemetrySpan::addAttribute(std::string_view name, std::function<String()> value_supplier) void Span::addAttribute(std::string_view name, std::function<String()> value_supplier)
{ {
if (!this->isTraceEnabled() || !value_supplier) if (!this->isTraceEnabled() || !value_supplier)
return; return;
@ -46,7 +48,7 @@ void OpenTelemetrySpan::addAttribute(std::string_view name, std::function<String
this->attributes.push_back(Tuple{name, value}); this->attributes.push_back(Tuple{name, value});
} }
void OpenTelemetrySpan::addAttribute(const Exception & e) noexcept void Span::addAttribute(const Exception & e) noexcept
{ {
if (!this->isTraceEnabled()) if (!this->isTraceEnabled())
return; return;
@ -60,7 +62,7 @@ void OpenTelemetrySpan::addAttribute(const Exception & e) noexcept
} }
} }
void OpenTelemetrySpan::addAttribute(std::exception_ptr e) noexcept void Span::addAttribute(std::exception_ptr e) noexcept
{ {
if (!this->isTraceEnabled() || e == nullptr) if (!this->isTraceEnabled() || e == nullptr)
return; return;
@ -74,7 +76,7 @@ void OpenTelemetrySpan::addAttribute(std::exception_ptr e) noexcept
} }
} }
OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(std::string_view _operation_name) SpanHolder::SpanHolder(std::string_view _operation_name)
{ {
if (current_thread_trace_context.isTraceEnabled()) if (current_thread_trace_context.isTraceEnabled())
{ {
@ -90,7 +92,7 @@ OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(std::string_view _operation_nam
} }
} }
void OpenTelemetrySpanHolder::finish() void SpanHolder::finish()
{ {
if (!this->isTraceEnabled()) if (!this->isTraceEnabled())
return; return;
@ -121,12 +123,12 @@ void OpenTelemetrySpanHolder::finish()
trace_id = UUID(); trace_id = UUID();
} }
OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder() SpanHolder::~SpanHolder()
{ {
finish(); finish();
} }
bool OpenTelemetryTraceContext::parseTraceparentHeader(std::string_view traceparent, String & error) bool TracingContext::parseTraceparentHeader(std::string_view traceparent, String & error)
{ {
trace_id = 0; trace_id = 0;
@ -185,7 +187,7 @@ bool OpenTelemetryTraceContext::parseTraceparentHeader(std::string_view tracepar
return true; return true;
} }
String OpenTelemetryTraceContext::composeTraceparentHeader() const String TracingContext::composeTraceparentHeader() const
{ {
// This span is a parent for its children, so we specify this span_id as a // This span is a parent for its children, so we specify this span_id as a
// parent id. // parent id.
@ -199,12 +201,12 @@ String OpenTelemetryTraceContext::composeTraceparentHeader() const
static_cast<uint8_t>(trace_flags)); static_cast<uint8_t>(trace_flags));
} }
const OpenTelemetryThreadTraceContext & OpenTelemetryThreadTraceContext::current() const TracingContextOnThread & TracingContextOnThread::current()
{ {
return current_thread_trace_context; return current_thread_trace_context;
} }
void OpenTelemetryThreadTraceContext::reset() void TracingContextOnThread::reset()
{ {
this->trace_id = UUID(); this->trace_id = UUID();
this->span_id = 0; this->span_id = 0;
@ -213,9 +215,9 @@ void OpenTelemetryThreadTraceContext::reset()
this->span_log.reset(); this->span_log.reset();
} }
OpenTelemetryThreadTraceContextScope::OpenTelemetryThreadTraceContextScope( TracingContextHolder::TracingContextHolder(
std::string_view _operation_name, std::string_view _operation_name,
OpenTelemetryTraceContext _parent_trace_context, TracingContext _parent_trace_context,
const Settings * settings_ptr, const Settings * settings_ptr,
const std::weak_ptr<OpenTelemetrySpanLog> & _span_log) const std::weak_ptr<OpenTelemetrySpanLog> & _span_log)
{ {
@ -278,7 +280,7 @@ OpenTelemetryThreadTraceContextScope::OpenTelemetryThreadTraceContextScope(
current_thread_trace_context.span_log = _span_log; current_thread_trace_context.span_log = _span_log;
} }
OpenTelemetryThreadTraceContextScope::~OpenTelemetryThreadTraceContextScope() TracingContextHolder::~TracingContextHolder()
{ {
if (this->root_span.isTraceEnabled()) if (this->root_span.isTraceEnabled())
{ {
@ -307,3 +309,4 @@ OpenTelemetryThreadTraceContextScope::~OpenTelemetryThreadTraceContextScope()
} }
} }
}

View File

@ -6,8 +6,12 @@ namespace DB
{ {
struct Settings; struct Settings;
class OpenTelemetrySpanLog;
struct OpenTelemetrySpan namespace OpenTelemetry
{
struct Span
{ {
UUID trace_id{}; UUID trace_id{};
UInt64 span_id = 0; UInt64 span_id = 0;
@ -33,8 +37,6 @@ struct OpenTelemetrySpan
} }
}; };
class OpenTelemetrySpanLog;
/// See https://www.w3.org/TR/trace-context/ for trace_flags definition /// See https://www.w3.org/TR/trace-context/ for trace_flags definition
enum TraceFlags : UInt8 enum TraceFlags : UInt8
{ {
@ -42,8 +44,8 @@ enum TraceFlags : UInt8
TRACE_FLAG_SAMPLED = 1, TRACE_FLAG_SAMPLED = 1,
}; };
// The runtime info we need to create new OpenTelemetry spans. /// The runtime info we need to create new OpenTelemetry spans.
struct OpenTelemetryTraceContext struct TracingContext
{ {
UUID trace_id{}; UUID trace_id{};
UInt64 span_id = 0; UInt64 span_id = 0;
@ -62,39 +64,42 @@ struct OpenTelemetryTraceContext
} }
}; };
/// Tracing context kept on thread local /// Tracing context kept on each thread
struct OpenTelemetryThreadTraceContext : OpenTelemetryTraceContext struct TracingContextOnThread : TracingContext
{ {
OpenTelemetryThreadTraceContext& operator =(const OpenTelemetryTraceContext& context) TracingContextOnThread& operator =(const TracingContext& context)
{ {
*(static_cast<OpenTelemetryTraceContext*>(this)) = context; *(static_cast<TracingContext*>(this)) = context;
return *this; return *this;
} }
void reset(); void reset();
static const OpenTelemetryThreadTraceContext& current(); static const TracingContextOnThread& current();
/// Use weak_ptr instead of shared_ptr to hold a reference to the underlying system.opentelemetry_span_log table /// Use weak_ptr instead of shared_ptr to hold a reference to the underlying system.opentelemetry_span_log table
/// Since this object is kept on threads and passed across threads, a weak_ptr is more safe to prevent potential leak /// Since this object is kept on threads and passed across threads, a weak_ptr is more safe to prevent potential leak
std::weak_ptr<OpenTelemetrySpanLog> span_log; std::weak_ptr<OpenTelemetrySpanLog> span_log;
}; };
/// A scoped tracing context, is used to hold the tracing context at the beginning of each thread execution and clear the context automatically when the scope exists. /// Holder of tracing context.
/// It should be the root of all span logs for one tracing. /// It should be initialized at the beginning of each thread execution.
/// And once it's destructed, it clears the context automatically.
/// ///
/// It's SAFE to construct this object multiple times on one same thread, /// It's also the root of all spans on current thread execution.
/// but it's not encourage to do so because this is only a protection in case of code changes. ///
struct OpenTelemetryThreadTraceContextScope /// Although it's SAFE to construct this object multiple times on one same thread,
/// but rememeber only use it at the beginning of one thread
struct TracingContextHolder
{ {
/// Forbidden copy ctor and assignment to make the destructor safe /// Forbidden copy ctor and assignment to make the destructor safe
OpenTelemetryThreadTraceContextScope(const OpenTelemetryThreadTraceContextScope& scope) = delete; TracingContextHolder(const TracingContextHolder& scope) = delete;
OpenTelemetryThreadTraceContextScope& operator =(const OpenTelemetryThreadTraceContextScope& scope) = delete; TracingContextHolder& operator =(const TracingContextHolder& scope) = delete;
OpenTelemetryThreadTraceContextScope(std::string_view _operation_name, TracingContextHolder(std::string_view _operation_name,
const OpenTelemetryTraceContext& _parent_trace_context, const TracingContext& _parent_trace_context,
const std::weak_ptr<OpenTelemetrySpanLog>& _log) const std::weak_ptr<OpenTelemetrySpanLog>& _log)
: OpenTelemetryThreadTraceContextScope(_operation_name, : TracingContextHolder(_operation_name,
_parent_trace_context, _parent_trace_context,
nullptr, nullptr,
_log) _log)
@ -102,9 +107,9 @@ struct OpenTelemetryThreadTraceContextScope
} }
/// Initialize a tracing context on a child thread based on the context from the parent thread /// Initialize a tracing context on a child thread based on the context from the parent thread
OpenTelemetryThreadTraceContextScope(std::string_view _operation_name, TracingContextHolder(std::string_view _operation_name,
const OpenTelemetryThreadTraceContext& _parent_thread_trace_context) const TracingContextOnThread& _parent_thread_trace_context)
: OpenTelemetryThreadTraceContextScope(_operation_name, : TracingContextHolder(_operation_name,
_parent_thread_trace_context, _parent_thread_trace_context,
nullptr, nullptr,
_parent_thread_trace_context.span_log) _parent_thread_trace_context.span_log)
@ -112,39 +117,47 @@ struct OpenTelemetryThreadTraceContextScope
} }
/// For servers like HTTP/TCP/GRPC to initialize tracing context on thread that process requests from clients /// For servers like HTTP/TCP/GRPC to initialize tracing context on thread that process requests from clients
OpenTelemetryThreadTraceContextScope(std::string_view _operation_name, TracingContextHolder(std::string_view _operation_name,
OpenTelemetryTraceContext _parent_trace_context, TracingContext _parent_trace_context,
const Settings& _settings, const Settings& _settings,
const std::weak_ptr<OpenTelemetrySpanLog>& _log) const std::weak_ptr<OpenTelemetrySpanLog>& _log)
: OpenTelemetryThreadTraceContextScope(_operation_name, : TracingContextHolder(_operation_name,
_parent_trace_context, _parent_trace_context,
&_settings, &_settings,
_log) _log)
{ {
} }
OpenTelemetryThreadTraceContextScope(std::string_view _operation_name, TracingContextHolder(std::string_view _operation_name,
OpenTelemetryTraceContext _parent_trace_context, TracingContext _parent_trace_context,
const Settings* settings_ptr, const Settings* settings_ptr,
const std::weak_ptr<OpenTelemetrySpanLog>& _log); const std::weak_ptr<OpenTelemetrySpanLog>& _log);
~OpenTelemetryThreadTraceContextScope(); ~TracingContextHolder();
OpenTelemetrySpan root_span; Span root_span;
private: private:
bool is_context_owner = true; bool is_context_owner = true;
}; };
using OpenTelemetryThreadTraceContextScopePtr = std::unique_ptr<OpenTelemetryThreadTraceContextScope>; using TracingContextHolderPtr = std::unique_ptr<TracingContextHolder>;
/// A span holder is usually used in a function scope /// A span holder that creates span automatically in a (function) scope if tracing is enabled.
struct OpenTelemetrySpanHolder : public OpenTelemetrySpan /// Once it's created or destructed, it automatically maitains the tracing context on the thread that it lives.
struct SpanHolder : public Span
{ {
OpenTelemetrySpanHolder(std::string_view); SpanHolder(std::string_view);
~OpenTelemetrySpanHolder(); ~SpanHolder();
/// Finish a span explicitly if needed.
/// It's safe to call it multiple times
void finish(); void finish();
}; };
} }
using namespace OpenTelemetry;
}

View File

@ -152,7 +152,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
// this scheduleImpl is called in the parent thread, // this scheduleImpl is called in the parent thread,
// the tracing context on this thread is used as parent context for the sub-thread that runs the job // the tracing context on this thread is used as parent context for the sub-thread that runs the job
const auto &current_thread_context = DB::OpenTelemetryThreadTraceContext::current(); const auto &current_thread_context = DB::TracingContextOnThread::current();
jobs.emplace(std::move(job), priority, current_thread_context); jobs.emplace(std::move(job), priority, current_thread_context);
++scheduled_jobs; ++scheduled_jobs;
new_job_or_shutdown.notify_one(); new_job_or_shutdown.notify_one();
@ -255,7 +255,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
bool need_shutdown = false; bool need_shutdown = false;
/// A copy of parent trace context /// A copy of parent trace context
DB::OpenTelemetryThreadTraceContext parent_thead_trace_context; DB::TracingContextOnThread parent_thead_trace_context;
{ {
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
@ -267,7 +267,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
/// boost::priority_queue does not provide interface for getting non-const reference to an element /// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job. /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
job = std::move(const_cast<Job &>(jobs.top().job)); job = std::move(const_cast<Job &>(jobs.top().job));
parent_thead_trace_context = std::move(const_cast<DB::OpenTelemetryThreadTraceContext &>(jobs.top().thread_trace_context)); parent_thead_trace_context = std::move(const_cast<DB::TracingContextOnThread &>(jobs.top().thread_trace_context));
jobs.pop(); jobs.pop();
} }
else else
@ -281,7 +281,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
if (!need_shutdown) if (!need_shutdown)
{ {
/// Set up tracing context for this thread by its parent context /// Set up tracing context for this thread by its parent context
DB::OpenTelemetryThreadTraceContextScope thread_trace_context("ThreadPool::worker()" , DB::TracingContextHolder thread_trace_context("ThreadPool::worker()" ,
parent_thead_trace_context); parent_thead_trace_context);
try try

View File

@ -97,9 +97,9 @@ private:
{ {
Job job; Job job;
int priority; int priority;
DB::OpenTelemetryThreadTraceContext thread_trace_context; DB::OpenTelemetry::TracingContextOnThread thread_trace_context;
JobWithPriority(Job job_, int priority_, const DB::OpenTelemetryThreadTraceContext& thread_trace_context_) JobWithPriority(Job job_, int priority_, const DB::OpenTelemetry::TracingContextOnThread& thread_trace_context_)
: job(job_), priority(priority_), thread_trace_context(thread_trace_context_) {} : job(job_), priority(priority_), thread_trace_context(thread_trace_context_) {}
bool operator< (const JobWithPriority & rhs) const bool operator< (const JobWithPriority & rhs) const

View File

@ -62,9 +62,8 @@ public:
time_t initial_query_start_time{}; time_t initial_query_start_time{};
Decimal64 initial_query_start_time_microseconds{}; Decimal64 initial_query_start_time_microseconds{};
// OpenTelemetry trace context we received from client, or which we are going /// OpenTelemetry trace context we received from client, or which we are going to send to server.
// to send to server. TracingContext client_trace_context;
OpenTelemetryTraceContext client_trace_context;
/// All below are parameters related to initial query. /// All below are parameters related to initial query.

View File

@ -7,11 +7,11 @@
namespace DB namespace DB
{ {
struct OpenTelemetrySpanLogElement : public OpenTelemetrySpan struct OpenTelemetrySpanLogElement : public Span
{ {
OpenTelemetrySpanLogElement() = default; OpenTelemetrySpanLogElement() = default;
OpenTelemetrySpanLogElement(const OpenTelemetrySpan & span) OpenTelemetrySpanLogElement(const Span & span)
: OpenTelemetrySpan(span) {} : Span(span) {}
static std::string name() { return "OpenTelemetrySpanLog"; } static std::string name() { return "OpenTelemetrySpanLog"; }
static NamesAndTypesList getNamesAndTypes(); static NamesAndTypesList getNamesAndTypes();

View File

@ -233,7 +233,7 @@ inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock>
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count(); return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
} }
static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr context, UInt64 current_time_us, ASTPtr ast, const std::shared_ptr<OpenTelemetrySpanHolder>& query_span) static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr context, UInt64 current_time_us, ASTPtr ast, const std::shared_ptr<SpanHolder> & query_span)
{ {
/// Exception before the query execution. /// Exception before the query execution.
if (auto quota = context->getQuota()) if (auto quota = context->getQuota())
@ -345,7 +345,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
QueryProcessingStage::Enum stage, QueryProcessingStage::Enum stage,
ReadBuffer * istr) ReadBuffer * istr)
{ {
std::shared_ptr<OpenTelemetrySpanHolder> query_span = std::make_shared<OpenTelemetrySpanHolder>("query"); std::shared_ptr<SpanHolder> query_span = std::make_shared<SpanHolder>("query");
const auto current_time = std::chrono::system_clock::now(); const auto current_time = std::chrono::system_clock::now();
@ -667,12 +667,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
} }
{ {
std::unique_ptr<OpenTelemetrySpanHolder> span; std::unique_ptr<SpanHolder> span;
if (query_span->isTraceEnabled()) if (query_span->isTraceEnabled())
{ {
auto * raw_interpreter_ptr = interpreter.get(); auto * raw_interpreter_ptr = interpreter.get();
std::string class_name(demangle(typeid(*raw_interpreter_ptr).name())); std::string class_name(demangle(typeid(*raw_interpreter_ptr).name()));
span = std::make_unique<OpenTelemetrySpanHolder>(class_name + "::execute()"); span = std::make_unique<SpanHolder>(class_name + "::execute()");
} }
res = interpreter->execute(); res = interpreter->execute();
} }
@ -930,7 +930,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
query_span->addAttribute("db.statement", elem.query); query_span->addAttribute("db.statement", elem.query);
query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id); query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id);
query_span->addAttribute("clickhouse.tracestate", OpenTelemetryThreadTraceContext::current().tracestate); query_span->addAttribute("clickhouse.tracestate", TracingContextOnThread::current().tracestate);
query_span->addAttribute("clickhouse.query_status", "QueryFinish"); query_span->addAttribute("clickhouse.query_status", "QueryFinish");
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows); query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes); query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);

View File

@ -71,11 +71,11 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
bool ExecutionThreadContext::executeTask() bool ExecutionThreadContext::executeTask()
{ {
std::unique_ptr<OpenTelemetrySpanHolder> span; std::unique_ptr<SpanHolder> span;
if (trace_processors) if (trace_processors)
{ {
span = std::make_unique<OpenTelemetrySpanHolder>("ExecutionThreadContext::executeTask() " + node->processor->getName()); span = std::make_unique<SpanHolder>("ExecutionThreadContext::executeTask() " + node->processor->getName());
span->addAttribute("thread_number", thread_number); span->addAttribute("thread_number", thread_number);
} }
std::optional<Stopwatch> execution_time_watch; std::optional<Stopwatch> execution_time_watch;

View File

@ -662,7 +662,7 @@ namespace
std::optional<Session> session; std::optional<Session> session;
ContextMutablePtr query_context; ContextMutablePtr query_context;
std::optional<CurrentThread::QueryScope> query_scope; std::optional<CurrentThread::QueryScope> query_scope;
OpenTelemetryThreadTraceContextScopePtr thread_trace_context; TracingContextHolderPtr thread_trace_context;
String query_text; String query_text;
ASTPtr ast; ASTPtr ast;
ASTInsertQuery * insert_query = nullptr; ASTInsertQuery * insert_query = nullptr;
@ -842,7 +842,7 @@ namespace
query_scope.emplace(query_context); query_scope.emplace(query_context);
/// Set up tracing context for this query on current thread /// Set up tracing context for this query on current thread
thread_trace_context = std::make_unique<OpenTelemetryThreadTraceContextScope>("GRPCServer", thread_trace_context = std::make_unique<TracingContextHolder>("GRPCServer",
query_context->getClientInfo().client_trace_context, query_context->getClientInfo().client_trace_context,
query_context->getSettingsRef(), query_context->getSettingsRef(),
query_context->getOpenTelemetrySpanLog()); query_context->getOpenTelemetrySpanLog());

View File

@ -933,7 +933,7 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
/// In case of exception, send stack trace to client. /// In case of exception, send stack trace to client.
bool with_stacktrace = false; bool with_stacktrace = false;
OpenTelemetryThreadTraceContextScopePtr thread_trace_context; TracingContextHolderPtr thread_trace_context;
SCOPE_EXIT({ SCOPE_EXIT({
// make sure the response status is recorded // make sure the response status is recorded
if (thread_trace_context) if (thread_trace_context)
@ -963,7 +963,7 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
// Setup tracing context for this thread // Setup tracing context for this thread
auto context = session->sessionOrGlobalContext(); auto context = session->sessionOrGlobalContext();
thread_trace_context = std::make_unique<OpenTelemetryThreadTraceContextScope>("HTTPHandler", thread_trace_context = std::make_unique<TracingContextHolder>("HTTPHandler",
client_info.client_trace_context, client_info.client_trace_context,
context->getSettingsRef(), context->getSettingsRef(),
context->getOpenTelemetrySpanLog()); context->getOpenTelemetrySpanLog());

View File

@ -219,7 +219,7 @@ void TCPHandler::runImpl()
/// Initialized later. /// Initialized later.
std::optional<CurrentThread::QueryScope> query_scope; std::optional<CurrentThread::QueryScope> query_scope;
OpenTelemetryThreadTraceContextScopePtr thread_trace_context; TracingContextHolderPtr thread_trace_context;
/** An exception during the execution of request (it must be sent over the network to the client). /** An exception during the execution of request (it must be sent over the network to the client).
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet. * The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
@ -246,7 +246,7 @@ void TCPHandler::runImpl()
continue; continue;
/// Set up tracing context for this query on current thread /// Set up tracing context for this query on current thread
thread_trace_context = std::make_unique<OpenTelemetryThreadTraceContextScope>("TCPHandler", thread_trace_context = std::make_unique<TracingContextHolder>("TCPHandler",
query_context->getClientInfo().client_trace_context, query_context->getClientInfo().client_trace_context,
query_context->getSettingsRef(), query_context->getSettingsRef(),
query_context->getOpenTelemetrySpanLog()); query_context->getOpenTelemetrySpanLog());

View File

@ -609,7 +609,7 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path) void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{ {
OpenTelemetryThreadTraceContextScopePtr thread_trace_context; TracingContextHolderPtr thread_trace_context;
Stopwatch watch; Stopwatch watch;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.getContext()->getSettingsRef()); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.getContext()->getSettingsRef());
@ -629,9 +629,9 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
formatReadableQuantity(distributed_header.rows), formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes)); formatReadableSizeWithBinarySuffix(distributed_header.bytes));
thread_trace_context = std::make_unique<OpenTelemetryThreadTraceContextScope>(__PRETTY_FUNCTION__, thread_trace_context = std::make_unique<TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context, distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog()); this->storage.getContext()->getOpenTelemetrySpanLog());
RemoteInserter remote{*connection, timeouts, RemoteInserter remote{*connection, timeouts,
distributed_header.insert_query, distributed_header.insert_query,
@ -870,7 +870,7 @@ private:
ReadBufferFromFile in(file_path->second); ReadBufferFromFile in(file_path->second);
const auto & distributed_header = readDistributedHeader(in, parent.log); const auto & distributed_header = readDistributedHeader(in, parent.log);
OpenTelemetryThreadTraceContextScope thread_trace_context(__PRETTY_FUNCTION__, TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context, distributed_header.client_info.client_trace_context,
parent.storage.getContext()->getOpenTelemetrySpanLog()); parent.storage.getContext()->getOpenTelemetrySpanLog());
@ -909,9 +909,9 @@ private:
const auto & distributed_header = readDistributedHeader(in, parent.log); const auto & distributed_header = readDistributedHeader(in, parent.log);
// this function is called in a separated thread, so we set up the trace context from the file // this function is called in a separated thread, so we set up the trace context from the file
OpenTelemetryThreadTraceContextScope thread_trace_context(__PRETTY_FUNCTION__, TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context, distributed_header.client_info.client_trace_context,
parent.storage.getContext()->getOpenTelemetrySpanLog()); parent.storage.getContext()->getOpenTelemetrySpanLog());
RemoteInserter remote(connection, timeouts, RemoteInserter remote(connection, timeouts,
distributed_header.insert_query, distributed_header.insert_query,

View File

@ -336,7 +336,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
if (rows == 0) if (rows == 0)
return; return;
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__); SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num); span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", rows); span.addAttribute("clickhouse.written_rows", rows);
@ -419,7 +419,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
void DistributedSink::writeSync(const Block & block) void DistributedSink::writeSync(const Block & block)
{ {
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__); SpanHolder span(__PRETTY_FUNCTION__);
const Settings & settings = context->getSettingsRef(); const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo(); const auto & shards_info = cluster->getShardsInfo();
@ -610,7 +610,7 @@ void DistributedSink::writeSplitAsync(const Block & block)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id) void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{ {
OpenTelemetrySpanHolder span("DistributedSink::writeAsyncImpl()"); SpanHolder span("DistributedSink::writeAsyncImpl()");
const auto & shard_info = cluster->getShardsInfo()[shard_id]; const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
@ -652,7 +652,7 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
void DistributedSink::writeToLocal(const Block & block, size_t repeats) void DistributedSink::writeToLocal(const Block & block, size_t repeats)
{ {
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__); SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("db.statement", this->query_string); span.addAttribute("db.statement", this->query_string);
InterpreterInsertQuery interp(query_ast, context, allow_materialized); InterpreterInsertQuery interp(query_ast, context, allow_materialized);
@ -668,7 +668,7 @@ void DistributedSink::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names) void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{ {
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__); SpanHolder span(__PRETTY_FUNCTION__);
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef(); const auto & distributed_settings = storage.getDistributedSettingsRef();
@ -737,11 +737,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
writeStringBinary(query_string, header_buf); writeStringBinary(query_string, header_buf);
context->getSettingsRef().write(header_buf); context->getSettingsRef().write(header_buf);
if (OpenTelemetryThreadTraceContext::current().isTraceEnabled()) if (TracingContextOnThread::current().isTraceEnabled())
{ {
// if the distributed tracing is enabled, use the trace context in current thread as parent of next span // if the distributed tracing is enabled, use the trace context in current thread as parent of next span
auto client_info = context->getClientInfo(); auto client_info = context->getClientInfo();
client_info.client_trace_context = OpenTelemetryThreadTraceContext::current(); client_info.client_trace_context = TracingContextOnThread::current();
client_info.write(header_buf, DBMS_TCP_PROTOCOL_VERSION); client_info.write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
} }
else else

View File

@ -102,7 +102,7 @@ namespace
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end()); ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
// Propagate OpenTelemetry trace context, if any, downstream. // Propagate OpenTelemetry trace context, if any, downstream.
const auto &current_trace_context = OpenTelemetryThreadTraceContext::current(); const auto &current_trace_context = TracingContextOnThread::current();
if (current_trace_context.isTraceEnabled()) if (current_trace_context.isTraceEnabled())
{ {
headers.emplace_back("traceparent", current_trace_context.composeTraceparentHeader()); headers.emplace_back("traceparent", current_trace_context.composeTraceparentHeader());