Merge pull request #48849 from ClickHouse/fix_zookeeper_join_race

Fix for race in ZooKeeper when joining send_thread/receive_thread
This commit is contained in:
Alexander Gololobov 2023-04-18 09:16:23 +02:00 committed by GitHub
commit 2728ce2979
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 15 deletions

View File

@ -299,11 +299,8 @@ ZooKeeper::~ZooKeeper()
{
finalize(false, false, "Destructor called");
if (send_thread.joinable())
send_thread.join();
if (receive_thread.joinable())
receive_thread.join();
send_thread.join();
receive_thread.join();
}
catch (...)
{
@ -365,11 +362,8 @@ ZooKeeper::ZooKeeper(
{
tryLogCurrentException(log, "Failed to connect to ZooKeeper");
if (send_thread.joinable())
send_thread.join();
if (receive_thread.joinable())
receive_thread.join();
send_thread.join();
receive_thread.join();
throw;
}
@ -914,8 +908,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
}
/// Send thread will exit after sending close request or on expired flag
if (send_thread.joinable())
send_thread.join();
send_thread.join();
}
/// Set expired flag after we sent close event
@ -932,7 +925,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
tryLogCurrentException(log);
}
if (!error_receive && receive_thread.joinable())
if (!error_receive)
receive_thread.join();
{

View File

@ -255,8 +255,30 @@ private:
Watches watches TSA_GUARDED_BY(watches_mutex);
std::mutex watches_mutex;
ThreadFromGlobalPool send_thread;
ThreadFromGlobalPool receive_thread;
/// A wrapper around ThreadFromGlobalPool that allows to call join() on it from multiple threads.
class ThreadReference
{
public:
const ThreadReference & operator = (ThreadFromGlobalPool && thread_)
{
std::lock_guard<std::mutex> l(lock);
thread = std::move(thread_);
return *this;
}
void join()
{
std::lock_guard<std::mutex> l(lock);
if (thread.joinable())
thread.join();
}
private:
std::mutex lock;
ThreadFromGlobalPool thread;
};
ThreadReference send_thread;
ThreadReference receive_thread;
Poco::Logger * log;