diff --git a/contrib/boost b/contrib/boost index d6c95434acb..1035c8bfcc9 160000 --- a/contrib/boost +++ b/contrib/boost @@ -1 +1 @@ -Subproject commit d6c95434acbb1a02d0b9de52bf4f37cac6c00328 +Subproject commit 1035c8bfcc9a3c1cfa7f6e827db94dae1ce1a43a diff --git a/contrib/boost-cmake/CMakeLists.txt b/contrib/boost-cmake/CMakeLists.txt index cb0db5622a8..c9a759eab9c 100644 --- a/contrib/boost-cmake/CMakeLists.txt +++ b/contrib/boost-cmake/CMakeLists.txt @@ -161,21 +161,6 @@ elseif (SANITIZE STREQUAL "thread") target_compile_definitions(_boost_context PUBLIC BOOST_USE_TSAN) endif() -# fiber - -set (SRCS_FIBER - "${LIBRARY_DIR}/libs/fiber/src/context.cpp" - "${LIBRARY_DIR}/libs/fiber/src/fiber.cpp" - "${LIBRARY_DIR}/libs/fiber/src/scheduler.cpp" - "${LIBRARY_DIR}/libs/fiber/src/waker.cpp" - "${LIBRARY_DIR}/libs/fiber/src/algo/round_robin.cpp" -) - -add_library (_boost_fiber ${SRCS_FIBER}) -add_library (boost::fiber ALIAS _boost_fiber) -target_include_directories (_boost_fiber PRIVATE ${LIBRARY_DIR}) -target_link_libraries(_boost_fiber PRIVATE _boost_context) - # coroutine set (SRCS_COROUTINE diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c69ac885154..76a67ade99c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -547,9 +547,6 @@ endif () target_link_libraries(clickhouse_common_io PUBLIC boost::context) dbms_target_link_libraries(PUBLIC boost::context) -target_link_libraries(clickhouse_common_io PUBLIC boost::fiber) -dbms_target_link_libraries(PUBLIC boost::fiber) - if (ENABLE_NLP) dbms_target_link_libraries (PUBLIC ch_contrib::stemmer) dbms_target_link_libraries (PUBLIC ch_contrib::wnb) diff --git a/src/Common/AsyncTaskExecutor.cpp b/src/Common/AsyncTaskExecutor.cpp index d0c6454a849..68af535b22a 100644 --- a/src/Common/AsyncTaskExecutor.cpp +++ b/src/Common/AsyncTaskExecutor.cpp @@ -3,16 +3,16 @@ namespace DB { -thread_local const Fiber * current_fiber = nullptr; +thread_local FiberInfo current_fiber_info; AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr task_) : task(std::move(task_)) { createFiber(); } -const Fiber * AsyncTaskExecutor::getCurrentFiber() +FiberInfo AsyncTaskExecutor::getCurrentFiberInfo() { - return current_fiber; + return current_fiber_info; } void AsyncTaskExecutor::resume() @@ -38,10 +38,10 @@ void AsyncTaskExecutor::resume() void AsyncTaskExecutor::resumeUnlocked() { - const auto * parent_fiber = current_fiber; - current_fiber = &fiber; + auto parent_fiber_info = current_fiber_info; + current_fiber_info = FiberInfo{&fiber, &parent_fiber_info}; fiber = std::move(fiber).resume(); - current_fiber = parent_fiber; + current_fiber_info = parent_fiber_info; } void AsyncTaskExecutor::cancel() diff --git a/src/Common/AsyncTaskExecutor.h b/src/Common/AsyncTaskExecutor.h index cf7cdc5ad82..1c2f758504a 100644 --- a/src/Common/AsyncTaskExecutor.h +++ b/src/Common/AsyncTaskExecutor.h @@ -24,6 +24,11 @@ enum class AsyncEventTimeoutType using AsyncCallback = std::function; using ResumeCallback = std::function; +struct FiberInfo +{ + const Fiber * fiber = nullptr; + const FiberInfo * parent_fiber_info = nullptr; +}; /// Base class for a task that will be executed in a fiber. /// It has only one method - run, that takes 2 callbacks: @@ -74,7 +79,7 @@ public: ERROR = 4, }; #endif - static const Fiber * getCurrentFiber(); + static FiberInfo getCurrentFiberInfo(); protected: /// Method that is called in resume() before actual fiber resuming. @@ -119,30 +124,6 @@ private: std::unique_ptr task; }; -/// Simple class for storing fiber local variables. -template -class FiberLocalVariable -{ -public: - T & operator*() - { - return get(); - } - - T * operator->() - { - return &get(); - } - -private: - T & get() - { - return data[AsyncTaskExecutor::getCurrentFiber()]; - } - - std::unordered_map data; -}; - String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description); } diff --git a/src/Common/OpenTelemetryTraceContext.cpp b/src/Common/OpenTelemetryTraceContext.cpp index f25acc571d8..178efa33817 100644 --- a/src/Common/OpenTelemetryTraceContext.cpp +++ b/src/Common/OpenTelemetryTraceContext.cpp @@ -14,8 +14,48 @@ namespace DB namespace OpenTelemetry { -/// This code can be executed inside fiber, we should use fiber local context. -thread_local FiberLocalVariable current_fiber_trace_context; +/// This code can be executed inside several fibers in one thread, +/// we should use fiber local tracing context. +struct FiberLocalTracingContextOnThread +{ +public: + FiberLocalTracingContextOnThread() + { + /// Initialize main context for this thread. + /// Contexts for fibers will inherit this main context. + data[nullptr] = TracingContextOnThread(); + } + + TracingContextOnThread & operator*() + { + return get(); + } + + TracingContextOnThread * operator->() + { + return &get(); + } + +private: + TracingContextOnThread & get() + { + /// Get context for current fiber. + return getContextForFiber(AsyncTaskExecutor::getCurrentFiberInfo()); + } + + TracingContextOnThread & getContextForFiber(FiberInfo info) + { + auto it = data.find(info.fiber); + /// If it's the first request, we need to initialize context for the fiber using context from parent fiber. + if (it == data.end()) + it = data.insert({info.fiber, getContextForFiber(*info.parent_fiber_info)}).first; + return it->second; + } + + std::unordered_map data; +}; + +thread_local FiberLocalTracingContextOnThread current_fiber_trace_context; bool Span::addAttribute(std::string_view name, UInt64 value) noexcept {