This commit is contained in:
Alexander Kuzmenkov 2020-08-27 21:44:20 +03:00
parent 05ad9b9fff
commit 54b3047d19
5 changed files with 63 additions and 10 deletions

View File

@ -2,6 +2,7 @@
#include <Poco/Net/SocketAddress.h> #include <Poco/Net/SocketAddress.h>
#include <Core/Types.h> #include <Core/Types.h>
#include <Common/UInt128.h>
namespace DB namespace DB
@ -59,7 +60,7 @@ public:
Poco::Net::SocketAddress initial_address; Poco::Net::SocketAddress initial_address;
UInt128 trace_id; __uint128_t trace_id;
UInt64 span_id; UInt64 span_id;
UInt64 parent_span_id; UInt64 parent_span_id;

View File

@ -1087,14 +1087,15 @@ void Context::setCurrentQueryId(const String & query_id)
UInt64 a; UInt64 a;
UInt64 b; UInt64 b;
} words; } words;
UInt128 uuid; __uint128_t uuid;
} random; } random;
random.words.a = thread_local_rng(); //-V656 random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656 random.words.b = thread_local_rng(); //-V656
trace_id = random.uuid; client_info.trace_id = random.uuid;
client_info.span_id = 1;
client_info.parent_span_id = 0;
String query_id_to_set = query_id; String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves. if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.

View File

@ -13,12 +13,15 @@ namespace DB
Block OpenTelemetrySpanLogElement::createBlock() Block OpenTelemetrySpanLogElement::createBlock()
{ {
return { return {
// event_date is the date part of event_time, used for indexing.
{std::make_shared<DataTypeDate>(), "event_date"},
// event_time is the span start time, named so to be compatible with
// the standard ClickHouse system log column names.
{std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUUID>(), "trace_id"}, {std::make_shared<DataTypeUUID>(), "trace_id"},
{std::make_shared<DataTypeUInt64>(), "span_id"}, {std::make_shared<DataTypeUInt64>(), "span_id"},
{std::make_shared<DataTypeUInt64>(), "parent_span_id"}, {std::make_shared<DataTypeUInt64>(), "parent_span_id"},
{std::make_shared<DataTypeString>(), "operation_name"}, {std::make_shared<DataTypeString>(), "operation_name"},
{std::make_shared<DataTypeDate>(), "start_date"},
{std::make_shared<DataTypeDateTime>(), "start_time"},
{std::make_shared<DataTypeDateTime>(), "finish_time"}, {std::make_shared<DataTypeDateTime>(), "finish_time"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), {std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
"attribute.names"}, "attribute.names"},
@ -31,12 +34,12 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
{ {
size_t i = 0; size_t i = 0;
columns[i++]->insert(trace_id); columns[i++]->insert(DateLUT::instance().toDayNum(start_time));
columns[i++]->insert(start_time);
columns[i++]->insert(UInt128(Int128(trace_id)));
columns[i++]->insert(span_id); columns[i++]->insert(span_id);
columns[i++]->insert(parent_span_id); columns[i++]->insert(parent_span_id);
columns[i++]->insert(operation_name); columns[i++]->insert(operation_name);
columns[i++]->insert(DateLUT::instance().toDayNum(start_time));
columns[i++]->insert(start_time);
columns[i++]->insert(finish_time); columns[i++]->insert(finish_time);
columns[i++]->insert(attribute_names); columns[i++]->insert(attribute_names);
columns[i++]->insert(attribute_values); columns[i++]->insert(attribute_values);

View File

@ -19,7 +19,7 @@ struct OpenTelemetrySpanContext
// must log. // must log.
struct OpenTelemetrySpan struct OpenTelemetrySpan
{ {
UInt128 trace_id; __uint128_t trace_id;
UInt64 span_id; UInt64 span_id;
UInt64 parent_span_id; UInt64 parent_span_id;
std::string operation_name; std::string operation_name;

View File

@ -30,6 +30,7 @@
#include <Access/EnabledQuota.h> #include <Access/EnabledQuota.h>
#include <Interpreters/InterpreterFactory.h> #include <Interpreters/InterpreterFactory.h>
#include <Interpreters/ProcessList.h> #include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetryLog.h>
#include <Interpreters/QueryLog.h> #include <Interpreters/QueryLog.h>
#include <Interpreters/InterpreterSetQuery.h> #include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h> #include <Interpreters/ReplaceQueryParameterVisitor.h>
@ -146,6 +147,11 @@ static void logQuery(const String & query, const Context & context, bool interna
(current_user != "default" ? ", user: " + context.getClientInfo().current_user : ""), (current_user != "default" ? ", user: " + context.getClientInfo().current_user : ""),
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()), (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
joinLines(query)); joinLines(query));
LOG_TRACE(&Poco::Logger::get("executeQuery"),
"OpenTelemetry trace id {:x}, span id {:x}, parent span id {:x}",
context.getClientInfo().trace_id, context.getClientInfo().span_id,
context.getClientInfo().parent_span_id);
} }
} }
@ -216,6 +222,29 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
if (auto query_log = context.getQueryLog()) if (auto query_log = context.getQueryLog())
query_log->add(elem); query_log->add(elem);
if (auto opentelemetry_log = context.getOpenTelemetryLog())
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().trace_id;
span.span_id = context.getClientInfo().span_id;
span.parent_span_id = context.getClientInfo().parent_span_id;
span.operation_name = "query";
span.start_time = current_time;
span.finish_time = current_time;
// keep values synchonized to type enum in QueryLogElement::createBlock
span.attribute_names.push_back("status");
span.attribute_values.push_back("ExceptionBeforeStart");
span.attribute_names.push_back("query");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
opentelemetry_log->add(span);
}
ProfileEvents::increment(ProfileEvents::FailedQuery); ProfileEvents::increment(ProfileEvents::FailedQuery);
if (ast) if (ast)
@ -587,6 +616,25 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto opentelemetry_log = context.getOpenTelemetryLog()) if (auto opentelemetry_log = context.getOpenTelemetryLog())
{ {
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().trace_id;
span.span_id = context.getClientInfo().span_id;
span.parent_span_id = context.getClientInfo().parent_span_id;
span.operation_name = "query";
span.start_time = elem.query_start_time;
span.finish_time = time(nullptr); // current time
// keep values synchonized to type enum in QueryLogElement::createBlock
span.attribute_names.push_back("status");
span.attribute_values.push_back("QueryFinish");
span.attribute_names.push_back("query");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
opentelemetry_log->add(span);
} }
}; };