Opentracing minimal changes for processors (#37837)

This commit is contained in:
Ilya Yatsishin 2022-06-09 14:43:50 +02:00 committed by GitHub
parent 203de0c352
commit d6427f56f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 19 additions and 11 deletions

View File

@ -118,11 +118,14 @@ OpenTelemetrySpanHolder::~OpenTelemetrySpanHolder()
auto * thread_group = CurrentThread::getGroup().get();
// Not sure whether and when this can be null.
if (!thread_group)
{
return;
ContextPtr context;
{
std::lock_guard lock(thread_group->mutex);
context = thread_group->query_context.lock();
}
auto context = thread_group->query_context.lock();
if (!context)
{
// Both global and query contexts can be null when executing a
@ -266,4 +269,3 @@ std::string OpenTelemetryTraceContext::composeTraceparentHeader() const
}

View File

@ -33,7 +33,7 @@ struct CompletedPipelineExecutor::Data
static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
setThreadName("QueryPipelineEx");
setThreadName("QueryCompPipeEx");
try
{

View File

@ -1,6 +1,7 @@
#include <Processors/Executors/ExecutionThreadContext.h>
#include <QueryPipeline/ReadProgressCallback.h>
#include <Common/Stopwatch.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
namespace DB
{
@ -70,6 +71,7 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
bool ExecutionThreadContext::executeTask()
{
OpenTelemetrySpanHolder span("ExecutionThreadContext::executeTask() " + node->processor->getName());
std::optional<Stopwatch> execution_time_watch;
#ifndef NDEBUG
@ -90,12 +92,19 @@ bool ExecutionThreadContext::executeTask()
}
if (profile_processors)
node->processor->elapsed_us += execution_time_watch->elapsedMicroseconds();
{
UInt64 elapsed_microseconds = execution_time_watch->elapsedMicroseconds();
node->processor->elapsed_us += elapsed_microseconds;
span.addAttribute("execution_time_ms", elapsed_microseconds);
}
#ifndef NDEBUG
execution_time_ns += execution_time_watch->elapsed();
span.addAttribute("execution_time_ns", execution_time_watch->elapsed());
#endif
span.addAttribute("thread_number", thread_number);
span.addAttribute("processor.description", node->processor->getDescription());
return node->exception == nullptr;
}

View File

@ -10,7 +10,6 @@
#include <Processors/ISource.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Context.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Common/scope_guard_safe.h>
#ifndef NDEBUG
@ -275,8 +274,6 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
void PipelineExecutor::executeImpl(size_t num_threads)
{
OpenTelemetrySpanHolder span("PipelineExecutor::executeImpl()");
initializeExecution(num_threads);
using ThreadsData = std::vector<ThreadFromGlobalPool>;

View File

@ -69,7 +69,7 @@ const Block & PullingAsyncPipelineExecutor::getHeader() const
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
setThreadName("QueryPipelineEx");
setThreadName("QueryPullPipeEx");
try
{

View File

@ -98,7 +98,7 @@ struct PushingAsyncPipelineExecutor::Data
static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
setThreadName("QueryPipelineEx");
setThreadName("QueryPushPipeEx");
try
{