From 46127946788d0ae89aa8c895b8d31fbb453e04c1 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Fri, 28 Aug 2020 04:21:08 +0300 Subject: [PATCH] opentelemetry context propagation --- src/Core/Defines.h | 5 +- src/Interpreters/ClientInfo.cpp | 140 ++++++++++++++++++++++++++++++ src/Interpreters/ClientInfo.h | 17 +++- src/Interpreters/Context.cpp | 20 ++++- src/Interpreters/executeQuery.cpp | 33 +++++-- src/Interpreters/ya.make | 1 + src/Server/HTTPHandler.cpp | 23 ++++- src/Server/TCPHandler.cpp | 8 +- src/Storages/StorageURL.cpp | 18 +++- 9 files changed, 244 insertions(+), 21 deletions(-) diff --git a/src/Core/Defines.h b/src/Core/Defines.h index e244581c339..d19513d1434 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -67,8 +67,11 @@ /// Minimum revision supporting SettingsBinaryFormat::STRINGS. #define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429 +/// Minimum revision supporting OpenTelemetry +#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54227 + /// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change. -#define DBMS_TCP_PROTOCOL_VERSION 54226 +#define DBMS_TCP_PROTOCOL_VERSION 54227 /// The boundary on which the blocks for asynchronous file operations should be aligned. #define DEFAULT_AIO_FILE_BLOCK_SIZE 4096 diff --git a/src/Interpreters/ClientInfo.cpp b/src/Interpreters/ClientInfo.cpp index 378375dcc18..9e501ca5c11 100644 --- a/src/Interpreters/ClientInfo.cpp +++ b/src/Interpreters/ClientInfo.cpp @@ -60,6 +60,18 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision) if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) writeVarUInt(client_version_patch, out); } + + if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY) + { + // No point writing these numbers with variable length, because they + // are random and will probably require the full length anyway. + writeBinary(opentelemetry_trace_id, out); + writeBinary(opentelemetry_span_id, out); + writeBinary(opentelemetry_parent_span_id, out); + writeBinary(opentelemetry_tracestate, out); + writeBinary(opentelemetry_trace_flags, out); + std::cerr << fmt::format("wrote {:x}, {}, {}\n", opentelemetry_trace_id, opentelemetry_span_id, opentelemetry_parent_span_id) << StackTrace().toString() << std::endl; + } } @@ -113,6 +125,17 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision) else client_version_patch = client_revision; } + + if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY) + { + readBinary(opentelemetry_trace_id, in); + readBinary(opentelemetry_span_id, in); + readBinary(opentelemetry_parent_span_id, in); + readBinary(opentelemetry_tracestate, in); + readBinary(opentelemetry_trace_flags, in); + + std::cerr << fmt::format("read {:x}, {}, {}\n", opentelemetry_trace_id, opentelemetry_span_id, opentelemetry_parent_span_id) << StackTrace().toString() << std::endl; + } } @@ -123,6 +146,123 @@ void ClientInfo::setInitialQuery() client_name = (DBMS_NAME " ") + client_name; } +template +bool readLowercaseHexDigits(const char *& begin, const char * end, T & dest_value, std::string & error) +{ + char * dest_begin = reinterpret_cast(&dest_value); + char * dest_end = dest_begin + sizeof(dest_value); + bool odd_character = true; + for (;;) + { + if (begin == end) + { + if (dest_begin == dest_end) + { + return true; + } + error = fmt::format("Not enough charaters in the input, got {}, need {} more", end - begin, dest_end - dest_begin); + return false; + } + + if (dest_begin == dest_end) + { + return true; + } + + int cur = 0; + if (*begin >= '0' && *begin <= '9') + { + cur = *begin - '0'; + } + else if (*begin >= 'a' && *begin <= 'f') + { + cur = 10 + *begin - 'a'; + } + else + { + error = fmt::format("Encountered '{}' which is not a lowercase hexadecimal digit", *begin); + return false; + } + + // Two characters per byte, little-endian. + if (odd_character) + { + *(dest_end - 1) = cur; + } + else + { + *(dest_end - 1) = *(dest_end - 1) << 8 | cur; + --dest_end; + } + + begin++; + odd_character = !odd_character; + } +} + +bool ClientInfo::setOpenTelemetryTraceparent(const std::string & traceparent, + std::string & error) +{ + uint8_t version = -1; + __uint128_t trace_id = 0; + uint64_t trace_parent = 0; + uint8_t trace_flags = 0; + + const char * begin = &traceparent[0]; + const char * end = begin + traceparent.length(); + +#define CHECK_CONDITION(condition, ...) \ + ((condition) || (error = fmt::format(__VA_ARGS__), false)) + +#define CHECK_DELIMITER \ + (begin >= end \ + ? (error = fmt::format( \ + "Expected '-' delimiter, got EOL at position {}", \ + begin - &traceparent[0]), \ + false) \ + : *begin != '-' \ + ? (error = fmt::format( \ + "Expected '-' delimiter, got '{}' at position {}", \ + *begin, begin - &traceparent[0]), \ + false) \ + : (++begin, true)) + + bool result = readLowercaseHexDigits(begin, end, version, error) + && CHECK_CONDITION(version == 0, "Expected version 00, got {}", version) + && CHECK_DELIMITER + && readLowercaseHexDigits(begin, end, trace_id, error) + && CHECK_DELIMITER + && readLowercaseHexDigits(begin, end, trace_parent, error) + && CHECK_DELIMITER + && readLowercaseHexDigits(begin, end, trace_flags, error) + && CHECK_CONDITION(begin == end, + "Expected end of string, got {} at position {}", *begin, end - begin); + +#undef CHECK +#undef CHECK_DELIMITER + + if (!result) + { + return false; + } + + opentelemetry_trace_id = trace_id; + opentelemetry_parent_span_id = trace_parent; + opentelemetry_trace_flags = trace_flags; + return true; +} + + +std::string ClientInfo::getOpenTelemetryTraceparentForChild() const +{ + // This span is a parent for its children (so deep...), so we specify + // this span_id as a parent id. + return fmt::format("00-{:032x}-{:016x}-{:02x}", opentelemetry_trace_id, + opentelemetry_span_id, + // This cast is because fmt is being weird and complaining that + // "mixing character types is not allowed". + static_cast(opentelemetry_trace_flags)); +} void ClientInfo::fillOSUserHostNameAndVersionInfo() { diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h index 52391d6cf73..413e1c42bf7 100644 --- a/src/Interpreters/ClientInfo.h +++ b/src/Interpreters/ClientInfo.h @@ -59,9 +59,17 @@ public: String initial_query_id; Poco::Net::SocketAddress initial_address; - __uint128_t trace_id; - UInt64 span_id; - UInt64 parent_span_id; + // OpenTelemetry things + __uint128_t opentelemetry_trace_id = 0; + // Span ID is not strictly the client info, but convenient to keep here. + // The span id we get the in the incoming client info becomes our parent span + // id, and the span id we send becomes downstream parent span id. + UInt64 opentelemetry_span_id = 0; + UInt64 opentelemetry_parent_span_id = 0; + // the incoming tracestate header, we just pass it downstream. + // https://www.w3.org/TR/trace-context/ + String opentelemetry_tracestate; + UInt8 opentelemetry_trace_flags; /// All below are parameters related to initial query. @@ -95,6 +103,9 @@ public: /// Initialize parameters on client initiating query. void setInitialQuery(); + bool setOpenTelemetryTraceparent(const std::string & traceparent, std::string & error); + std::string getOpenTelemetryTraceparentForChild() const; + private: void fillOSUserHostNameAndVersionInfo(); }; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index c225c332248..c5fd2d585e1 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1093,9 +1093,23 @@ void Context::setCurrentQueryId(const String & query_id) random.words.a = thread_local_rng(); //-V656 random.words.b = thread_local_rng(); //-V656 - client_info.trace_id = random.uuid; - client_info.span_id = 1; - client_info.parent_span_id = 0; + fmt::print(stderr, "traceid {}, ==0 {}\n", client_info.opentelemetry_trace_id, client_info.opentelemetry_trace_id == 0); + if (client_info.opentelemetry_trace_id == 0) + { + // If trace_id is not initialized, it means that this is an initial query + // without any parent OpenTelemetry trace. Use the randomly generated + // default query id as the new trace id. + client_info.opentelemetry_trace_id = random.uuid; + client_info.opentelemetry_parent_span_id = 0; + client_info.opentelemetry_span_id = thread_local_rng(); + } + else + { + // The incoming span id becomes our parent span id. + client_info.opentelemetry_parent_span_id = client_info.opentelemetry_span_id; + client_info.opentelemetry_span_id = thread_local_rng(); + } + fmt::print(stderr, "traceid {}, ==0 {}\n{}\n", client_info.opentelemetry_trace_id, client_info.opentelemetry_trace_id == 0, StackTrace().toString()); String query_id_to_set = query_id; if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves. diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index e74b24c4aa7..2a35bc205fa 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -149,9 +149,11 @@ static void logQuery(const String & query, const Context & context, bool interna joinLines(query)); LOG_TRACE(&Poco::Logger::get("executeQuery"), - "OpenTelemetry trace id {:x}, span id {:x}, parent span id {:x}", - context.getClientInfo().trace_id, context.getClientInfo().span_id, - context.getClientInfo().parent_span_id); + "OpenTelemetry trace id {:x}, span id {}, parent span id {}", + context.getClientInfo().opentelemetry_trace_id, context.getClientInfo().opentelemetry_span_id, + context.getClientInfo().opentelemetry_parent_span_id); + + std::cerr << StackTrace().toString() << std::endl; } } @@ -225,9 +227,9 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c if (auto opentelemetry_log = context.getOpenTelemetryLog()) { OpenTelemetrySpanLogElement span; - span.trace_id = context.getClientInfo().trace_id; - span.span_id = context.getClientInfo().span_id; - span.parent_span_id = context.getClientInfo().parent_span_id; + span.trace_id = context.getClientInfo().opentelemetry_trace_id; + span.span_id = context.getClientInfo().opentelemetry_span_id; + span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id; span.operation_name = "query"; span.start_time = current_time; span.finish_time = current_time; @@ -242,6 +244,13 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c span.attribute_names.push_back("query_id"); span.attribute_values.push_back(elem.client_info.current_query_id); + if (!context.getClientInfo().opentelemetry_tracestate.empty()) + { + span.attribute_names.push_back("tracestate"); + span.attribute_values.push_back( + context.getClientInfo().opentelemetry_tracestate); + } + opentelemetry_log->add(span); } @@ -617,9 +626,9 @@ static std::tuple executeQueryImpl( if (auto opentelemetry_log = context.getOpenTelemetryLog()) { OpenTelemetrySpanLogElement span; - span.trace_id = context.getClientInfo().trace_id; - span.span_id = context.getClientInfo().span_id; - span.parent_span_id = context.getClientInfo().parent_span_id; + span.trace_id = context.getClientInfo().opentelemetry_trace_id; + span.span_id = context.getClientInfo().opentelemetry_span_id; + span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id; span.operation_name = "query"; span.start_time = elem.query_start_time; span.finish_time = time(nullptr); // current time @@ -633,6 +642,12 @@ static std::tuple executeQueryImpl( span.attribute_names.push_back("query_id"); span.attribute_values.push_back(elem.client_info.current_query_id); + if (!context.getClientInfo().opentelemetry_tracestate.empty()) + { + span.attribute_names.push_back("tracestate"); + span.attribute_values.push_back( + context.getClientInfo().opentelemetry_tracestate); + } opentelemetry_log->add(span); } diff --git a/src/Interpreters/ya.make b/src/Interpreters/ya.make index 23cde61a744..8c4086722c8 100644 --- a/src/Interpreters/ya.make +++ b/src/Interpreters/ya.make @@ -113,6 +113,7 @@ SRCS( MutationsInterpreter.cpp MySQL/InterpretersMySQLDDLQuery.cpp NullableUtils.cpp + OpenTelemetryLog.cpp OptimizeIfChains.cpp OptimizeIfWithConstantConditionVisitor.cpp PartLog.cpp diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index 95f56b715b8..fb630010198 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -96,6 +96,7 @@ namespace ErrorCodes extern const int WRONG_PASSWORD; extern const int REQUIRED_PASSWORD; + extern const int BAD_REQUEST_PARAMETER; extern const int INVALID_SESSION_TIMEOUT; extern const int HTTP_LENGTH_REQUIRED; } @@ -279,9 +280,7 @@ void HTTPHandler::processQuery( } } - std::string query_id = params.get("query_id", ""); context.setUser(user, password, request.clientAddress()); - context.setCurrentQueryId(query_id); if (!quota_key.empty()) context.setQuotaKey(quota_key); @@ -311,6 +310,26 @@ void HTTPHandler::processQuery( session->release(); }); + std::string query_id = params.get("query_id", ""); + context.setCurrentQueryId(query_id); + + if (request.has("traceparent")) + { + std::string opentelemetry_traceparent = request.get("traceparent"); + std::string error; + if (!context.getClientInfo().setOpenTelemetryTraceparent( + opentelemetry_traceparent, error)) + { + throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, + "Failed to parse OpenTelemetry traceparent header '{}': {}", + opentelemetry_traceparent, error); + } + + context.getClientInfo().opentelemetry_tracestate = request.get("tracestate", ""); + + + } + /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). String http_response_compression_methods = request.get("Accept-Encoding", ""); CompressionMethod http_response_compression_method = CompressionMethod::None; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index ab4ce820666..e83bbb02cad 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -845,13 +845,17 @@ void TCPHandler::receiveQuery() state.is_empty = false; readStringBinary(state.query_id, *in); - query_context->setCurrentQueryId(state.query_id); - /// Client info ClientInfo & client_info = query_context->getClientInfo(); if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) client_info.read(*in, client_revision); + // It is convenient to generate default OpenTelemetry trace id and default + // query id together. ClientInfo might contain upstream trace id, so we + // decide whether to use the default ids after we have received the ClientInfo. + // We also set up the parent span id while we're at it. + query_context->setCurrentQueryId(state.query_id); + /// For better support of old clients, that does not send ClientInfo. if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY) { diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index c2f7bfd18d2..d5e86c08a8b 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -67,6 +67,22 @@ namespace const CompressionMethod compression_method) : SourceWithProgress(sample_block), name(std::move(name_)) { + ReadWriteBufferFromHTTP::HTTPHeaderEntries header; + + // Propagate OpenTelemetry trace context, if any, downstream. + auto & client_info = context.getClientInfo(); + if (client_info.opentelemetry_trace_id) + { + header.emplace_back("traceparent", + client_info.getOpenTelemetryTraceparentForChild()); + + if (!client_info.opentelemetry_tracestate.empty()) + { + header.emplace_back("tracestate", + client_info.opentelemetry_tracestate); + } + } + read_buf = wrapReadBufferWithCompressionMethod( std::make_unique( uri, @@ -76,7 +92,7 @@ namespace context.getSettingsRef().max_http_get_redirects, Poco::Net::HTTPBasicCredentials{}, DBMS_DEFAULT_BUFFER_SIZE, - ReadWriteBufferFromHTTP::HTTPHeaderEntries{}, + header, context.getRemoteHostFilter()), compression_method);