mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 01:00:48 +00:00
Fix context re-initialization for ThreadPool
This commit is contained in:
parent
cf1081eada
commit
83cbdef3c6
@ -283,6 +283,17 @@ TracingContextHolder::TracingContextHolder(
|
||||
this->root_span.start_time_us
|
||||
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
/// This object is created to initialize tracing context on a new thread,
|
||||
/// it's helpful to record the thread_id so that we know the thread switching from the span log
|
||||
try
|
||||
{
|
||||
this->root_span.addAttribute("clickhouse.thread_id", getThreadId());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// It's acceptable that this attribute is not recorded if any exception is raised
|
||||
}
|
||||
|
||||
/// set up trace context on current thread
|
||||
current_thread_trace_context = _parent_trace_context;
|
||||
current_thread_trace_context.span_id = this->root_span.span_id;
|
||||
@ -292,39 +303,32 @@ TracingContextHolder::TracingContextHolder(
|
||||
|
||||
TracingContextHolder::~TracingContextHolder()
|
||||
{
|
||||
if (this->root_span.isTraceEnabled())
|
||||
if (!this->root_span.isTraceEnabled())
|
||||
{
|
||||
try
|
||||
{
|
||||
auto shared_span_log = current_thread_trace_context.span_log.lock();
|
||||
if (shared_span_log)
|
||||
{
|
||||
try
|
||||
{
|
||||
this->root_span.addAttribute("clickhouse.thread_id", getThreadId());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Ignore any exceptions
|
||||
}
|
||||
|
||||
this->root_span.finish_time_us
|
||||
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
shared_span_log->add(OpenTelemetrySpanLogElement(this->root_span));
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__FUNCTION__);
|
||||
}
|
||||
|
||||
this->root_span.trace_id = UUID();
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
auto shared_span_log = current_thread_trace_context.span_log.lock();
|
||||
if (shared_span_log)
|
||||
{
|
||||
this->root_span.finish_time_us
|
||||
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
shared_span_log->add(OpenTelemetrySpanLogElement(this->root_span));
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__FUNCTION__);
|
||||
}
|
||||
|
||||
this->root_span.trace_id = UUID();
|
||||
|
||||
if (this->is_context_owner)
|
||||
{
|
||||
// clear the context on current thread
|
||||
/// Clear the context on current thread
|
||||
current_thread_trace_context.reset();
|
||||
}
|
||||
else
|
||||
|
@ -87,7 +87,7 @@ void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
|
||||
|
||||
template <typename Thread>
|
||||
template <typename ReturnType>
|
||||
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
|
||||
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds, bool enable_tracing_context_propagation)
|
||||
{
|
||||
auto on_error = [&](const std::string & reason)
|
||||
{
|
||||
@ -150,10 +150,18 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
|
||||
}
|
||||
}
|
||||
|
||||
// this scheduleImpl is called in the parent thread,
|
||||
// the tracing context on this thread is used as parent context for the sub-thread that runs the job
|
||||
const auto ¤t_thread_context = DB::OpenTelemetry::CurrentContext();
|
||||
jobs.emplace(std::move(job), priority, current_thread_context);
|
||||
if (enable_tracing_context_propagation)
|
||||
{
|
||||
/// Tracing context on this thread is used as parent context for the sub-thread that runs the job
|
||||
const auto ¤t_thread_context = DB::OpenTelemetry::CurrentContext();
|
||||
jobs.emplace(std::move(job), priority, current_thread_context);
|
||||
}
|
||||
else
|
||||
{
|
||||
DB::OpenTelemetry::TracingContextOnThread empty;
|
||||
jobs.emplace(std::move(job), priority, empty);
|
||||
}
|
||||
|
||||
++scheduled_jobs;
|
||||
new_job_or_shutdown.notify_one();
|
||||
}
|
||||
@ -174,9 +182,9 @@ bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_mi
|
||||
}
|
||||
|
||||
template <typename Thread>
|
||||
void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds)
|
||||
void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds, bool enable_tracing_context_propagation)
|
||||
{
|
||||
scheduleImpl<void>(std::move(job), priority, wait_microseconds);
|
||||
scheduleImpl<void>(std::move(job), priority, wait_microseconds, enable_tracing_context_propagation);
|
||||
}
|
||||
|
||||
template <typename Thread>
|
||||
@ -348,7 +356,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
|
||||
|
||||
|
||||
template class ThreadPoolImpl<std::thread>;
|
||||
template class ThreadPoolImpl<ThreadFromGlobalPool>;
|
||||
template class ThreadPoolImpl<Thread4ThreadPool>;
|
||||
|
||||
std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
|
||||
|
||||
|
@ -56,7 +56,7 @@ public:
|
||||
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;
|
||||
|
||||
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
|
||||
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
|
||||
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0, bool enable_tracing_context_propagation = true);
|
||||
|
||||
/// Wait for all currently active jobs to be done.
|
||||
/// You may call schedule and wait many times in arbitrary order.
|
||||
@ -113,7 +113,7 @@ private:
|
||||
std::exception_ptr first_exception;
|
||||
|
||||
template <typename ReturnType>
|
||||
ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds);
|
||||
ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds, bool enable_tracing_context_propagation = true);
|
||||
|
||||
void worker(typename std::list<Thread>::iterator thread_it);
|
||||
|
||||
@ -235,7 +235,7 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
protected:
|
||||
struct State
|
||||
{
|
||||
/// Should be atomic() because of possible concurrent access between
|
||||
@ -256,6 +256,72 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
/// This class is used by ThreadPool only to allocate threads in GlobalThreadPool.
|
||||
/// Any user code should use ThreadFromGlobalPool instead of this class to schedule a job in a thread.
|
||||
///
|
||||
/// The difference between this class and ThreadFromGlobalPool is that this class disables the tracing context propagation to underlying thread.
|
||||
/// If the context is propagated, not only the underlying worker will restore context but also the worker of ThreadPool.
|
||||
///
|
||||
/// Since workers of ThreadPool won't exit until the ThreadPool is destroyed, the context restored by underlying worker won't be deleted for a very long time
|
||||
/// which would cause wrong contexts for jobs for ThreadPool
|
||||
///
|
||||
class Thread4ThreadPool : public ThreadFromGlobalPool
|
||||
{
|
||||
public:
|
||||
Thread4ThreadPool() = default;
|
||||
|
||||
template <typename Function, typename... Args>
|
||||
explicit Thread4ThreadPool(Function && func, Args &&... args)
|
||||
{
|
||||
state = std::make_shared<State>();
|
||||
|
||||
/// NOTE:
|
||||
/// - If this will throw an exception, the destructor won't be called
|
||||
/// - this pointer cannot be passed in the lambda, since after detach() it will not be valid
|
||||
GlobalThreadPool::instance().scheduleOrThrow([
|
||||
state = state,
|
||||
func = std::forward<Function>(func),
|
||||
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
|
||||
{
|
||||
SCOPE_EXIT(state->event.set());
|
||||
|
||||
state->thread_id = std::this_thread::get_id();
|
||||
|
||||
/// This moves are needed to destroy function and arguments before exit.
|
||||
/// It will guarantee that after ThreadFromGlobalPool::join all captured params are destroyed.
|
||||
auto function = std::move(func);
|
||||
auto arguments = std::move(args);
|
||||
|
||||
/// Thread status holds raw pointer on query context, thus it always must be destroyed
|
||||
/// before sending signal that permits to join this thread.
|
||||
DB::ThreadStatus thread_status;
|
||||
std::apply(function, arguments);
|
||||
},
|
||||
|
||||
// default priority
|
||||
0,
|
||||
|
||||
// default wait_microseconds
|
||||
0,
|
||||
|
||||
/// Disable tracing context propagation on underlying thread pool because ThreadPool already has kept the context in its jobs.
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
Thread4ThreadPool(Thread4ThreadPool && rhs) noexcept
|
||||
{
|
||||
*this = std::move(rhs);
|
||||
}
|
||||
|
||||
Thread4ThreadPool & operator=(Thread4ThreadPool && rhs) noexcept
|
||||
{
|
||||
if (initialized())
|
||||
abort();
|
||||
state = std::move(rhs.state);
|
||||
return *this;
|
||||
}
|
||||
};
|
||||
|
||||
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
|
||||
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
|
||||
using ThreadPool = ThreadPoolImpl<Thread4ThreadPool>;
|
||||
|
Loading…
Reference in New Issue
Block a user