diff --git a/src/Common/OpenTelemetryTraceContext.cpp b/src/Common/OpenTelemetryTraceContext.cpp index 7a1f94926d5..d5c2188ad01 100644 --- a/src/Common/OpenTelemetryTraceContext.cpp +++ b/src/Common/OpenTelemetryTraceContext.cpp @@ -130,16 +130,15 @@ void SpanHolder::finish() noexcept try { auto log = current_thread_trace_context.span_log.lock(); - if (!log) + + /// The log might be disabled, check it before use + if (log) { - // The log might be disabled. - return; + this->finish_time_us + = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + log->add(OpenTelemetrySpanLogElement(*this)); } - - this->finish_time_us - = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - - log->add(OpenTelemetrySpanLogElement(*this)); } catch (...) { diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index c501c1722ba..b6434955418 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -838,101 +838,117 @@ static std::tuple executeQueryImpl( { QueryStatus * process_list_elem = context->getProcessListElement(); - if (!process_list_elem) - return; - - /// Update performance counters before logging to query_log - CurrentThread::finalizePerformanceCounters(); - - QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events); - - double elapsed_seconds = info.elapsed_seconds; - - elem.type = QueryLogElementType::QUERY_FINISH; - - // construct event_time and event_time_microseconds using the same time point - // so that the two times will always be equal up to a precision of a second. 
- const auto finish_time = std::chrono::system_clock::now(); - elem.event_time = time_in_seconds(finish_time); - elem.event_time_microseconds = time_in_microseconds(finish_time); - status_info_to_query_log(elem, info, ast, context); - - if (pulling_pipeline) + if (process_list_elem) { - query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes); - } - else /// will be used only for ordinary INSERT queries - { - auto progress_out = process_list_elem->getProgressOut(); - elem.result_rows = progress_out.written_rows; - elem.result_bytes = progress_out.written_bytes; - } + /// Update performance counters before logging to query_log + CurrentThread::finalizePerformanceCounters(); - auto progress_callback = context->getProgressCallback(); - if (progress_callback) - { - Progress p(WriteProgress{info.written_rows, info.written_bytes}); - p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}}); - progress_callback(p); - } + QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events); - if (elem.read_rows != 0) - { - LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.", - elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds, - static_cast(elem.read_rows / elapsed_seconds), - ReadableSize(elem.read_bytes / elapsed_seconds)); - } + double elapsed_seconds = info.elapsed_seconds; - if (log_queries && elem.type >= log_queries_min_type && static_cast(elem.query_duration_ms) >= log_queries_min_query_duration_ms) - { - if (auto query_log = context->getQueryLog()) - query_log->add(elem); - } - if (log_processors_profiles) - { - if (auto processors_profile_log = context->getProcessorsProfileLog()) + elem.type = QueryLogElementType::QUERY_FINISH; + + // construct event_time and event_time_microseconds using the same time point + // so that the two times will always be equal up to a precision of a second. 
+ const auto finish_time = std::chrono::system_clock::now(); + elem.event_time = time_in_seconds(finish_time); + elem.event_time_microseconds = time_in_microseconds(finish_time); + status_info_to_query_log(elem, info, ast, context); + + if (pulling_pipeline) { - ProcessorProfileLogElement processor_elem; - processor_elem.event_time = time_in_seconds(finish_time); - processor_elem.event_time_microseconds = time_in_microseconds(finish_time); - processor_elem.query_id = elem.client_info.current_query_id; + query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes); + } + else /// will be used only for ordinary INSERT queries + { + auto progress_out = process_list_elem->getProgressOut(); + elem.result_rows = progress_out.written_rows; + elem.result_bytes = progress_out.written_bytes; + } - auto get_proc_id = [](const IProcessor & proc) -> UInt64 - { - return reinterpret_cast(&proc); - }; + auto progress_callback = context->getProgressCallback(); + if (progress_callback) + { + Progress p(WriteProgress{info.written_rows, info.written_bytes}); + p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}}); + progress_callback(p); + } - for (const auto & processor : query_pipeline.getProcessors()) + if (elem.read_rows != 0) + { + LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.", + elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds, + static_cast(elem.read_rows / elapsed_seconds), + ReadableSize(elem.read_bytes / elapsed_seconds)); + } + + if (log_queries && elem.type >= log_queries_min_type && static_cast(elem.query_duration_ms) >= log_queries_min_query_duration_ms) + { + if (auto query_log = context->getQueryLog()) + query_log->add(elem); + } + if (log_processors_profiles) + { + if (auto processors_profile_log = context->getProcessorsProfileLog()) { - std::vector parents; - for (const auto & port : processor->getOutputs()) + ProcessorProfileLogElement 
processor_elem; + processor_elem.event_time = time_in_seconds(finish_time); + processor_elem.event_time_microseconds = time_in_microseconds(finish_time); + processor_elem.query_id = elem.client_info.current_query_id; + + auto get_proc_id = [](const IProcessor & proc) -> UInt64 { - if (!port.isConnected()) - continue; - const IProcessor & next = port.getInputPort().getProcessor(); - parents.push_back(get_proc_id(next)); + return reinterpret_cast(&proc); + }; + + for (const auto & processor : query_pipeline.getProcessors()) + { + std::vector parents; + for (const auto & port : processor->getOutputs()) + { + if (!port.isConnected()) + continue; + const IProcessor & next = port.getInputPort().getProcessor(); + parents.push_back(get_proc_id(next)); + } + + processor_elem.id = get_proc_id(*processor); + processor_elem.parent_ids = std::move(parents); + + processor_elem.plan_step = reinterpret_cast(processor->getQueryPlanStep()); + processor_elem.plan_group = processor->getQueryPlanStepGroup(); + + processor_elem.processor_name = processor->getName(); + + processor_elem.elapsed_us = processor->getElapsedUs(); + processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs(); + processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs(); + + auto stats = processor->getProcessorDataStats(); + processor_elem.input_rows = stats.input_rows; + processor_elem.input_bytes = stats.input_bytes; + processor_elem.output_rows = stats.output_rows; + processor_elem.output_bytes = stats.output_bytes; + + processors_profile_log->add(processor_elem); } + } + } - processor_elem.id = get_proc_id(*processor); - processor_elem.parent_ids = std::move(parents); - - processor_elem.plan_step = reinterpret_cast(processor->getQueryPlanStep()); - processor_elem.plan_group = processor->getQueryPlanStepGroup(); - - processor_elem.processor_name = processor->getName(); - - processor_elem.elapsed_us = processor->getElapsedUs(); - processor_elem.input_wait_elapsed_us = 
processor->getInputWaitElapsedUs(); - processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs(); - - auto stats = processor->getProcessorDataStats(); - processor_elem.input_rows = stats.input_rows; - processor_elem.input_bytes = stats.input_bytes; - processor_elem.output_rows = stats.output_rows; - processor_elem.output_bytes = stats.output_bytes; - - processors_profile_log->add(processor_elem); + if (implicit_txn_control) + { + try + { + implicit_txn_control->executeCommit(context->getSessionContext()); + implicit_txn_control.reset(); + } + catch (const Exception &) + { + /// An exception might happen when trying to commit the transaction. For example we might get an immediate exception + /// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN + implicit_txn_control.reset(); + throw; } } } @@ -945,27 +961,11 @@ static std::tuple executeQueryImpl( query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate); query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows); query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes); - query_span->addAttributeIfNotZero("clickhouse.written_rows", info.written_rows); + query_span->addAttributeIfNotZero("clickhouse.written_rows", elem.written_rows); query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes); query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage); query_span->finish(); } - - if (implicit_txn_control) - { - try - { - implicit_txn_control->executeCommit(context->getSessionContext()); - implicit_txn_control.reset(); - } - catch (const Exception &) - { - /// An exception might happen when trying to commit the transaction. 
For example we might get an immediate exception - /// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN - implicit_txn_control.reset(); - throw; - } - } }; auto exception_callback = [elem, diff --git a/tests/queries/0_stateless/02421_simple_queries_for_opentelemetry.reference b/tests/queries/0_stateless/02421_simple_queries_for_opentelemetry.reference new file mode 100644 index 00000000000..d167d905636 --- /dev/null +++ b/tests/queries/0_stateless/02421_simple_queries_for_opentelemetry.reference @@ -0,0 +1,4 @@ +{"query":"show processlist format Null\n "} +{"query":"show databases format Null\n "} +{"query":"insert into opentelemetry_test values","read_rows":"3","written_rows":"3"} +{"query":"select * from opentelemetry_test format Null\n ","read_rows":"3","written_rows":""} diff --git a/tests/queries/0_stateless/02421_simple_queries_for_opentelemetry.sh b/tests/queries/0_stateless/02421_simple_queries_for_opentelemetry.sh new file mode 100755 index 00000000000..98b571c5968 --- /dev/null +++ b/tests/queries/0_stateless/02421_simple_queries_for_opentelemetry.sh @@ -0,0 +1,82 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +# This function takes 2 arguments: +# $1 - query id +# $2 - query +function execute_query() +{ + ${CLICKHOUSE_CLIENT} --opentelemetry_start_trace_probability=1 --query_id $1 -nq " + ${2} + " +} + +# For some queries, it's not possible to know how many bytes/rows are read when tests are executed on CI, +# so we only check the db.statement +function check_query_span_query_only() +{ +${CLICKHOUSE_CLIENT} -nq " + SYSTEM FLUSH LOGS; + SELECT attribute['db.statement'] as query + FROM system.opentelemetry_span_log + WHERE finish_date >= yesterday() + AND operation_name = 'query' + AND attribute['clickhouse.query_id'] = '${1}' + Format JSONEachRow + ;" +} + +function check_query_span() +{ +${CLICKHOUSE_CLIENT} -nq " + SYSTEM FLUSH LOGS; + SELECT attribute['db.statement'] as query, + attribute['clickhouse.read_rows'] as read_rows, + attribute['clickhouse.written_rows'] as written_rows + FROM system.opentelemetry_span_log + WHERE finish_date >= yesterday() + AND operation_name = 'query' + AND attribute['clickhouse.query_id'] = '${1}' + Format JSONEachRow + ;" +} + +# +# Set up +# +${CLICKHOUSE_CLIENT} -nq " +DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.opentelemetry_test; +CREATE TABLE ${CLICKHOUSE_DATABASE}.opentelemetry_test (id UInt64) Engine=MergeTree Order By id; +" + +# test 1, a query that has a special path in the code +# Format Null is used to make sure no output is generated so that it won't pollute the reference file +query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()"); +execute_query $query_id 'show processlist format Null' +check_query_span_query_only "$query_id" + +# test 2, a normal show command +query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()"); +execute_query $query_id 'show databases format Null' +check_query_span_query_only "$query_id" + +# test 3, a normal insert query on local table +query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()"); +execute_query $query_id 'insert into 
opentelemetry_test values(1)(2)(3)' +check_query_span "$query_id" + +# test 4, a normal select query +query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()"); +execute_query $query_id 'select * from opentelemetry_test format Null' +check_query_span $query_id + + +# +# Tear down +# +${CLICKHOUSE_CLIENT} -q " +DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.opentelemetry_test; +" \ No newline at end of file