This commit is contained in:
Alexander Kuzmenkov 2020-11-19 18:52:11 +03:00
parent f326536ef0
commit 6cb378e072
8 changed files with 94 additions and 76 deletions

View File

@ -155,72 +155,6 @@ void ClientInfo::setInitialQuery()
client_name = (DBMS_NAME " ") + client_name; client_name = (DBMS_NAME " ") + client_name;
} }
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
trace_id = 0;
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = 2 + 1 + 32 + 1 + 16 + 1 + 2;
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
// clang-tidy doesn't like sscanf:
// error: 'sscanf' used to convert a string to an unsigned integer value,
// but function will not report conversion errors; consider using 'strtoul'
// instead [cert-err34-c,-warnings-as-errors]
// There is no other ready solution, and hand-rolling a more complicated
// parser for an HTTP header in C++ sounds like RCE.
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &span_id, &trace_flags);
if (result == EOF)
{
error = "EOF";
return false;
}
// We read uint128 as two uint64, so 5 parts and not 4.
if (result != 5)
{
error = fmt::format("could only read {} parts instead of the expected 5",
result);
return false;
}
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
return true;
}
std::string OpenTelemetryTraceContext::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", trace_id,
span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(trace_flags));
}
void ClientInfo::fillOSUserHostNameAndVersionInfo() void ClientInfo::fillOSUserHostNameAndVersionInfo()
{ {

View File

@ -94,7 +94,7 @@ namespace ErrorCodes
std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage) std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{ {
OpenTelemetrySpanHolder span(__FUNCTION__); OpenTelemetrySpanHolder span("InterpreterFactory::get()");
ProfileEvents::increment(ProfileEvents::Query); ProfileEvents::increment(ProfileEvents::Query);

View File

@ -29,6 +29,7 @@
#include <Interpreters/TableJoin.h> #include <Interpreters/TableJoin.h>
#include <Interpreters/JoinSwitcher.h> #include <Interpreters/JoinSwitcher.h>
#include <Interpreters/JoinedTables.h> #include <Interpreters/JoinedTables.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/QueryAliasesVisitor.h> #include <Interpreters/QueryAliasesVisitor.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
@ -497,6 +498,8 @@ BlockIO InterpreterSelectQuery::execute()
Block InterpreterSelectQuery::getSampleBlockImpl() Block InterpreterSelectQuery::getSampleBlockImpl()
{ {
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
query_info.query = query_ptr; query_info.query = query_ptr;
if (storage && !options.only_analyze) if (storage && !options.only_analyze)

View File

@ -28,6 +28,7 @@ Block OpenTelemetrySpanLogElement::createBlock()
}; };
} }
void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
{ {
size_t i = 0; size_t i = 0;
@ -52,6 +53,7 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(string_values); columns[i++]->insert(string_values);
} }
OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name) OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name)
{ {
trace_id = 0; trace_id = 0;
@ -78,13 +80,15 @@ OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_
start_time_us = std::chrono::duration_cast<std::chrono::microseconds>( start_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count(); std::chrono::system_clock::now().time_since_epoch()).count();
// ****** remove this #ifndef NDEBUG
attribute_names.push_back("clickhouse.start.stacktrace"); attribute_names.push_back("clickhouse.start.stacktrace");
attribute_values.push_back(StackTrace().toString()); attribute_values.push_back(StackTrace().toString());
#endif
thread.thread_trace_context.span_id = span_id; thread.thread_trace_context.span_id = span_id;
} }
OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder() OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
{ {
try try
@ -116,11 +120,10 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
return; return;
} }
//******** remove this #ifndef NDEBUG
attribute_names.push_back("clickhouse.query_id");
attribute_values.push_back(context->getCurrentQueryId());
attribute_names.push_back("clickhouse.end.stacktrace"); attribute_names.push_back("clickhouse.end.stacktrace");
attribute_values.push_back(StackTrace().toString()); attribute_values.push_back(StackTrace().toString());
#endif
auto log = context->getOpenTelemetrySpanLog(); auto log = context->getOpenTelemetrySpanLog();
if (!log) if (!log)
@ -146,5 +149,74 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
} }
} }
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
trace_id = 0;
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = 2 + 1 + 32 + 1 + 16 + 1 + 2;
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
// clang-tidy doesn't like sscanf:
// error: 'sscanf' used to convert a string to an unsigned integer value,
// but function will not report conversion errors; consider using 'strtoul'
// instead [cert-err34-c,-warnings-as-errors]
// There is no other ready solution, and hand-rolling a more complicated
// parser for an HTTP header in C++ sounds like RCE.
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &span_id, &trace_flags);
if (result == EOF)
{
error = "EOF";
return false;
}
// We read uint128 as two uint64, so 5 parts and not 4.
if (result != 5)
{
error = fmt::format("could only read {} parts instead of the expected 5",
result);
return false;
}
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
return true;
}
std::string OpenTelemetryTraceContext::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", trace_id,
span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(trace_flags));
}
} }

View File

@ -341,8 +341,11 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
OpenTelemetrySpanLogElement span; OpenTelemetrySpanLogElement span;
span.trace_id = thread_trace_context.trace_id; span.trace_id = thread_trace_context.trace_id;
// Might be problematic if some span holder isn't finished by the time // All child span holders should be finished by the time we detach this
// we detach this thread... // thread, so the current span id should be the thread span id. If not,
// an assertion for a proper parent span in ~OpenTelemetrySpanHolder()
// is going to fail, because we're going to reset it to zero later in
// this function.
span.span_id = thread_trace_context.span_id; span.span_id = thread_trace_context.span_id;
span.parent_span_id = query_context->query_trace_context.span_id; span.parent_span_id = query_context->query_trace_context.span_id;
span.operation_name = getThreadName(); span.operation_name = getThreadName();
@ -355,6 +358,11 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
span.attribute_names.push_back("clickhouse.thread_id"); span.attribute_names.push_back("clickhouse.thread_id");
span.attribute_values.push_back(thread_id); span.attribute_values.push_back(thread_id);
#ifndef NDEBUG
span.attribute_names.push_back("clickhouse.end.stacktrace");
span.attribute_values.push_back(StackTrace().toString());
#endif
opentelemetry_span_log->add(span); opentelemetry_span_log->add(span);
} }

View File

@ -478,7 +478,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
} }
{ {
OpenTelemetrySpanHolder span("execute interpreter"); OpenTelemetrySpanHolder span("IInterpreter::execute()");
res = interpreter->execute(); res = interpreter->execute();
} }

View File

@ -76,7 +76,6 @@ static void executeJob(IProcessor * processor)
{ {
try try
{ {
OpenTelemetrySpanHolder span(demangle(typeid(*processor).name()));
processor->work(); processor->work();
} }
catch (Exception & exception) catch (Exception & exception)
@ -694,6 +693,8 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
void PipelineExecutor::executeImpl(size_t num_threads) void PipelineExecutor::executeImpl(size_t num_threads)
{ {
OpenTelemetrySpanHolder span("PipelineExecutor::executeImpl()");
initializeExecution(num_threads); initializeExecution(num_threads);
using ThreadsData = std::vector<ThreadFromGlobalPool>; using ThreadsData = std::vector<ThreadFromGlobalPool>;

View File

@ -518,7 +518,7 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
void TCPHandler::processOrdinaryQuery() void TCPHandler::processOrdinaryQuery()
{ {
OpenTelemetrySpanHolder span(__FUNCTION__); OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
/// Pull query execution result, if exists, and send it to network. /// Pull query execution result, if exists, and send it to network.
if (state.io.in) if (state.io.in)