Fix assert in SpanHolder::finish() with fibers

This commit is contained in:
avogar 2023-05-08 16:33:39 +00:00
parent f96e0e44c3
commit b320527158
4 changed files with 62 additions and 21 deletions

2
contrib/boost vendored

@ -1 +1 @@
Subproject commit 8fe7b3326ef482ee6ecdf5a4f698f2b8c2780f98
Subproject commit d6c95434acbb1a02d0b9de52bf4f37cac6c00328

View File

@ -151,6 +151,7 @@ add_library (_boost_context ${SRCS_CONTEXT})
add_library (boost::context ALIAS _boost_context)
target_include_directories (_boost_context PRIVATE ${LIBRARY_DIR})
if (SANITIZE OR BOOST_USE_UCONTEXT)
target_compile_definitions(_boost_context PUBLIC BOOST_USE_UCONTEXT)
endif()
@ -161,6 +162,28 @@ elseif (SANITIZE STREQUAL "thread")
target_compile_definitions(_boost_context PUBLIC BOOST_USE_TSAN)
endif()
# fiber
set (SRCS_FIBER
"${LIBRARY_DIR}/libs/fiber/src/context.cpp"
"${LIBRARY_DIR}/libs/fiber/src/fiber.cpp"
"${LIBRARY_DIR}/libs/fiber/src/barrier.cpp"
"${LIBRARY_DIR}/libs/fiber/src/condition_variable.cpp"
"${LIBRARY_DIR}/libs/fiber/src/future.cpp"
"${LIBRARY_DIR}/libs/fiber/src/mutex.cpp"
"${LIBRARY_DIR}/libs/fiber/src/properties.cpp"
"${LIBRARY_DIR}/libs/fiber/src/recursive_mutex.cpp"
"${LIBRARY_DIR}/libs/fiber/src/recursive_timed_mutex.cpp"
"${LIBRARY_DIR}/libs/fiber/src/scheduler.cpp"
"${LIBRARY_DIR}/libs/fiber/src/timed_mutex.cpp"
"${LIBRARY_DIR}/libs/fiber/src/waker.cpp"
"${LIBRARY_DIR}/libs/fiber/src/algo/round_robin.cpp"
)
add_library (_boost_fiber ${SRCS_FIBER})
add_library (boost::fiber ALIAS _boost_fiber)
target_include_directories (_boost_fiber PRIVATE ${LIBRARY_DIR})
# coroutine
set (SRCS_COROUTINE

View File

@ -547,6 +547,9 @@ endif ()
target_link_libraries(clickhouse_common_io PUBLIC boost::context)
dbms_target_link_libraries(PUBLIC boost::context)
target_link_libraries(clickhouse_common_io PUBLIC boost::fiber)
dbms_target_link_libraries(PUBLIC boost::fiber)
if (ENABLE_NLP)
dbms_target_link_libraries (PUBLIC ch_contrib::stemmer)
dbms_target_link_libraries (PUBLIC ch_contrib::wnb)

View File

@ -6,13 +6,26 @@
#include <base/hex.h>
#include <Core/Settings.h>
#include <IO/Operators.h>
#include <boost/fiber/fss.hpp>
namespace DB
{
namespace OpenTelemetry
{
thread_local TracingContextOnThread current_thread_trace_context;
static TracingContextOnThread & getCurrentThreadTraceContext()
{
static boost::fibers::fiber_specific_ptr<TracingContextOnThread> current_thread_trace_context;
auto * ptr = current_thread_trace_context.get();
if (unlikely(!ptr))
{
ptr = new TracingContextOnThread();
current_thread_trace_context.reset(ptr);
}
return *ptr;
}
bool Span::addAttribute(std::string_view name, UInt64 value) noexcept
{
@ -104,7 +117,7 @@ bool Span::addAttributeImpl(std::string_view name, std::string_view value) noexc
SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
{
if (!current_thread_trace_context.isTraceEnabled())
if (!getCurrentThreadTraceContext().isTraceEnabled())
{
return;
}
@ -112,8 +125,8 @@ SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
/// Use try-catch to make sure the ctor is exception safe.
try
{
this->trace_id = current_thread_trace_context.trace_id;
this->parent_span_id = current_thread_trace_context.span_id;
this->trace_id = getCurrentThreadTraceContext().trace_id;
this->parent_span_id = getCurrentThreadTraceContext().span_id;
this->span_id = thread_local_rng(); // create a new id for this span
this->operation_name = _operation_name;
this->kind = _kind;
@ -132,7 +145,7 @@ SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
}
/// Set current span as parent of other spans created later on this thread.
current_thread_trace_context.span_id = this->span_id;
getCurrentThreadTraceContext().span_id = this->span_id;
}
void SpanHolder::finish() noexcept
@ -141,12 +154,12 @@ void SpanHolder::finish() noexcept
return;
// First of all, restore old value of current span.
assert(current_thread_trace_context.span_id == span_id);
current_thread_trace_context.span_id = parent_span_id;
assert(getCurrentThreadTraceContext().span_id == span_id);
getCurrentThreadTraceContext().span_id = parent_span_id;
try
{
auto log = current_thread_trace_context.span_log.lock();
auto log = getCurrentThreadTraceContext().span_log.lock();
/// The log might be disabled, check it before use
if (log)
@ -269,7 +282,7 @@ void TracingContext::serialize(WriteBuffer & buf) const
const TracingContextOnThread & CurrentContext()
{
return current_thread_trace_context;
return getCurrentThreadTraceContext();
}
void TracingContextOnThread::reset() noexcept
@ -291,7 +304,7 @@ TracingContextHolder::TracingContextHolder(
/// If any exception is raised during the construction, the tracing is not enabled on current thread.
try
{
if (current_thread_trace_context.isTraceEnabled())
if (getCurrentThreadTraceContext().isTraceEnabled())
{
///
/// This is not the normal case,
@ -304,15 +317,15 @@ TracingContextHolder::TracingContextHolder(
/// So this branch ensures this class can be instantiated multiple times on one same thread safely.
///
this->is_context_owner = false;
this->root_span.trace_id = current_thread_trace_context.trace_id;
this->root_span.parent_span_id = current_thread_trace_context.span_id;
this->root_span.trace_id = getCurrentThreadTraceContext().trace_id;
this->root_span.parent_span_id = getCurrentThreadTraceContext().span_id;
this->root_span.span_id = thread_local_rng();
this->root_span.operation_name = _operation_name;
this->root_span.start_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
/// Set the root span as parent of other spans created on current thread
current_thread_trace_context.span_id = this->root_span.span_id;
getCurrentThreadTraceContext().span_id = this->root_span.span_id;
return;
}
@ -356,10 +369,10 @@ TracingContextHolder::TracingContextHolder(
}
/// Set up trace context on current thread only when the root span is successfully initialized.
current_thread_trace_context = _parent_trace_context;
current_thread_trace_context.span_id = this->root_span.span_id;
current_thread_trace_context.trace_flags = TRACE_FLAG_SAMPLED;
current_thread_trace_context.span_log = _span_log;
getCurrentThreadTraceContext() = _parent_trace_context;
getCurrentThreadTraceContext().span_id = this->root_span.span_id;
getCurrentThreadTraceContext().trace_flags = TRACE_FLAG_SAMPLED;
getCurrentThreadTraceContext().span_log = _span_log;
}
TracingContextHolder::~TracingContextHolder()
@ -371,7 +384,7 @@ TracingContextHolder::~TracingContextHolder()
try
{
auto shared_span_log = current_thread_trace_context.span_log.lock();
auto shared_span_log = getCurrentThreadTraceContext().span_log.lock();
if (shared_span_log)
{
try
@ -402,12 +415,14 @@ TracingContextHolder::~TracingContextHolder()
if (this->is_context_owner)
{
/// Clear the context on current thread
current_thread_trace_context.reset();
getCurrentThreadTraceContext().reset();
}
else
{
current_thread_trace_context.span_id = this->root_span.parent_span_id;
getCurrentThreadTraceContext().span_id = this->root_span.parent_span_id;
}
}
}