mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
Fixed undefined behaviour in ThreadPool #4572
This commit is contained in:
parent
6e852288ff
commit
89917ced9e
@ -133,18 +133,17 @@ public:
|
||||
|
||||
template <typename Function, typename... Args>
|
||||
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
|
||||
: state(std::make_shared<SharedState>(true))
|
||||
{
|
||||
mutex = std::make_shared<std::mutex>();
|
||||
|
||||
/// The function object must be copyable, so we wrap lock_guard in shared_ptr.
|
||||
/// NOTE: If this will throw an exception, the descructor won't be called.
|
||||
GlobalThreadPool::instance().scheduleOrThrow([
|
||||
mutex = mutex,
|
||||
lock = std::make_shared<std::lock_guard<std::mutex>>(*mutex),
|
||||
state = state,
|
||||
func = std::forward<Function>(func),
|
||||
args = std::make_tuple(std::forward<Args>(args)...)]
|
||||
{
|
||||
DB::ThreadStatus thread_status;
|
||||
std::apply(func, args);
|
||||
state->finish();
|
||||
});
|
||||
}
|
||||
|
||||
@ -157,7 +156,7 @@ public:
|
||||
{
|
||||
if (joinable())
|
||||
std::terminate();
|
||||
mutex = std::move(rhs.mutex);
|
||||
state = std::move(rhs.state);
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -171,26 +170,52 @@ public:
|
||||
{
|
||||
if (!joinable())
|
||||
std::terminate();
|
||||
{
|
||||
std::lock_guard lock(*mutex);
|
||||
}
|
||||
mutex.reset();
|
||||
|
||||
state->wait();
|
||||
state.reset();
|
||||
}
|
||||
|
||||
void detach()
|
||||
{
|
||||
if (!joinable())
|
||||
std::terminate();
|
||||
mutex.reset();
|
||||
state.reset();
|
||||
}
|
||||
|
||||
bool joinable() const
|
||||
{
|
||||
return static_cast<bool>(mutex);
|
||||
return state != nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<std::mutex> mutex; /// Object must be moveable.
|
||||
/// The state used in this object and inside the thread job.
|
||||
class SharedState
|
||||
{
|
||||
private:
|
||||
bool active;
|
||||
std::mutex mutex;
|
||||
std::condition_variable done_event;
|
||||
|
||||
public:
|
||||
SharedState(bool active) : active(active) {}
|
||||
|
||||
void finish()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
active = false;
|
||||
}
|
||||
done_event.notify_one();
|
||||
}
|
||||
|
||||
void wait()
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
done_event.wait(lock, [this]{ return !active; });
|
||||
}
|
||||
};
|
||||
|
||||
std::shared_ptr<SharedState> state;
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user