get rid of redundant code

This commit is contained in:
serxa 2023-04-13 12:11:06 +00:00
parent ce2073e58f
commit 32f78afd7e

View File

@ -335,18 +335,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
DENY_ALLOCATIONS_IN_SCOPE; DENY_ALLOCATIONS_IN_SCOPE;
CurrentMetrics::Increment metric_pool_threads(metric_threads); CurrentMetrics::Increment metric_pool_threads(metric_threads);
/// Remove this thread from `threads` and detach it, that must be done before exiting from this worker.
/// We can't wrap the following lambda function into `SCOPE_EXIT` because it requires `mutex` to be locked.
auto detach_thread = [this, thread_it]
{
/// `mutex` is supposed to be already locked.
if (threads_remove_themselves)
{
thread_it->detach();
threads.erase(thread_it);
}
};
bool job_is_done = false; bool job_is_done = false;
std::exception_ptr exception_from_job; std::exception_ptr exception_from_job;
@ -384,22 +372,17 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
job_finished.notify_all(); job_finished.notify_all();
if (shutdown) if (shutdown)
new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves. new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
if (threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads))
{
/// This thread is excessive. The worker will stop.
detach_thread();
return;
}
} }
new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || (threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads)); }); new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); });
if (jobs.empty()) if (jobs.empty() || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads))
{ {
/// No jobs and either `shutdown` is set or this thread is excessive. The worker will stop. if (threads_remove_themselves)
detach_thread(); {
thread_it->detach();
threads.erase(thread_it);
}
return; return;
} }
@ -409,7 +392,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
parent_thead_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context)); parent_thead_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context));
jobs.pop(); jobs.pop();
/// We don't run jobs after `shutdown` is set. /// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown) if (shutdown)
{ {
parent_thead_trace_context.reset(); parent_thead_trace_context.reset();