Merge pull request #16535 from ClickHouse/aku/opentelemetry

OpenTelemetry improvements
This commit is contained in:
Alexander Kuzmenkov 2020-11-25 14:10:17 +03:00 committed by GitHub
commit edce1e636e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 495 additions and 199 deletions

View File

@ -44,11 +44,10 @@ stages, such as query planning or distributed queries.
To be useful, the tracing information has to be exported to a monitoring system
that supports OpenTelemetry, such as Jaeger or Prometheus. ClickHouse avoids
a dependency on a particular monitoring system, instead only
providing the tracing data conforming to the standard. A natural way to do so
in an SQL RDBMS is a system table. OpenTelemetry trace span information
a dependency on a particular monitoring system, instead only providing the
tracing data through a system table. OpenTelemetry trace span information
[required by the standard](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/overview.md#span)
is stored in the system table called `system.opentelemetry_span_log`.
is stored in the `system.opentelemetry_span_log` table.
The table must be enabled in the server configuration, see the `opentelemetry_span_log`
element in the default config file `config.xml`. It is enabled by default.
@ -67,3 +66,31 @@ The table has the following columns:
The tags or attributes are saved as two parallel arrays, containing the keys
and values. Use `ARRAY JOIN` to work with them.
## Integration with monitoring systems
At the moment, there is no ready tool that can export the tracing data from
ClickHouse to a monitoring system.
For testing, it is possible to setup the export using a materialized view with the URL engine over the `system.opentelemetry_span_log` table, which would push the arriving log data to an HTTP endpoint of a trace collector. For example, to push the minimal span data to a Zipkin instance running at `http://localhost:9411`, in Zipkin v2 JSON format:
```sql
CREATE MATERIALIZED VIEW default.zipkin_spans
ENGINE = URL('http://127.0.0.1:9411/api/v2/spans', 'JSONEachRow')
SETTINGS output_format_json_named_tuples_as_objects = 1,
output_format_json_array_of_rows = 1 AS
SELECT
lower(hex(reinterpretAsFixedString(trace_id))) AS traceId,
lower(hex(parent_span_id)) AS parentId,
lower(hex(span_id)) AS id,
operation_name AS name,
start_time_us AS timestamp,
finish_time_us - start_time_us AS duration,
cast(tuple('clickhouse'), 'Tuple(serviceName text)') AS localEndpoint,
cast(tuple(
attribute.values[indexOf(attribute.names, 'db.statement')]),
'Tuple("db.statement" text)') AS tags
FROM system.opentelemetry_span_log
```
In case of any errors, the part of the log data for which the error has occurred will be silently lost. Check the server log for error messages if the data does not arrive.

View File

@ -2515,7 +2515,7 @@ public:
{
std::string traceparent = options["opentelemetry-traceparent"].as<std::string>();
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
if (!context.getClientInfo().client_trace_context.parseTraceparentHeader(
traceparent, error))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -2526,7 +2526,7 @@ public:
if (options.count("opentelemetry-tracestate"))
{
context.getClientInfo().opentelemetry_tracestate =
context.getClientInfo().client_trace_context.tracestate =
options["opentelemetry-tracestate"].as<std::string>();
}

View File

@ -0,0 +1,21 @@
#pragma once
namespace DB
{
// The runtime info we need to create new OpenTelemetry spans.
struct OpenTelemetryTraceContext
{
__uint128_t trace_id = 0;
UInt64 span_id = 0;
// The incoming tracestate header and the trace flags, we just pass them
// downstream. See https://www.w3.org/TR/trace-context/
String tracestate;
__uint8_t trace_flags = 0;
// Parse/compose OpenTelemetry traceparent header.
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
};
}

View File

@ -2,6 +2,7 @@
#include <Common/ThreadProfileEvents.h>
#include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Poco/Logger.h>
#include <common/getThreadId.h>

View File

@ -3,6 +3,7 @@
#include <common/StringRef.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Core/SettingsEnums.h>
@ -31,6 +32,7 @@ class ThreadStatus;
class QueryProfilerReal;
class QueryProfilerCpu;
class QueryThreadLog;
struct OpenTelemetrySpanHolder;
class TasksStatsCounters;
struct RUsageCounters;
struct PerfEventsCounters;
@ -86,9 +88,6 @@ extern thread_local ThreadStatus * current_thread;
class ThreadStatus : public boost::noncopyable
{
public:
ThreadStatus();
~ThreadStatus();
/// Linux's PID (or TGID) (the same id is shown by ps util)
const UInt64 thread_id = 0;
/// Also called "nice" value. If it was changed to non-zero (when attaching query) - will be reset to zero when query is detached.
@ -110,6 +109,52 @@ public:
using Deleter = std::function<void()>;
Deleter deleter;
// This is the current most-derived OpenTelemetry span for this thread. It
// can be changed throughout the query execution, whenever we enter a new
// span or exit it. See OpenTelemetrySpanHolder that is normally responsible
// for these changes.
OpenTelemetryTraceContext thread_trace_context;
protected:
ThreadGroupStatusPtr thread_group;
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
/// Is set once
Context * global_context = nullptr;
/// Use it only from current thread
Context * query_context = nullptr;
String query_id;
/// A logs queue used by TCPHandler to pass logs to a client
InternalTextLogsQueueWeakPtr logs_queue_ptr;
bool performance_counters_finalized = false;
UInt64 query_start_time_nanoseconds = 0;
UInt64 query_start_time_microseconds = 0;
time_t query_start_time = 0;
size_t queries_started = 0;
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
Poco::Logger * log = nullptr;
friend class CurrentThread;
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> taskstats;
/// Is used to send logs from logs_queue to client in case of fatal errors.
std::function<void()> fatal_error_callback;
public:
ThreadStatus();
~ThreadStatus();
ThreadGroupStatusPtr getThreadGroup() const
{
return thread_group;
@ -176,40 +221,6 @@ protected:
void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr) const;
ThreadGroupStatusPtr thread_group;
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
/// Is set once
Context * global_context = nullptr;
/// Use it only from current thread
Context * query_context = nullptr;
String query_id;
/// A logs queue used by TCPHandler to pass logs to a client
InternalTextLogsQueueWeakPtr logs_queue_ptr;
bool performance_counters_finalized = false;
UInt64 query_start_time_nanoseconds = 0;
UInt64 query_start_time_microseconds = 0;
time_t query_start_time = 0;
size_t queries_started = 0;
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
Poco::Logger * log = nullptr;
friend class CurrentThread;
/// Use ptr not to add extra dependencies in the header
std::unique_ptr<RUsageCounters> last_rusage;
std::unique_ptr<TasksStatsCounters> taskstats;
/// Is used to send logs from logs_queue to client in case of fatal errors.
std::function<void()> fatal_error_callback;
private:
void setupState(const ThreadGroupStatusPtr & thread_group_);

View File

@ -27,6 +27,11 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
{
ClientInfo modified_client_info = client_info_;
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context
= CurrentThread::get().thread_trace_context;
}
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.

View File

@ -156,6 +156,10 @@ void RemoteQueryExecutor::sendQuery()
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context.getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
}
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);

View File

@ -237,7 +237,10 @@ void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPR
{
auto status = response.getStatus();
if (!(status == Poco::Net::HTTPResponse::HTTP_OK || (isRedirect(status) && allow_redirects)))
if (!(status == Poco::Net::HTTPResponse::HTTP_OK
|| status == Poco::Net::HTTPResponse::HTTP_CREATED
|| status == Poco::Net::HTTPResponse::HTTP_ACCEPTED
|| (isRedirect(status) && allow_redirects)))
{
std::stringstream error_message; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
error_message.exceptions(std::ios::failbit);

View File

@ -62,16 +62,16 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
{
if (opentelemetry_trace_id)
if (client_trace_context.trace_id)
{
// Have OpenTelemetry header.
writeBinary(uint8_t(1), out);
// No point writing these numbers with variable length, because they
// are random and will probably require the full length anyway.
writeBinary(opentelemetry_trace_id, out);
writeBinary(opentelemetry_span_id, out);
writeBinary(opentelemetry_tracestate, out);
writeBinary(opentelemetry_trace_flags, out);
writeBinary(client_trace_context.trace_id, out);
writeBinary(client_trace_context.span_id, out);
writeBinary(client_trace_context.tracestate, out);
writeBinary(client_trace_context.trace_flags, out);
}
else
{
@ -139,10 +139,10 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(have_trace_id, in);
if (have_trace_id)
{
readBinary(opentelemetry_trace_id, in);
readBinary(opentelemetry_span_id, in);
readBinary(opentelemetry_tracestate, in);
readBinary(opentelemetry_trace_flags, in);
readBinary(client_trace_context.trace_id, in);
readBinary(client_trace_context.span_id, in);
readBinary(client_trace_context.tracestate, in);
readBinary(client_trace_context.trace_flags, in);
}
}
}
@ -155,74 +155,6 @@ void ClientInfo::setInitialQuery()
client_name = (DBMS_NAME " ") + client_name;
}
bool ClientInfo::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
uint64_t trace_parent = 0;
uint8_t trace_flags = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = 2 + 1 + 32 + 1 + 16 + 1 + 2;
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
// clang-tidy doesn't like sscanf:
// error: 'sscanf' used to convert a string to an unsigned integer value,
// but function will not report conversion errors; consider using 'strtoul'
// instead [cert-err34-c,-warnings-as-errors]
// There is no other ready solution, and hand-rolling a more complicated
// parser for an HTTP header in C++ sounds like RCE.
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &trace_parent, &trace_flags);
if (result == EOF)
{
error = "EOF";
return false;
}
// We read uint128 as two uint64, so 5 parts and not 4.
if (result != 5)
{
error = fmt::format("could only read {} parts instead of the expected 5",
result);
return false;
}
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
opentelemetry_trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
opentelemetry_span_id = trace_parent;
opentelemetry_trace_flags = trace_flags;
return true;
}
std::string ClientInfo::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", opentelemetry_trace_id,
opentelemetry_span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(opentelemetry_trace_flags));
}
void ClientInfo::fillOSUserHostNameAndVersionInfo()
{

View File

@ -3,7 +3,7 @@
#include <Poco/Net/SocketAddress.h>
#include <Common/UInt128.h>
#include <common/types.h>
#include <Common/OpenTelemetryTraceContext.h>
namespace DB
{
@ -60,16 +60,9 @@ public:
String initial_query_id;
Poco::Net::SocketAddress initial_address;
// OpenTelemetry trace information.
__uint128_t opentelemetry_trace_id = 0;
// The span id we get the in the incoming client info becomes our parent span
// id, and the span id we send becomes downstream parent span id.
UInt64 opentelemetry_span_id = 0;
UInt64 opentelemetry_parent_span_id = 0;
// The incoming tracestate header and the trace flags, we just pass them downstream.
// They are described at https://www.w3.org/TR/trace-context/
String opentelemetry_tracestate;
UInt8 opentelemetry_trace_flags = 0;
// OpenTelemetry trace context we received from client, or which we are going
// to send to server.
OpenTelemetryTraceContext client_trace_context;
/// All below are parameters related to initial query.
@ -103,16 +96,6 @@ public:
/// Initialize parameters on client initiating query.
void setInitialQuery();
// Parse/compose OpenTelemetry traceparent header.
// Note that these functions use span_id field, not parent_span_id, same as
// in native protocol. The incoming traceparent corresponds to the upstream
// trace span, and the outgoing traceparent corresponds to our current span.
// We use the same ClientInfo structure first for incoming span, and then
// for our span: when we switch, we use old span_id as parent_span_id, and
// generate a new span_id (currently this happens in Context::setQueryId()).
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
private:
void fillOSUserHostNameAndVersionInfo();
};

View File

@ -1127,8 +1127,14 @@ void Context::setCurrentQueryId(const String & query_id)
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
&& client_info.opentelemetry_trace_id == 0)
if (client_info.client_trace_context.trace_id != 0)
{
// Use the OpenTelemetry trace context we received from the client, and
// create a new span for the query.
query_trace_context = client_info.client_trace_context;
query_trace_context.span_id = thread_local_rng();
}
else if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
// If this is an initial query without any parent OpenTelemetry trace, we
// might start the trace ourselves, with some configurable probability.
@ -1138,20 +1144,12 @@ void Context::setCurrentQueryId(const String & query_id)
if (should_start_trace(thread_local_rng))
{
// Use the randomly generated default query id as the new trace id.
client_info.opentelemetry_trace_id = random.uuid;
client_info.opentelemetry_parent_span_id = 0;
client_info.opentelemetry_span_id = thread_local_rng();
query_trace_context.trace_id = random.uuid;
query_trace_context.span_id = thread_local_rng();
// Mark this trace as sampled in the flags.
client_info.opentelemetry_trace_flags = 1;
query_trace_context.trace_flags = 1;
}
}
else
{
// The incoming request has an OpenTelemtry trace context. Its span id
// becomes our parent span id.
client_info.opentelemetry_parent_span_id = client_info.opentelemetry_span_id;
client_info.opentelemetry_span_id = thread_local_rng();
}
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.

View File

@ -13,6 +13,7 @@
#include <Common/LRUCache.h>
#include <Common/MultiVersion.h>
#include <Common/ThreadPool.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Storages/IStorage_fwd.h>
#include <atomic>
#include <chrono>
@ -198,6 +199,12 @@ private:
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context. Could be equal to this.
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for
// a query context.
OpenTelemetryTraceContext query_trace_context;
private:
friend class NamedSessions;
using SampleBlockCache = std::unordered_map<std::string, Block>;

View File

@ -66,6 +66,7 @@
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/InterpreterExternalDDLQuery.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Parsers/ASTSystemQuery.h>
@ -93,6 +94,8 @@ namespace ErrorCodes
std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
OpenTelemetrySpanHolder span("InterpreterFactory::get()");
ProfileEvents::increment(ProfileEvents::Query);
if (query->as<ASTSelectQuery>())

View File

@ -29,6 +29,7 @@
#include <Interpreters/TableJoin.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/QueryAliasesVisitor.h>
#include <Processors/Pipe.h>
@ -494,6 +495,8 @@ BlockIO InterpreterSelectQuery::execute()
Block InterpreterSelectQuery::getSampleBlockImpl()
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
query_info.query = query_ptr;
if (storage && !options.only_analyze)

View File

@ -18,8 +18,18 @@ Block OpenTelemetrySpanLogElement::createBlock()
{std::make_shared<DataTypeUInt64>(), "span_id"},
{std::make_shared<DataTypeUInt64>(), "parent_span_id"},
{std::make_shared<DataTypeString>(), "operation_name"},
{std::make_shared<DataTypeDateTime64>(6), "start_time_us"},
{std::make_shared<DataTypeDateTime64>(6), "finish_time_us"},
// DateTime64 is really unwieldy -- there is no "normal" way to convert
// it to an UInt64 count of microseconds, except:
// 1) reinterpretAsUInt64(reinterpretAsFixedString(date)), which just
// doesn't look sane;
// 2) things like toUInt64(toDecimal64(date, 6) * 1000000) that are also
// excessively verbose -- why do I have to write scale '6' again, and
// write out 6 zeros? -- and also don't work because of overflow.
// Also subtraction of two DateTime64 points doesn't work, so you can't
// get duration.
// It is much less hassle to just use UInt64 of microseconds.
{std::make_shared<DataTypeUInt64>(), "start_time_us"},
{std::make_shared<DataTypeUInt64>(), "finish_time_us"},
{std::make_shared<DataTypeDate>(), "finish_date"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
"attribute.names"},
@ -28,6 +38,7 @@ Block OpenTelemetrySpanLogElement::createBlock()
};
}
void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
@ -40,8 +51,177 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(finish_time_us);
columns[i++]->insert(DateLUT::instance().toDayNum(finish_time_us / 1000000));
columns[i++]->insert(attribute_names);
columns[i++]->insert(attribute_values);
// The user might add some ints values, and we will have Int Field, and the
// insert will fail because the column requires Strings. Convert the fields
// here, because it's hard to remember to convert them in all other places.
Array string_values;
string_values.reserve(attribute_values.size());
for (const auto & value : attribute_values)
{
string_values.push_back(toString(value));
}
columns[i++]->insert(string_values);
}
OpenTelemetrySpanHolder::OpenTelemetrySpanHolder(const std::string & _operation_name)
{
trace_id = 0;
if (!CurrentThread::isInitialized())
{
// There may be no thread context if we're running inside the
// clickhouse-client, e.g. reading an external table provided with the
// `--external` option.
return;
}
auto & thread = CurrentThread::get();
trace_id = thread.thread_trace_context.trace_id;
if (!trace_id)
{
return;
}
parent_span_id = thread.thread_trace_context.span_id;
span_id = thread_local_rng();
operation_name = _operation_name;
start_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
#ifndef NDEBUG
attribute_names.push_back("clickhouse.start.stacktrace");
attribute_values.push_back(StackTrace().toString());
#endif
thread.thread_trace_context.span_id = span_id;
}
OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
{
try
{
if (!trace_id)
{
return;
}
// First of all, return old value of current span.
auto & thread = CurrentThread::get();
assert(thread.thread_trace_context.span_id = span_id);
thread.thread_trace_context.span_id = parent_span_id;
// Not sure what's the best way to access the log from here.
auto * thread_group = CurrentThread::getGroup().get();
// Not sure whether and when this can be null.
if (!thread_group)
{
return;
}
auto * context = thread_group->query_context;
if (!context)
{
// Both global and query contexts can be null when executing a
// background task, and global context can be null for some
// queries.
return;
}
#ifndef NDEBUG
attribute_names.push_back("clickhouse.end.stacktrace");
attribute_values.push_back(StackTrace().toString());
#endif
auto log = context->getOpenTelemetrySpanLog();
if (!log)
{
// The log might be disabled.
return;
}
finish_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
log->add(OpenTelemetrySpanLogElement(
static_cast<OpenTelemetrySpan>(*this)));
}
catch (...)
{
tryLogCurrentException(__FUNCTION__);
}
}
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
trace_id = 0;
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = 2 + 1 + 32 + 1 + 16 + 1 + 2;
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
// clang-tidy doesn't like sscanf:
// error: 'sscanf' used to convert a string to an unsigned integer value,
// but function will not report conversion errors; consider using 'strtoul'
// instead [cert-err34-c,-warnings-as-errors]
// There is no other ready solution, and hand-rolling a more complicated
// parser for an HTTP header in C++ sounds like RCE.
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &span_id, &trace_flags);
if (result == EOF)
{
error = "EOF";
return false;
}
// We read uint128 as two uint64, so 5 parts and not 4.
if (result != 5)
{
error = fmt::format("could only read {} parts instead of the expected 5",
result);
return false;
}
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
return true;
}
std::string OpenTelemetryTraceContext::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", trace_id,
span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(trace_flags));
}
}

View File

@ -11,9 +11,8 @@ struct OpenTelemetrySpan
UInt64 span_id;
UInt64 parent_span_id;
std::string operation_name;
Decimal64 start_time_us;
Decimal64 finish_time_us;
UInt64 duration_ns;
UInt64 start_time_us;
UInt64 finish_time_us;
Array attribute_names;
Array attribute_values;
// I don't understand how Links work, namely, which direction should they
@ -23,6 +22,10 @@ struct OpenTelemetrySpan
struct OpenTelemetrySpanLogElement : public OpenTelemetrySpan
{
OpenTelemetrySpanLogElement() = default;
OpenTelemetrySpanLogElement(const OpenTelemetrySpan & span)
: OpenTelemetrySpan(span) {}
static std::string name() { return "OpenTelemetrySpanLog"; }
static Block createBlock();
void appendToBlock(MutableColumns & columns) const;
@ -36,4 +39,10 @@ public:
using SystemLog<OpenTelemetrySpanLogElement>::SystemLog;
};
struct OpenTelemetrySpanHolder : public OpenTelemetrySpan
{
OpenTelemetrySpanHolder(const std::string & _operation_name);
~OpenTelemetrySpanHolder();
};
}

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
@ -73,6 +74,15 @@ void ThreadStatus::attachQueryContext(Context & query_context_)
thread_group->global_context = global_context;
}
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
// FIXME why and how is this different from setupState()?
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
thread_trace_context.span_id = thread_local_rng();
}
applyQuerySettings();
}
@ -108,8 +118,22 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
}
if (query_context)
{
applyQuerySettings();
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
thread_trace_context.span_id = thread_local_rng();
}
}
else
{
thread_trace_context.trace_id = 0;
}
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
@ -300,6 +324,46 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
if (thread_trace_context.trace_id && query_context)
{
opentelemetry_span_log = query_context->getOpenTelemetrySpanLog();
}
if (opentelemetry_span_log)
{
// Log the current thread span.
// We do this manually, because we can't use OpenTelemetrySpanHolder as a
// ThreadStatus member, because of linking issues. This file is linked
// separately, so we can reference OpenTelemetrySpanLog here, but if we had
// the span holder as a field, we would have to reference it in the
// destructor, which is in another library.
OpenTelemetrySpanLogElement span;
span.trace_id = thread_trace_context.trace_id;
// All child span holders should be finished by the time we detach this
// thread, so the current span id should be the thread span id. If not,
// an assertion for a proper parent span in ~OpenTelemetrySpanHolder()
// is going to fail, because we're going to reset it to zero later in
// this function.
span.span_id = thread_trace_context.span_id;
span.parent_span_id = query_context->query_trace_context.span_id;
span.operation_name = getThreadName();
span.start_time_us = query_start_time_microseconds;
span.finish_time_us =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
span.attribute_names.push_back("clickhouse.thread_id");
span.attribute_values.push_back(thread_id);
#ifndef NDEBUG
span.attribute_names.push_back("clickhouse.end.stacktrace");
span.attribute_values.push_back(StackTrace().toString());
#endif
opentelemetry_span_log->add(span);
}
finalizeQueryProfiler();
finalizePerformanceCounters();
@ -312,6 +376,8 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_id.clear();
query_context = nullptr;
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;
thread_group.reset();
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;

View File

@ -153,13 +153,11 @@ static void logQuery(const String & query, const Context & context, bool interna
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
joinLines(query));
if (client_info.opentelemetry_trace_id)
if (client_info.client_trace_context.trace_id)
{
LOG_TRACE(&Poco::Logger::get("executeQuery"),
"OpenTelemetry trace id {:x}, span id {}, parent span id {}",
client_info.opentelemetry_trace_id,
client_info.opentelemetry_span_id,
client_info.opentelemetry_parent_span_id);
"OpenTelemetry traceparent '{}'",
client_info.client_trace_context.composeTraceparentHeader());
}
}
}
@ -247,17 +245,16 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
query_log->add(elem);
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
context.query_trace_context.trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.trace_id = context.query_trace_context.trace_id;
span.span_id = context.query_trace_context.span_id;
span.parent_span_id = context.getClientInfo().client_trace_context.span_id;
span.operation_name = "query";
span.start_time_us = current_time_us;
span.finish_time_us = current_time_us;
span.duration_ns = 0;
/// Keep values synchronized to type enum in QueryLogElement::createBlock.
span.attribute_names.push_back("clickhouse.query_status");
@ -269,11 +266,11 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
if (!context.query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
context.query_trace_context.tracestate);
}
opentelemetry_span_log->add(span);
@ -485,7 +482,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
res = interpreter->execute();
{
OpenTelemetrySpanHolder span("IInterpreter::execute()");
res = interpreter->execute();
}
QueryPipeline & pipeline = res.pipeline;
bool use_processors = pipeline.initialized();
@ -691,17 +692,16 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
context.query_trace_context.trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.trace_id = context.query_trace_context.trace_id;
span.span_id = context.query_trace_context.span_id;
span.parent_span_id = context.getClientInfo().client_trace_context.span_id;
span.operation_name = "query";
span.start_time_us = elem.query_start_time_microseconds;
span.finish_time_us = time_in_microseconds(finish_time);
span.duration_ns = elapsed_seconds * 1000000000;
/// Keep values synchronized to type enum in QueryLogElement::createBlock.
span.attribute_names.push_back("clickhouse.query_status");
@ -712,11 +712,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
if (!context.query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
context.query_trace_context.tracestate);
}
opentelemetry_span_log->add(span);

View File

@ -1,4 +1,6 @@
#include <Parsers/parseQuery.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/Lexer.h>

View File

@ -8,6 +8,7 @@
#include <Processors/ISource.h>
#include <Common/setThreadName.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#ifndef NDEBUG
#include <Common/Stopwatch.h>
@ -692,6 +693,8 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
void PipelineExecutor::executeImpl(size_t num_threads)
{
OpenTelemetrySpanHolder span("PipelineExecutor::executeImpl()");
initializeExecution(num_threads);
using ThreadsData = std::vector<ThreadFromGlobalPool>;

View File

@ -318,7 +318,7 @@ void HTTPHandler::processQuery(
{
std::string opentelemetry_traceparent = request.get("traceparent");
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
if (!context.getClientInfo().client_trace_context.parseTraceparentHeader(
opentelemetry_traceparent, error))
{
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
@ -326,7 +326,7 @@ void HTTPHandler::processQuery(
opentelemetry_traceparent, error);
}
context.getClientInfo().opentelemetry_tracestate = request.get("tracestate", "");
context.getClientInfo().client_trace_context.tracestate = request.get("tracestate", "");
}
#endif

View File

@ -20,6 +20,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Core/ExternalTable.h>
@ -517,6 +518,8 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
void TCPHandler::processOrdinaryQuery()
{
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
/// Pull query execution result, if exists, and send it to network.
if (state.io.in)
{

View File

@ -74,16 +74,19 @@ namespace
ReadWriteBufferFromHTTP::HTTPHeaderEntries header;
// Propagate OpenTelemetry trace context, if any, downstream.
const auto & client_info = context.getClientInfo();
if (client_info.opentelemetry_trace_id)
if (CurrentThread::isInitialized())
{
header.emplace_back("traceparent",
client_info.composeTraceparentHeader());
if (!client_info.opentelemetry_tracestate.empty())
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id)
{
header.emplace_back("tracestate",
client_info.opentelemetry_tracestate);
header.emplace_back("traceparent",
thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty())
{
header.emplace_back("tracestate",
thread_trace_context.tracestate);
}
}
}

View File

@ -33,6 +33,7 @@ ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/database_atomic_drop_detach_sync.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/opentelemetry.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/ints_dictionary.xml $DEST_SERVER_PATH/
ln -sf $SRC_PATH/strings_dictionary.xml $DEST_SERVER_PATH/

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<opentelemetry_start_trace_probability>0.1</opentelemetry_start_trace_probability>
</default>
</profiles>
</yandex>

View File

@ -12,24 +12,48 @@ $CLICKHOUSE_CLIENT --query "CREATE USER user IDENTIFIED WITH PLAINTEXT_PASSWORD
# False positive result due to race condition with sleeps is Ok.
$CLICKHOUSE_CLIENT --user user --password hello --query "SELECT sleep(1)" &
bg_query=$!
# Wait for query to start executing. At that time, the password should be cleared.
while true; do
sleep 0.1
$CLICKHOUSE_CLIENT --query "SHOW PROCESSLIST" | grep -q 'SELECT sleep(1)' && break;
for _ in {1..20}
do
if $CLICKHOUSE_CLIENT --query "SHOW PROCESSLIST" | grep -q 'SELECT sleep(1)'
then
break
fi
if ! kill -0 -- $bg_query
then
>&2 echo "The SELECT sleep(1) query finished earlier that we could grep for it in the process list, but it should have run for at least one second. Looks like a bug"
fi
done
ps auxw | grep -F -- '--password' | grep -F hello ||:
# Check that it is still running
kill -0 -- $bg_query
wait
# Once again with different syntax
$CLICKHOUSE_CLIENT --user user --password=hello --query "SELECT sleep(1)" &
bg_query=$!
while true; do
sleep 0.1
$CLICKHOUSE_CLIENT --query "SHOW PROCESSLIST" | grep -q 'SELECT sleep(1)' && break;
# Wait for query to start executing. At that time, the password should be cleared.
for _ in {1..20}
do
if $CLICKHOUSE_CLIENT --query "SHOW PROCESSLIST" | grep -q 'SELECT sleep(1)'
then
break
fi
if ! kill -0 -- $bg_query
then
>&2 echo "The SELECT sleep(1) query finished earlier that we could grep for it in the process list, but it should have run for at least one second. Looks like a bug"
fi
done
ps auxw | grep -F -- '--password' | grep -F hello ||:
# Check that it is still running
kill -0 -- $bg_query
wait
$CLICKHOUSE_CLIENT --query "DROP USER user"

View File

@ -1,5 +1,5 @@
===http===
{"total spans":"4","unique spans":"4","unique non-zero parent spans":"2"}
{"total spans":"4","unique spans":"4","unique non-zero parent spans":"3"}
{"initial query spans with proper parent":"1"}
{"unique non-empty tracestate values":"1"}
===native===

View File

@ -60,7 +60,7 @@ trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString
# https://github.com/ClickHouse/ClickHouse/issues/14228
${CLICKHOUSE_CURL} \
--header "traceparent: 00-$trace_id-0000000000000073-01" \
--header "tracestate: some custom state" "http://localhost:8123/" \
--header "tracestate: some custom state" "http://127.0.0.2:8123/" \
--get \
--data-urlencode "query=select 1 from remote('127.0.0.2', system, one) format Null"