Make sure span is finished in the onFinish callback

Signed-off-by: Frank Chen <frank.chen021@outlook.com>
This commit is contained in:
Frank Chen 2022-09-13 18:28:21 +08:00
parent 7303ae1796
commit d3265150c0

View File

@ -838,101 +838,117 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{ {
QueryStatus * process_list_elem = context->getProcessListElement(); QueryStatus * process_list_elem = context->getProcessListElement();
if (!process_list_elem) if (process_list_elem)
return;
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
double elapsed_seconds = info.elapsed_seconds;
elem.type = QueryLogElementType::QUERY_FINISH;
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto finish_time = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
{ {
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes); /// Update performance counters before logging to query_log
} CurrentThread::finalizePerformanceCounters();
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
auto progress_callback = context->getProgressCallback(); QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
if (progress_callback)
{
Progress p(WriteProgress{info.written_rows, info.written_bytes});
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
if (elem.read_rows != 0) double elapsed_seconds = info.elapsed_seconds;
{
LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
static_cast<size_t>(elem.read_rows / elapsed_seconds),
ReadableSize(elem.read_bytes / elapsed_seconds));
}
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms) elem.type = QueryLogElementType::QUERY_FINISH;
{
if (auto query_log = context->getQueryLog()) // construct event_time and event_time_microseconds using the same time point
query_log->add(elem); // so that the two times will always be equal up to a precision of a second.
} const auto finish_time = std::chrono::system_clock::now();
if (log_processors_profiles) elem.event_time = time_in_seconds(finish_time);
{ elem.event_time_microseconds = time_in_microseconds(finish_time);
if (auto processors_profile_log = context->getProcessorsProfileLog()) status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
{ {
ProcessorProfileLogElement processor_elem; query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
processor_elem.event_time = time_in_seconds(finish_time); }
processor_elem.event_time_microseconds = time_in_microseconds(finish_time); else /// will be used only for ordinary INSERT queries
processor_elem.query_id = elem.client_info.current_query_id; {
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
auto get_proc_id = [](const IProcessor & proc) -> UInt64 auto progress_callback = context->getProgressCallback();
{ if (progress_callback)
return reinterpret_cast<std::uintptr_t>(&proc); {
}; Progress p(WriteProgress{info.written_rows, info.written_bytes});
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
for (const auto & processor : query_pipeline.getProcessors()) if (elem.read_rows != 0)
{
LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
static_cast<size_t>(elem.read_rows / elapsed_seconds),
ReadableSize(elem.read_bytes / elapsed_seconds));
}
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{ {
std::vector<UInt64> parents; ProcessorProfileLogElement processor_elem;
for (const auto & port : processor->getOutputs()) processor_elem.event_time = time_in_seconds(finish_time);
processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64
{ {
if (!port.isConnected()) return reinterpret_cast<std::uintptr_t>(&proc);
continue; };
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next)); for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = processor->getElapsedUs();
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
} }
}
}
processor_elem.id = get_proc_id(*processor); if (implicit_txn_control)
processor_elem.parent_ids = std::move(parents); {
try
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep()); {
processor_elem.plan_group = processor->getQueryPlanStepGroup(); implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
processor_elem.processor_name = processor->getName(); }
catch (const Exception &)
processor_elem.elapsed_us = processor->getElapsedUs(); {
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs(); /// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs(); /// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
auto stats = processor->getProcessorDataStats(); throw;
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
} }
} }
} }
@ -945,27 +961,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate); query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate);
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows); query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes); query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);
query_span->addAttributeIfNotZero("clickhouse.written_rows", info.written_rows); query_span->addAttributeIfNotZero("clickhouse.written_rows", elem.written_rows);
query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes); query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes);
query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage); query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage);
query_span->finish(); query_span->finish();
} }
if (implicit_txn_control)
{
try
{
implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
}
catch (const Exception &)
{
/// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
/// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
throw;
}
}
}; };
auto exception_callback = [elem, auto exception_callback = [elem,