mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Inherit context from parent fiber
This commit is contained in:
parent
66971662de
commit
0cf6b9f145
2
contrib/boost
vendored
2
contrib/boost
vendored
@ -1 +1 @@
|
||||
Subproject commit d6c95434acbb1a02d0b9de52bf4f37cac6c00328
|
||||
Subproject commit 1035c8bfcc9a3c1cfa7f6e827db94dae1ce1a43a
|
@ -161,21 +161,6 @@ elseif (SANITIZE STREQUAL "thread")
|
||||
target_compile_definitions(_boost_context PUBLIC BOOST_USE_TSAN)
|
||||
endif()
|
||||
|
||||
# fiber
|
||||
|
||||
set (SRCS_FIBER
|
||||
"${LIBRARY_DIR}/libs/fiber/src/context.cpp"
|
||||
"${LIBRARY_DIR}/libs/fiber/src/fiber.cpp"
|
||||
"${LIBRARY_DIR}/libs/fiber/src/scheduler.cpp"
|
||||
"${LIBRARY_DIR}/libs/fiber/src/waker.cpp"
|
||||
"${LIBRARY_DIR}/libs/fiber/src/algo/round_robin.cpp"
|
||||
)
|
||||
|
||||
add_library (_boost_fiber ${SRCS_FIBER})
|
||||
add_library (boost::fiber ALIAS _boost_fiber)
|
||||
target_include_directories (_boost_fiber PRIVATE ${LIBRARY_DIR})
|
||||
target_link_libraries(_boost_fiber PRIVATE _boost_context)
|
||||
|
||||
# coroutine
|
||||
|
||||
set (SRCS_COROUTINE
|
||||
|
@ -547,9 +547,6 @@ endif ()
|
||||
target_link_libraries(clickhouse_common_io PUBLIC boost::context)
|
||||
dbms_target_link_libraries(PUBLIC boost::context)
|
||||
|
||||
target_link_libraries(clickhouse_common_io PUBLIC boost::fiber)
|
||||
dbms_target_link_libraries(PUBLIC boost::fiber)
|
||||
|
||||
if (ENABLE_NLP)
|
||||
dbms_target_link_libraries (PUBLIC ch_contrib::stemmer)
|
||||
dbms_target_link_libraries (PUBLIC ch_contrib::wnb)
|
||||
|
@ -3,16 +3,16 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
thread_local const Fiber * current_fiber = nullptr;
|
||||
thread_local FiberInfo current_fiber_info;
|
||||
|
||||
AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_) : task(std::move(task_))
|
||||
{
|
||||
createFiber();
|
||||
}
|
||||
|
||||
const Fiber * AsyncTaskExecutor::getCurrentFiber()
|
||||
FiberInfo AsyncTaskExecutor::getCurrentFiberInfo()
|
||||
{
|
||||
return current_fiber;
|
||||
return current_fiber_info;
|
||||
}
|
||||
|
||||
void AsyncTaskExecutor::resume()
|
||||
@ -38,10 +38,10 @@ void AsyncTaskExecutor::resume()
|
||||
|
||||
void AsyncTaskExecutor::resumeUnlocked()
|
||||
{
|
||||
const auto * parent_fiber = current_fiber;
|
||||
current_fiber = &fiber;
|
||||
auto parent_fiber_info = current_fiber_info;
|
||||
current_fiber_info = FiberInfo{&fiber, &parent_fiber_info};
|
||||
fiber = std::move(fiber).resume();
|
||||
current_fiber = parent_fiber;
|
||||
current_fiber_info = parent_fiber_info;
|
||||
}
|
||||
|
||||
void AsyncTaskExecutor::cancel()
|
||||
|
@ -24,6 +24,11 @@ enum class AsyncEventTimeoutType
|
||||
using AsyncCallback = std::function<void(int, Poco::Timespan, AsyncEventTimeoutType, const std::string &, uint32_t)>;
|
||||
using ResumeCallback = std::function<void()>;
|
||||
|
||||
struct FiberInfo
|
||||
{
|
||||
const Fiber * fiber = nullptr;
|
||||
const FiberInfo * parent_fiber_info = nullptr;
|
||||
};
|
||||
|
||||
/// Base class for a task that will be executed in a fiber.
|
||||
/// It has only one method - run, that takes 2 callbacks:
|
||||
@ -74,7 +79,7 @@ public:
|
||||
ERROR = 4,
|
||||
};
|
||||
#endif
|
||||
static const Fiber * getCurrentFiber();
|
||||
static FiberInfo getCurrentFiberInfo();
|
||||
|
||||
protected:
|
||||
/// Method that is called in resume() before actual fiber resuming.
|
||||
@ -119,30 +124,6 @@ private:
|
||||
std::unique_ptr<AsyncTask> task;
|
||||
};
|
||||
|
||||
/// Simple class for storing fiber local variables.
|
||||
template <typename T>
|
||||
class FiberLocalVariable
|
||||
{
|
||||
public:
|
||||
T & operator*()
|
||||
{
|
||||
return get();
|
||||
}
|
||||
|
||||
T * operator->()
|
||||
{
|
||||
return &get();
|
||||
}
|
||||
|
||||
private:
|
||||
T & get()
|
||||
{
|
||||
return data[AsyncTaskExecutor::getCurrentFiber()];
|
||||
}
|
||||
|
||||
std::unordered_map<const Fiber *, T> data;
|
||||
};
|
||||
|
||||
String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description);
|
||||
|
||||
}
|
||||
|
@ -14,8 +14,48 @@ namespace DB
|
||||
namespace OpenTelemetry
|
||||
{
|
||||
|
||||
/// This code can be executed inside fiber, we should use fiber local context.
|
||||
thread_local FiberLocalVariable<TracingContextOnThread> current_fiber_trace_context;
|
||||
/// This code can be executed inside several fibers in one thread,
|
||||
/// we should use fiber local tracing context.
|
||||
struct FiberLocalTracingContextOnThread
|
||||
{
|
||||
public:
|
||||
FiberLocalTracingContextOnThread()
|
||||
{
|
||||
/// Initialize main context for this thread.
|
||||
/// Contexts for fibers will inherit this main context.
|
||||
data[nullptr] = TracingContextOnThread();
|
||||
}
|
||||
|
||||
TracingContextOnThread & operator*()
|
||||
{
|
||||
return get();
|
||||
}
|
||||
|
||||
TracingContextOnThread * operator->()
|
||||
{
|
||||
return &get();
|
||||
}
|
||||
|
||||
private:
|
||||
TracingContextOnThread & get()
|
||||
{
|
||||
/// Get context for current fiber.
|
||||
return getContextForFiber(AsyncTaskExecutor::getCurrentFiberInfo());
|
||||
}
|
||||
|
||||
TracingContextOnThread & getContextForFiber(FiberInfo info)
|
||||
{
|
||||
auto it = data.find(info.fiber);
|
||||
/// If it's the first request, we need to initialize context for the fiber using context from parent fiber.
|
||||
if (it == data.end())
|
||||
it = data.insert({info.fiber, getContextForFiber(*info.parent_fiber_info)}).first;
|
||||
return it->second;
|
||||
}
|
||||
|
||||
std::unordered_map<const Fiber *, TracingContextOnThread> data;
|
||||
};
|
||||
|
||||
thread_local FiberLocalTracingContextOnThread current_fiber_trace_context;
|
||||
|
||||
bool Span::addAttribute(std::string_view name, UInt64 value) noexcept
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user