Merge pull request #50034 from Avogar/fiber-local-var-2

Fix assert in SpanHolder::finish() with fibers attempt 2
This commit is contained in:
Kruglov Pavel 2023-05-30 17:50:31 +02:00 committed by GitHub
commit 7966fa2da6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 180 additions and 104 deletions

View File

@ -121,7 +121,7 @@ ConnectionEstablisherAsync::ConnectionEstablisherAsync(
epoll.add(timeout_descriptor.getDescriptor());
}
void ConnectionEstablisherAsync::Task::run(AsyncCallback async_callback, ResumeCallback)
void ConnectionEstablisherAsync::Task::run(AsyncCallback async_callback, SuspendCallback)
{
connection_establisher_async.reset();
connection_establisher_async.connection_establisher.setAsyncCallback(async_callback);

View File

@ -91,7 +91,7 @@ private:
ConnectionEstablisherAsync & connection_establisher_async;
void run(AsyncCallback async_callback, ResumeCallback suspend_callback) override;
void run(AsyncCallback async_callback, SuspendCallback suspend_callback) override;
};
void cancelAfter() override;

View File

@ -57,7 +57,7 @@ bool PacketReceiver::checkTimeout()
return true;
}
void PacketReceiver::Task::run(AsyncCallback async_callback, ResumeCallback suspend_callback)
void PacketReceiver::Task::run(AsyncCallback async_callback, SuspendCallback suspend_callback)
{
while (true)
{

View File

@ -57,7 +57,7 @@ private:
PacketReceiver & receiver;
void run(AsyncCallback async_callback, ResumeCallback suspend_callback) override;
void run(AsyncCallback async_callback, SuspendCallback suspend_callback) override;
};
/// When epoll file descriptor is ready, check if it's an expired timeout.

View File

@ -3,18 +3,11 @@
namespace DB
{
thread_local FiberInfo current_fiber_info;
AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_) : task(std::move(task_))
{
createFiber();
}
FiberInfo AsyncTaskExecutor::getCurrentFiberInfo()
{
return current_fiber_info;
}
void AsyncTaskExecutor::resume()
{
if (routine_is_finished)
@ -38,10 +31,7 @@ void AsyncTaskExecutor::resume()
void AsyncTaskExecutor::resumeUnlocked()
{
auto parent_fiber_info = current_fiber_info;
current_fiber_info = FiberInfo{&fiber, &parent_fiber_info};
fiber = std::move(fiber).resume();
current_fiber_info = parent_fiber_info;
fiber.resume();
}
void AsyncTaskExecutor::cancel()
@ -69,30 +59,19 @@ struct AsyncTaskExecutor::Routine
struct AsyncCallback
{
AsyncTaskExecutor & executor;
Fiber & fiber;
SuspendCallback suspend_callback;
void operator()(int fd, Poco::Timespan timeout, AsyncEventTimeoutType type, const std::string & desc, uint32_t events)
{
executor.processAsyncEvent(fd, timeout, type, desc, events);
fiber = std::move(fiber).resume();
suspend_callback();
executor.clearAsyncEvent();
}
};
struct ResumeCallback
void operator()(SuspendCallback suspend_callback)
{
Fiber & fiber;
void operator()()
{
fiber = std::move(fiber).resume();
}
};
Fiber operator()(Fiber && sink)
{
auto async_callback = AsyncCallback{executor, sink};
auto suspend_callback = ResumeCallback{sink};
auto async_callback = AsyncCallback{executor, suspend_callback};
try
{
executor.task->run(async_callback, suspend_callback);
@ -110,18 +89,17 @@ struct AsyncTaskExecutor::Routine
}
executor.routine_is_finished = true;
return std::move(sink);
}
};
void AsyncTaskExecutor::createFiber()
{
fiber = boost::context::fiber(std::allocator_arg_t(), fiber_stack, Routine{*this});
fiber = Fiber(fiber_stack, Routine{*this});
}
void AsyncTaskExecutor::destroyFiber()
{
boost::context::fiber to_destroy = std::move(fiber);
Fiber to_destroy = std::move(fiber);
}
String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description)

View File

@ -22,7 +22,7 @@ enum class AsyncEventTimeoutType
};
using AsyncCallback = std::function<void(int, Poco::Timespan, AsyncEventTimeoutType, const std::string &, uint32_t)>;
using ResumeCallback = std::function<void()>;
using SuspendCallback = std::function<void()>;
struct FiberInfo
{
@ -38,7 +38,7 @@ struct FiberInfo
struct AsyncTask
{
public:
virtual void run(AsyncCallback async_callback, ResumeCallback suspend_callback) = 0;
virtual void run(AsyncCallback async_callback, SuspendCallback suspend_callback) = 0;
virtual ~AsyncTask() = default;
};
@ -80,7 +80,6 @@ public:
};
#endif
static FiberInfo getCurrentFiberInfo();
protected:
/// Method that is called in resume() before actual fiber resuming.
/// If it returns false, resume() will return immediately without actual fiber resuming.
@ -124,48 +123,6 @@ private:
std::unique_ptr<AsyncTask> task;
};
/// Simple implementation for fiber local variable.
template <typename T>
struct FiberLocal
{
public:
FiberLocal()
{
/// Initialize main instance for this thread. Instances for fibers will inherit it,
/// (it's needed because main instance could be changed before creating fibers
/// and changes should be visible in fibers).
data[nullptr] = T();
}
T & operator*()
{
return get();
}
T * operator->()
{
return &get();
}
private:
T & get()
{
return getInstanceForFiber(AsyncTaskExecutor::getCurrentFiberInfo());
}
T & getInstanceForFiber(FiberInfo info)
{
auto it = data.find(info.fiber);
/// If it's the first request, we need to initialize instance for the fiber
/// using instance from parent fiber or main thread that created fiber.
if (it == data.end())
it = data.insert({info.fiber, getInstanceForFiber(*info.parent_fiber_info)}).first;
return it->second;
}
std::unordered_map<const Fiber *, T> data;
};
String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description);
}

View File

@ -3,5 +3,147 @@
/// BOOST_USE_ASAN, BOOST_USE_TSAN and BOOST_USE_UCONTEXT should be correctly defined for sanitizers.
#include <base/defines.h>
#include <boost/context/fiber.hpp>
#include <map>
/// Class wrapper for boost::context::fiber.
/// It tracks the Fiber that is currently being resumed on the calling thread and
/// supports storing fiber-specific data (`local_data`, keyed by an opaque pointer).
/// That data is a plain member, so it is destroyed together with the Fiber object.
class Fiber
{
private:
using Impl = boost::context::fiber;
using FiberPtr = Fiber *;
/// FiberLocal uses the private getCurrentFiber(), getLocalData(), DataWrapper and DataPtr.
template <typename T> friend class FiberLocal;
public:
/// Creates a fiber that uses `salloc` as its stack allocator and runs `fn`.
/// `fn` is wrapped into RoutineImpl, which invokes it with a single argument:
/// a suspend callback that switches out of the fiber (see RoutineImpl::SuspendCallback).
template< typename StackAlloc, typename Fn>
Fiber(StackAlloc && salloc, Fn && fn) : impl(std::allocator_arg_t(), std::forward<StackAlloc>(salloc), RoutineImpl(std::forward<Fn>(fn)))
{
}
/// Movable but not copyable.
/// NOTE(review): resume() publishes `this` as the current fiber, so moving a Fiber
/// while it is being resumed would presumably leave that pointer referring to the
/// moved-from object — confirm that callers never do this.
Fiber() = default;
Fiber(Fiber && other) = default;
Fiber & operator=(Fiber && other) = default;
Fiber(const Fiber &) = delete;
Fiber & operator =(const Fiber &) = delete;
/// Forwards to the underlying boost::context::fiber's operator bool.
explicit operator bool() const
{
return impl.operator bool();
}
/// Switches execution into this fiber and returns once control comes back.
/// While the fiber runs, getCurrentFiber() on this thread yields `this`;
/// afterwards the previous value is put back, so nested resumes unwind correctly.
void resume()
{
/// Update information about current executing fiber.
FiberPtr & current_fiber = getCurrentFiber();
FiberPtr parent_fiber = current_fiber;
current_fiber = this;
/// The context switch itself; the continuation handed back is stored for the next resume().
impl = std::move(impl).resume();
/// Restore parent fiber.
current_fiber = parent_fiber;
}
private:
/// Adapter between the user routine `Fn` (called with a suspend callback)
/// and the entry-point signature used here for boost::context::fiber (`Impl operator()(Impl &&)`).
template <typename Fn>
struct RoutineImpl
{
/// Callable handed to the user routine.
/// Calling it resumes the context referenced by `impl` (the `sink` received by the routine)
/// and stores the continuation obtained when control returns.
struct SuspendCallback
{
Impl & impl;
void operator()()
{
impl = std::move(impl).resume();
}
};
explicit RoutineImpl(Fn && fn_) : fn(std::move(fn_))
{
}
/// Fiber entry point: runs the user routine with a suspend callback bound to `sink`,
/// then returns `sink` when the routine finishes.
Impl operator()(Impl && sink)
{
SuspendCallback suspend_callback{sink};
fn(suspend_callback);
return std::move(sink);
}
Fn fn;
};
/// Per-thread pointer to the Fiber whose resume() is currently in progress.
/// It is a thread_local static, so it is null until resume() sets it,
/// and resume() restores the previous value on return.
static FiberPtr & getCurrentFiber()
{
thread_local static FiberPtr current_fiber;
return current_fiber;
}
/// Polymorphic base used to store values of arbitrary type in a unique_ptr.
/// The virtual destructor lets the derived wrapper be destroyed through DataPtr.
struct DataWrapper
{
virtual ~DataWrapper() = default;
};
using DataPtr = std::unique_ptr<DataWrapper>;
/// Get reference to fiber-specific data by key
/// (the pointer to the structure that uses this data).
/// Uses std::map::operator[], so an empty (null) entry is inserted on the first request for a key.
DataPtr & getLocalData(void * key)
{
return local_data[key];
}
/// Returns the underlying boost fiber as an rvalue reference.
/// No caller is visible in this chunk.
Impl && release()
{
return std::move(impl);
}
Impl impl;
/// Fiber-specific data, keyed by the address of the object that owns the value.
std::map<void *, DataPtr> local_data;
};
/// Implementation for fiber local variable.
/// If we are in fiber, it returns fiber local data,
/// otherwise it returns it's single field.
/// Fiber local data is destroyed in Fiber destructor.
/// Implementation is similar to boost::fiber::fiber_specific_ptr
/// (we cannot use it because we don't use boost::fiber API.
template <typename T>
class FiberLocal
{
public:
T & operator*()
{
return get();
}
T * operator->()
{
return &get();
}
private:
struct DataWrapperImpl : public Fiber::DataWrapper
{
T impl;
};
T & get()
{
Fiber * current_fiber = Fiber::getCurrentFiber();
if (!current_fiber)
return main_instance;
Fiber::DataPtr & ptr = current_fiber->getLocalData(this);
/// Initialize instance on first request.
if (!ptr)
ptr = std::make_unique<DataWrapperImpl>();
return dynamic_cast<DataWrapperImpl *>(ptr.get())->impl;
}
T main_instance;
};
using Fiber = boost::context::fiber;

View File

@ -15,9 +15,8 @@ namespace DB
namespace OpenTelemetry
{
///// This code can be executed inside several fibers in one thread,
///// we should use fiber local tracing context.
thread_local FiberLocal<TracingContextOnThread> current_fiber_trace_context;
/// This code can be executed inside fibers, we should use fiber local tracing context.
thread_local FiberLocal<TracingContextOnThread> current_trace_context;
bool Span::addAttribute(std::string_view name, UInt64 value) noexcept
{
@ -109,7 +108,7 @@ bool Span::addAttributeImpl(std::string_view name, std::string_view value) noexc
SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
{
if (!current_fiber_trace_context->isTraceEnabled())
if (!current_trace_context->isTraceEnabled())
{
return;
}
@ -117,8 +116,8 @@ SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
/// Use try-catch to make sure the ctor is exception safe.
try
{
this->trace_id = current_fiber_trace_context->trace_id;
this->parent_span_id = current_fiber_trace_context->span_id;
this->trace_id = current_trace_context->trace_id;
this->parent_span_id = current_trace_context->span_id;
this->span_id = thread_local_rng(); // create a new id for this span
this->operation_name = _operation_name;
this->kind = _kind;
@ -137,7 +136,7 @@ SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
}
/// Set current span as parent of other spans created later on this thread.
current_fiber_trace_context->span_id = this->span_id;
current_trace_context->span_id = this->span_id;
}
void SpanHolder::finish() noexcept
@ -146,12 +145,12 @@ void SpanHolder::finish() noexcept
return;
// First of all, restore old value of current span.
assert(current_fiber_trace_context->span_id == span_id);
current_fiber_trace_context->span_id = parent_span_id;
assert(current_trace_context->span_id == span_id);
current_trace_context->span_id = parent_span_id;
try
{
auto log = current_fiber_trace_context->span_log.lock();
auto log = current_trace_context->span_log.lock();
/// The log might be disabled, check it before use
if (log)
@ -274,7 +273,7 @@ void TracingContext::serialize(WriteBuffer & buf) const
const TracingContextOnThread & CurrentContext()
{
return *current_fiber_trace_context;
return *current_trace_context;
}
void TracingContextOnThread::reset() noexcept
@ -296,7 +295,7 @@ TracingContextHolder::TracingContextHolder(
/// If any exception is raised during the construction, the tracing is not enabled on current thread.
try
{
if (current_fiber_trace_context->isTraceEnabled())
if (current_trace_context->isTraceEnabled())
{
///
/// This is not the normal case,
@ -309,15 +308,15 @@ TracingContextHolder::TracingContextHolder(
/// So this branch ensures this class can be instantiated multiple times on one same thread safely.
///
this->is_context_owner = false;
this->root_span.trace_id = current_fiber_trace_context->trace_id;
this->root_span.parent_span_id = current_fiber_trace_context->span_id;
this->root_span.trace_id = current_trace_context->trace_id;
this->root_span.parent_span_id = current_trace_context->span_id;
this->root_span.span_id = thread_local_rng();
this->root_span.operation_name = _operation_name;
this->root_span.start_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
/// Set the root span as parent of other spans created on current thread
current_fiber_trace_context->span_id = this->root_span.span_id;
current_trace_context->span_id = this->root_span.span_id;
return;
}
@ -361,10 +360,10 @@ TracingContextHolder::TracingContextHolder(
}
/// Set up trace context on current thread only when the root span is successfully initialized.
*current_fiber_trace_context = _parent_trace_context;
current_fiber_trace_context->span_id = this->root_span.span_id;
current_fiber_trace_context->trace_flags = TRACE_FLAG_SAMPLED;
current_fiber_trace_context->span_log = _span_log;
*current_trace_context = _parent_trace_context;
current_trace_context->span_id = this->root_span.span_id;
current_trace_context->trace_flags = TRACE_FLAG_SAMPLED;
current_trace_context->span_log = _span_log;
}
TracingContextHolder::~TracingContextHolder()
@ -376,7 +375,7 @@ TracingContextHolder::~TracingContextHolder()
try
{
auto shared_span_log = current_fiber_trace_context->span_log.lock();
auto shared_span_log = current_trace_context->span_log.lock();
if (shared_span_log)
{
try
@ -407,11 +406,11 @@ TracingContextHolder::~TracingContextHolder()
if (this->is_context_owner)
{
/// Clear the context on current thread
current_fiber_trace_context->reset();
current_trace_context->reset();
}
else
{
current_fiber_trace_context->span_id = this->root_span.parent_span_id;
current_trace_context->span_id = this->root_span.parent_span_id;
}
}

View File

@ -34,7 +34,7 @@ bool RemoteQueryExecutorReadContext::checkBeforeTaskResume()
}
void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, ResumeCallback suspend_callback)
void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, SuspendCallback suspend_callback)
{
read_context.executor.sendQueryUnlocked(ClientInfo::QueryKind::SECONDARY_QUERY, async_callback);
read_context.is_query_sent = true;

View File

@ -58,7 +58,7 @@ private:
RemoteQueryExecutorReadContext & read_context;
void run(AsyncCallback async_callback, ResumeCallback suspend_callback) override;
void run(AsyncCallback async_callback, SuspendCallback suspend_callback) override;
};
std::atomic_bool is_in_progress = false;

View File

@ -6,7 +6,7 @@
{"query":"SELECT 1 AS `1` FROM `system`.`one`","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1}
{"query":"select 1 from remote('127.0.0.2', system, one) settings allow_experimental_analyzer = 1 format Null\n","query_status":"QueryFinish","tracestate":"some custom state","sorted_by_finish_time":1}
{"total spans":"3","unique spans":"3","unique non-zero parent spans":"3"}
{"initial query spans with proper parent":"1"}
{"initial query spans with proper parent":"2"}
{"unique non-empty tracestate values":"1"}
===native===
{"query":"select * from url('http:\/\/127.0.0.2:8123\/?query=select%201%20format%20Null', CSV, 'a int')","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}