Merge pull request #41251 from FrankChen021/opentelemetry_stress

Fix a bug that OpenTelemetry span is not closed in right order
This commit is contained in:
Robert Schulze 2022-09-15 09:43:57 +02:00 committed by GitHub
commit 6f1fc95d1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 194 additions and 109 deletions

View File

@ -130,16 +130,15 @@ void SpanHolder::finish() noexcept
try
{
auto log = current_thread_trace_context.span_log.lock();
if (!log)
/// The log might be disabled, check it before use
if (log)
{
// The log might be disabled.
return;
this->finish_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
log->add(OpenTelemetrySpanLogElement(*this));
}
this->finish_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
log->add(OpenTelemetrySpanLogElement(*this));
}
catch (...)
{

View File

@ -838,101 +838,117 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
QueryStatus * process_list_elem = context->getProcessListElement();
if (!process_list_elem)
return;
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
double elapsed_seconds = info.elapsed_seconds;
elem.type = QueryLogElementType::QUERY_FINISH;
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto finish_time = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
if (process_list_elem)
{
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
auto progress_callback = context->getProgressCallback();
if (progress_callback)
{
Progress p(WriteProgress{info.written_rows, info.written_bytes});
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
if (elem.read_rows != 0)
{
LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
static_cast<size_t>(elem.read_rows / elapsed_seconds),
ReadableSize(elem.read_bytes / elapsed_seconds));
}
double elapsed_seconds = info.elapsed_seconds;
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
elem.type = QueryLogElementType::QUERY_FINISH;
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto finish_time = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = time_in_seconds(finish_time);
processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
processor_elem.query_id = elem.client_info.current_query_id;
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
auto get_proc_id = [](const IProcessor & proc) -> UInt64
{
return reinterpret_cast<std::uintptr_t>(&proc);
};
auto progress_callback = context->getProgressCallback();
if (progress_callback)
{
Progress p(WriteProgress{info.written_rows, info.written_bytes});
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
for (const auto & processor : query_pipeline.getProcessors())
if (elem.read_rows != 0)
{
LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
static_cast<size_t>(elem.read_rows / elapsed_seconds),
ReadableSize(elem.read_bytes / elapsed_seconds));
}
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = time_in_seconds(finish_time);
processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
return reinterpret_cast<std::uintptr_t>(&proc);
};
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = processor->getElapsedUs();
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = processor->getElapsedUs();
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
if (implicit_txn_control)
{
try
{
implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
}
catch (const Exception &)
{
/// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
/// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
throw;
}
}
}
@ -945,27 +961,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate);
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);
query_span->addAttributeIfNotZero("clickhouse.written_rows", info.written_rows);
query_span->addAttributeIfNotZero("clickhouse.written_rows", elem.written_rows);
query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes);
query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage);
query_span->finish();
}
if (implicit_txn_control)
{
try
{
implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
}
catch (const Exception &)
{
/// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
/// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
throw;
}
}
};
auto exception_callback = [elem,

View File

@ -0,0 +1,4 @@
{"query":"show processlist format Null\n "}
{"query":"show databases format Null\n "}
{"query":"insert into opentelemetry_test values","read_rows":"3","written_rows":"3"}
{"query":"select * from opentelemetry_test format Null\n ","read_rows":"3","written_rows":""}

View File

@ -0,0 +1,82 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# This function takes 2 arguments:
# $1 - query id
# $2 - query
function execute_query()
{
${CLICKHOUSE_CLIENT} --opentelemetry_start_trace_probability=1 --query_id $1 -nq "
${2}
"
}
# For some queries, it's not possible to know how many bytes/rows are read when tests are executed on CI,
# so we only to check the db.statement only
function check_query_span_query_only()
{
${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT attribute['db.statement'] as query
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND operation_name = 'query'
AND attribute['clickhouse.query_id'] = '${1}'
Format JSONEachRow
;"
}
function check_query_span()
{
${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT attribute['db.statement'] as query,
attribute['clickhouse.read_rows'] as read_rows,
attribute['clickhouse.written_rows'] as written_rows
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND operation_name = 'query'
AND attribute['clickhouse.query_id'] = '${1}'
Format JSONEachRow
;"
}
#
# Set up
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.opentelemetry_test;
CREATE TABLE ${CLICKHOUSE_DATABASE}.opentelemetry_test (id UInt64) Engine=MergeTree Order By id;
"
# test 1, a query that has special path in the code
# Format Null is used to make sure no output is generated so that it won't pollute the reference file
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'show processlist format Null'
check_query_span_query_only "$query_id"
# test 2, a normal show command
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'show databases format Null'
check_query_span_query_only "$query_id"
# test 3, a normal insert query on local table
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'insert into opentelemetry_test values(1)(2)(3)'
check_query_span "$query_id"
# test 4, a normal select query
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'select * from opentelemetry_test format Null'
check_query_span $query_id
#
# Tear down
#
${CLICKHOUSE_CLIENT} -q "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.opentelemetry_test;
"