From d3265150c03f4769d316b90c588b6764b4873430 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 13 Sep 2022 18:28:21 +0800 Subject: [PATCH] Make sure span is finished in the onFinish callback Signed-off-by: Frank Chen --- src/Interpreters/executeQuery.cpp | 202 +++++++++++++++--------------- 1 file changed, 101 insertions(+), 101 deletions(-) diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index c501c1722ba..b6434955418 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -838,101 +838,117 @@ static std::tuple executeQueryImpl( { QueryStatus * process_list_elem = context->getProcessListElement(); - if (!process_list_elem) - return; - - /// Update performance counters before logging to query_log - CurrentThread::finalizePerformanceCounters(); - - QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events); - - double elapsed_seconds = info.elapsed_seconds; - - elem.type = QueryLogElementType::QUERY_FINISH; - - // construct event_time and event_time_microseconds using the same time point - // so that the two times will always be equal up to a precision of a second. - const auto finish_time = std::chrono::system_clock::now(); - elem.event_time = time_in_seconds(finish_time); - elem.event_time_microseconds = time_in_microseconds(finish_time); - status_info_to_query_log(elem, info, ast, context); - - if (pulling_pipeline) + if (process_list_elem) { - query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes); - } - else /// will be used only for ordinary INSERT queries - { - auto progress_out = process_list_elem->getProgressOut(); - elem.result_rows = progress_out.written_rows; - elem.result_bytes = progress_out.written_bytes; - } + /// Update performance counters before logging to query_log + CurrentThread::finalizePerformanceCounters(); - auto progress_callback = context->getProgressCallback(); - if (progress_callback) - { - Progress p(WriteProgress{info.written_rows, info.written_bytes}); - p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}}); - progress_callback(p); - } + QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events); - if (elem.read_rows != 0) - { - LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.", - elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds, - static_cast(elem.read_rows / elapsed_seconds), - ReadableSize(elem.read_bytes / elapsed_seconds)); - } + double elapsed_seconds = info.elapsed_seconds; - if (log_queries && elem.type >= log_queries_min_type && static_cast(elem.query_duration_ms) >= log_queries_min_query_duration_ms) - { - if (auto query_log = context->getQueryLog()) - query_log->add(elem); - } - if (log_processors_profiles) - { - if (auto processors_profile_log = context->getProcessorsProfileLog()) + elem.type = QueryLogElementType::QUERY_FINISH; + + // construct event_time and event_time_microseconds using the same time point + // so that the two times will always be equal up to a precision of a second. + const auto finish_time = std::chrono::system_clock::now(); + elem.event_time = time_in_seconds(finish_time); + elem.event_time_microseconds = time_in_microseconds(finish_time); + status_info_to_query_log(elem, info, ast, context); + + if (pulling_pipeline) { - ProcessorProfileLogElement processor_elem; - processor_elem.event_time = time_in_seconds(finish_time); - processor_elem.event_time_microseconds = time_in_microseconds(finish_time); - processor_elem.query_id = elem.client_info.current_query_id; + query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes); + } + else /// will be used only for ordinary INSERT queries + { + auto progress_out = process_list_elem->getProgressOut(); + elem.result_rows = progress_out.written_rows; + elem.result_bytes = progress_out.written_bytes; + } - auto get_proc_id = [](const IProcessor & proc) -> UInt64 - { - return reinterpret_cast(&proc); - }; + auto progress_callback = context->getProgressCallback(); + if (progress_callback) + { + Progress p(WriteProgress{info.written_rows, info.written_bytes}); + p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}}); + progress_callback(p); + } - for (const auto & processor : query_pipeline.getProcessors()) + if (elem.read_rows != 0) + { + LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.", + elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds, + static_cast(elem.read_rows / elapsed_seconds), + ReadableSize(elem.read_bytes / elapsed_seconds)); + } + + if (log_queries && elem.type >= log_queries_min_type && static_cast(elem.query_duration_ms) >= log_queries_min_query_duration_ms) + { + if (auto query_log = context->getQueryLog()) + query_log->add(elem); + } + if (log_processors_profiles) + { + if (auto processors_profile_log = context->getProcessorsProfileLog()) { - std::vector parents; - for (const auto & port : processor->getOutputs()) + ProcessorProfileLogElement processor_elem; + processor_elem.event_time = time_in_seconds(finish_time); + processor_elem.event_time_microseconds = time_in_microseconds(finish_time); + processor_elem.query_id = elem.client_info.current_query_id; + + auto get_proc_id = [](const IProcessor & proc) -> UInt64 { - if (!port.isConnected()) - continue; - const IProcessor & next = port.getInputPort().getProcessor(); - parents.push_back(get_proc_id(next)); + return reinterpret_cast(&proc); + }; + + for (const auto & processor : query_pipeline.getProcessors()) + { + std::vector parents; + for (const auto & port : processor->getOutputs()) + { + if (!port.isConnected()) + continue; + const IProcessor & next = port.getInputPort().getProcessor(); + parents.push_back(get_proc_id(next)); + } + + processor_elem.id = get_proc_id(*processor); + processor_elem.parent_ids = std::move(parents); + + processor_elem.plan_step = reinterpret_cast(processor->getQueryPlanStep()); + processor_elem.plan_group = processor->getQueryPlanStepGroup(); + + processor_elem.processor_name = processor->getName(); + + processor_elem.elapsed_us = processor->getElapsedUs(); + processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs(); + processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs(); + + auto stats = processor->getProcessorDataStats(); + processor_elem.input_rows = stats.input_rows; + processor_elem.input_bytes = stats.input_bytes; + processor_elem.output_rows = stats.output_rows; + processor_elem.output_bytes = stats.output_bytes; + + processors_profile_log->add(processor_elem); } + } + } - processor_elem.id = get_proc_id(*processor); - processor_elem.parent_ids = std::move(parents); - - processor_elem.plan_step = reinterpret_cast(processor->getQueryPlanStep()); - processor_elem.plan_group = processor->getQueryPlanStepGroup(); - - processor_elem.processor_name = processor->getName(); - - processor_elem.elapsed_us = processor->getElapsedUs(); - processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs(); - processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs(); - - auto stats = processor->getProcessorDataStats(); - processor_elem.input_rows = stats.input_rows; - processor_elem.input_bytes = stats.input_bytes; - processor_elem.output_rows = stats.output_rows; - processor_elem.output_bytes = stats.output_bytes; - - processors_profile_log->add(processor_elem); + if (implicit_txn_control) + { + try + { + implicit_txn_control->executeCommit(context->getSessionContext()); + implicit_txn_control.reset(); + } + catch (const Exception &) + { + /// An exception might happen when trying to commit the transaction. For example we might get an immediate exception + /// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN + implicit_txn_control.reset(); + throw; } } } @@ -945,27 +961,11 @@ static std::tuple executeQueryImpl( query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate); query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows); query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes); - query_span->addAttributeIfNotZero("clickhouse.written_rows", info.written_rows); + query_span->addAttributeIfNotZero("clickhouse.written_rows", elem.written_rows); query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes); query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage); query_span->finish(); } - - if (implicit_txn_control) - { - try - { - implicit_txn_control->executeCommit(context->getSessionContext()); - implicit_txn_control.reset(); - } - catch (const Exception &) - { - /// An exception might happen when trying to commit the transaction. For example we might get an immediate exception - /// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN - implicit_txn_control.reset(); - throw; - } - } }; auto exception_callback = [elem,