fixes and some docs

This commit is contained in:
Alexander Kuzmenkov 2020-09-08 16:19:27 +03:00
parent fa8eebed78
commit d0a9926e7d
12 changed files with 226 additions and 69 deletions

View File

@ -0,0 +1,61 @@
# [draft] OpenTelemetry support
[OpenTelemetry](https://opentelemetry.io/) is an open standard for collecting
traces and metrics from distributed application. ClickHouse has some support
for OpenTelemetry.
## Supplying Trace Context to ClickHouse
ClickHouse accepts trace context HTTP headers, as described by
the [W3C recommendation](https://www.w3.org/TR/trace-context/).
It also accepts trace context over native protocol that is used for
communication between ClickHouse servers or between the client and server.
For manual testing, trace context headers conforming to the Trace Context
recommendation can be supplied to `clickhouse-client` using
`--opentelemetry-traceparent` and `--opentelemetry-tracestate` flags.
If no parent trace context is supplied, ClickHouse can start a new trace, with
probability controlled by the `opentelemetry_start_trace_probability` setting.
## Propagating the Trace Context
The trace context is propagated to downstream services in the following cases:
* Queries to remote ClickHouse servers, such as when using `Distributed` table
engine.
* `URL` table function. Trace context information is sent in HTTP headers.
## Tracing the ClickHouse Itself
ClickHouse creates _trace spans_ for each query and some of the query execution
stages, such as query planning or distributed queries.
To be useful, the tracing information has to be exported to a monitoring system
that supports OpenTelemetry, such as Jaeger or Prometheus. ClickHouse avoids
a dependency on a particular monitoring system, instead only
providing the tracing data conforming to the standard. A natural way to do so
in an SQL RDBMS is a system table. OpenTelemetry trace span information
[required by the standard](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/overview.md#span)
is stored in the system table called `system.opentelemetry_log`.
The table must be enabled in the server configuration, see the `opentelemetry_log`
element in the default config file `config.xml`. It is enabled by default.
The table has the following columns:
- `trace_id`
- `span_id`
- `parent_span_id`
- `operation_name`
- `start_time`
- `finish_time`
- `finish_date`
- `attribute.name`
- `attribute.values`
The tags or attributes are saved as two parallel arrays, containing the keys
and values. Use `ARRAY JOIN` to work with them.

View File

@ -567,6 +567,21 @@
OpenTelemetry log contains OpenTelemetry trace spans.
-->
<opentelemetry_log>
<!--
The default table creation code is insufficient, this <engine> spec
is a workaround. There is no 'event_time' for this log, but two times,
start and finish. It is sorted by finish time, to avoid inserting
data too far away in the past (probably we can sometimes insert a span
that is seconds earlier than the last span in the table, due to a race
between several spans inserted in parallel). This gives the spans a
global order that we can use to e.g. retry insertion into some external
system.
-->
<engine>
engine MergeTree
partition by toYYYYMM(finish_date)
order by (finish_date, finish_time, trace_id)
</engine>
<database>system</database>
<table>opentelemetry_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>

View File

@ -218,6 +218,7 @@ class IColumn;
M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \
M(String, metrics_perf_events_list, "", "Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.", 0) \
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \
\
\
/** Limits during query execution are part of the settings. \

View File

@ -65,11 +65,19 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
{
// No point writing these numbers with variable length, because they
// are random and will probably require the full length anyway.
writeBinary(opentelemetry_trace_id, out);
writeBinary(opentelemetry_span_id, out);
writeBinary(opentelemetry_parent_span_id, out);
writeBinary(opentelemetry_tracestate, out);
writeBinary(opentelemetry_trace_flags, out);
if (opentelemetry_trace_id)
{
writeBinary(uint8_t(1), out);
writeBinary(opentelemetry_trace_id, out);
writeBinary(opentelemetry_span_id, out);
writeBinary(opentelemetry_parent_span_id, out);
writeBinary(opentelemetry_tracestate, out);
writeBinary(opentelemetry_trace_flags, out);
}
else
{
writeBinary(uint8_t(0), out);
}
}
}
@ -125,15 +133,24 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
client_version_patch = client_revision;
}
// TODO what does it even mean to read this structure over HTTP? I thought
// this was for native protocol? See interface == Interface::HTTP.
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
{
readBinary(opentelemetry_trace_id, in);
readBinary(opentelemetry_span_id, in);
readBinary(opentelemetry_parent_span_id, in);
readBinary(opentelemetry_tracestate, in);
readBinary(opentelemetry_trace_flags, in);
uint8_t have_trace_id = 0;
readBinary(have_trace_id, in);
if (have_trace_id)
{
readBinary(opentelemetry_trace_id, in);
readBinary(opentelemetry_span_id, in);
readBinary(opentelemetry_parent_span_id, in);
readBinary(opentelemetry_tracestate, in);
readBinary(opentelemetry_trace_flags, in);
std::cerr << fmt::format("read {:x}, {}, {}\n", opentelemetry_trace_id, opentelemetry_span_id, opentelemetry_parent_span_id) << StackTrace().toString() << std::endl;
fmt::print(stderr, "read {:x}, {}, {} at\n{}\n",
opentelemetry_trace_id, opentelemetry_span_id,
opentelemetry_parent_span_id, StackTrace().toString());
}
}
}
@ -149,8 +166,8 @@ bool ClientInfo::setOpenTelemetryTraceparent(const std::string & traceparent,
std::string & error)
{
uint8_t version = -1;
__uint64_t trace_id_high = 0;
__uint64_t trace_id_low = 0;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
uint64_t trace_parent = 0;
uint8_t trace_flags = 0;
@ -205,11 +222,11 @@ bool ClientInfo::setOpenTelemetryTraceparent(const std::string & traceparent,
std::string ClientInfo::getOpenTelemetryTraceparentForChild() const
{
// This span is a parent for its children (so deep...), so we specify
// this span_id as a parent id.
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", opentelemetry_trace_id,
opentelemetry_span_id,
// This cast is because fmt is being weird and complaining that
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(opentelemetry_trace_flags));
}

View File

@ -69,7 +69,7 @@ public:
// the incoming tracestate header, we just pass it downstream.
// https://www.w3.org/TR/trace-context/
String opentelemetry_tracestate;
UInt8 opentelemetry_trace_flags;
UInt8 opentelemetry_trace_flags = 0;
/// All below are parameters related to initial query.

View File

@ -1093,18 +1093,28 @@ void Context::setCurrentQueryId(const String & query_id)
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
if (client_info.opentelemetry_trace_id == 0)
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
&& client_info.opentelemetry_trace_id == 0)
{
// If trace_id is not initialized, it means that this is an initial query
// without any parent OpenTelemetry trace. Use the randomly generated
// default query id as the new trace id.
client_info.opentelemetry_trace_id = random.uuid;
client_info.opentelemetry_parent_span_id = 0;
client_info.opentelemetry_span_id = thread_local_rng();
// If this is an initial query without any parent OpenTelemetry trace, we
// might start the trace ourselves, with some configurable probability.
std::bernoulli_distribution should_start_trace{
settings.opentelemetry_start_trace_probability};
if (should_start_trace(thread_local_rng))
{
// Use the randomly generated default query id as the new trace id.
client_info.opentelemetry_trace_id = random.uuid;
client_info.opentelemetry_parent_span_id = 0;
client_info.opentelemetry_span_id = thread_local_rng();
// Mark this trace as sampled in the flags.
client_info.opentelemetry_trace_flags = 1;
}
}
else
{
// The incoming span id becomes our parent span id.
// The incoming request has an OpenTelemtry trace context. Its span id
// becomes our parent span id.
client_info.opentelemetry_parent_span_id = client_info.opentelemetry_span_id;
client_info.opentelemetry_span_id = thread_local_rng();
}

View File

@ -13,16 +13,13 @@ namespace DB
Block OpenTelemetrySpanLogElement::createBlock()
{
return {
// event_date is the date part of event_time, used for indexing.
{std::make_shared<DataTypeDate>(), "event_date"},
// event_time is the span start time, named so to be compatible with
// the standard ClickHouse system log column names.
{std::make_shared<DataTypeDateTime>(), "event_time"},
{std::make_shared<DataTypeUUID>(), "trace_id"},
{std::make_shared<DataTypeUInt64>(), "span_id"},
{std::make_shared<DataTypeUInt64>(), "parent_span_id"},
{std::make_shared<DataTypeString>(), "operation_name"},
{std::make_shared<DataTypeDateTime>(), "start_time"},
{std::make_shared<DataTypeDateTime>(), "finish_time"},
{std::make_shared<DataTypeDate>(), "finish_date"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
"attribute.names"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
@ -34,13 +31,13 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(DateLUT::instance().toDayNum(start_time));
columns[i++]->insert(start_time);
columns[i++]->insert(UInt128(Int128(trace_id)));
columns[i++]->insert(span_id);
columns[i++]->insert(parent_span_id);
columns[i++]->insert(operation_name);
columns[i++]->insert(start_time);
columns[i++]->insert(finish_time);
columns[i++]->insert(DateLUT::instance().toDayNum(finish_time));
columns[i++]->insert(attribute_names);
columns[i++]->insert(attribute_values);
}

View File

@ -23,8 +23,9 @@ struct OpenTelemetrySpan
UInt64 span_id;
UInt64 parent_span_id;
std::string operation_name;
time_t start_time{};
time_t finish_time{};
time_t start_time;
time_t finish_time;
UInt64 duration_ns;
Array attribute_names;
Array attribute_values;
// I don't understand how Links work, namely, which direction should they

View File

@ -138,20 +138,26 @@ static void logQuery(const String & query, const Context & context, bool interna
}
else
{
const auto & current_query_id = context.getClientInfo().current_query_id;
const auto & initial_query_id = context.getClientInfo().initial_query_id;
const auto & current_user = context.getClientInfo().current_user;
const auto & client_info = context.getClientInfo();
const auto & current_query_id = client_info.current_query_id;
const auto & initial_query_id = client_info.initial_query_id;
const auto & current_user = client_info.current_user;
LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(from {}{}{}) {}",
context.getClientInfo().current_address.toString(),
(current_user != "default" ? ", user: " + context.getClientInfo().current_user : ""),
client_info.current_address.toString(),
(current_user != "default" ? ", user: " + current_user : ""),
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
joinLines(query));
LOG_TRACE(&Poco::Logger::get("executeQuery"),
"OpenTelemetry trace id {:x}, span id {}, parent span id {}",
context.getClientInfo().opentelemetry_trace_id, context.getClientInfo().opentelemetry_span_id,
context.getClientInfo().opentelemetry_parent_span_id);
if (client_info.opentelemetry_trace_id)
{
LOG_TRACE(&Poco::Logger::get("executeQuery"),
"OpenTelemetry trace id {:x}, span id {}, parent span id {}",
client_info.opentelemetry_trace_id,
client_info.opentelemetry_span_id,
client_info.opentelemetry_parent_span_id);
}
}
}
@ -222,7 +228,9 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
if (auto query_log = context.getQueryLog())
query_log->add(elem);
if (auto opentelemetry_log = context.getOpenTelemetryLog())
if (auto opentelemetry_log = context.getOpenTelemetryLog();
context.getClientInfo().opentelemetry_trace_id
&& opentelemetry_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
@ -231,20 +239,21 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
span.operation_name = "query";
span.start_time = current_time;
span.finish_time = current_time;
span.duration_ns = 0;
// keep values synchonized to type enum in QueryLogElement::createBlock
span.attribute_names.push_back("status");
span.attribute_names.push_back("clickhouse.query_status");
span.attribute_values.push_back("ExceptionBeforeStart");
span.attribute_names.push_back("query");
span.attribute_names.push_back("db.statement");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("query_id");
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
{
span.attribute_names.push_back("tracestate");
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
}
@ -285,7 +294,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool has_query_tail,
ReadBuffer * istr)
{
time_t current_time = time(nullptr);
const time_t current_time = time(nullptr);
/// If we already executing query and it requires to execute internal query, than
/// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread.
@ -621,7 +630,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
query_log->add(elem);
}
if (auto opentelemetry_log = context.getOpenTelemetryLog())
if (auto opentelemetry_log = context.getOpenTelemetryLog();
context.getClientInfo().opentelemetry_trace_id
&& opentelemetry_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
@ -629,20 +640,21 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.operation_name = "query";
span.start_time = elem.query_start_time;
span.finish_time = time(nullptr); // current time
span.finish_time = elem.event_time;
span.duration_ns = elapsed_seconds * 1000000000;
// keep values synchonized to type enum in QueryLogElement::createBlock
span.attribute_names.push_back("status");
span.attribute_names.push_back("clickhouse.query_status");
span.attribute_values.push_back("QueryFinish");
span.attribute_names.push_back("query");
span.attribute_names.push_back("db.statement");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("query_id");
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
{
span.attribute_names.push_back("tracestate");
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
}

View File

@ -850,12 +850,6 @@ void TCPHandler::receiveQuery()
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
client_info.read(*in, client_revision);
// It is convenient to generate default OpenTelemetry trace id and default
// query id together. ClientInfo might contain upstream trace id, so we
// decide whether to use the default ids after we have received the ClientInfo.
// We also set up the parent span id while we're at it.
query_context->setCurrentQueryId(state.query_id);
/// For better support of old clients, that does not send ClientInfo.
if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
{
@ -884,8 +878,11 @@ void TCPHandler::receiveQuery()
/// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints.
auto settings_format = (client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY;
auto settings_format =
(client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS)
? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY;
Settings passed_settings;
passed_settings.read(*in, settings_format);
auto settings_changes = passed_settings.changes();
@ -900,12 +897,23 @@ void TCPHandler::receiveQuery()
query_context->clampToSettingsConstraints(settings_changes);
}
query_context->applySettingsChanges(settings_changes);
const Settings & settings = query_context->getSettingsRef();
// Use the received query id, or generate a random default. It is convenient
// to also generate the default OpenTelemetry trace id at the same time, and
// and and set the trace parent.
// Why is this done here and not earlier:
// 1) ClientInfo might contain upstream trace id, so we decide whether to use
// the default ids after we have received the ClientInfo.
// 2) There is the opentelemetry_start_trace_probability setting that
// controls when we start a new trace. It can be changed via Native protocol,
// so we have to apply the changes first.
query_context->setCurrentQueryId(state.query_id);
/// Sync timeouts on client and server during current query to avoid dangling queries on server
/// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter),
/// because settings.send_timeout is client-side setting which has opposite meaning on the server side.
/// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
const Settings & settings = query_context->getSettingsRef();
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), settings.receive_timeout, settings.send_timeout);
readVarUInt(stage, *in);

View File

@ -1,6 +1,10 @@
===http===
1
4
1
===native===
1
2
1
===sampled===
1 1

View File

@ -24,29 +24,60 @@ select count(distinct value)
where
trace_id = reinterpretAsUUID(reverse(unhex('$trace_id')))
and operation_name = 'query'
and name = 'tracestate'
and name = 'clickhouse.tracestate'
and length(value) > 0
;
"
}
# Generate some random trace id so that the prevous runs of the test do not interfere.
echo "===http==="
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString(generateUUIDv4()))))")
# Check that the HTTP traceparent is read, and then passed through `remote` table function.
# We expect 4 queries, because there are two DESC TABLE queries for the shard.
# This is bug-ish, see https://github.com/ClickHouse/ClickHouse/issues/14228
${CLICKHOUSE_CURL} --header "traceparent: 00-$trace_id-0000000000000010-01" --header "tracestate: some custom state" "http://localhost:8123/" --get --data-urlencode "query=select 1 from remote('127.0.0.2', system, one)"
${CLICKHOUSE_CURL} \
--header "traceparent: 00-$trace_id-0000000000000010-01" \
--header "tracestate: some custom state" "http://localhost:8123/" \
--get \
--data-urlencode "query=select 1 from remote('127.0.0.2', system, one)"
check_log
# With another trace id, check that clickhouse-client accepts traceparent, and
# that it is passed through URL table function. We expect two query spans, one
# for the initial query, and one for the HTTP query.
echo "===native==="
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString(generateUUIDv4()))))")
${CLICKHOUSE_CLIENT} --opentelemetry-traceparent "00-$trace_id-0000000000000020-02" --opentelemetry-tracestate "another custom state" --query "
select * from url('http://127.0.0.2:8123/?query=select%201', CSV, 'a int')
"
${CLICKHOUSE_CLIENT} \
--opentelemetry-traceparent "00-$trace_id-0000000000000020-02" \
--opentelemetry-tracestate "another custom state" \
--query "select * from url('http://127.0.0.2:8123/?query=select%201', CSV, 'a int')"
check_log
# Test sampled tracing. The traces should be started with the specified probability,
# only for initial queries.
echo "===sampled==="
query_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString(generateUUIDv4()))))")
for _ in {1..200}
do
${CLICKHOUSE_CLIENT} \
--opentelemetry_start_trace_probability=0.1 \
--query_id "$query_id" \
--query "select 1 from remote('127.0.0.2', system, one) format Null"
done
${CLICKHOUSE_CLIENT} -q "
with count(*) as c
-- expect 200 * 0.1 = 20 sampled events on average
select c > 10, c < 30
from system.opentelemetry_log
array join attribute.names as name, attribute.values as value
where name = 'clickhouse.query_id'
and value = '$query_id'
;
"