mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
Merge pull request #16837 from ClickHouse/correctly_send_close_request
Set expire after we sent close in ZooKeeper client
This commit is contained in:
commit
3a3860c776
@ -1114,6 +1114,7 @@ void ZooKeeper::sendThread()
|
||||
info.request->probably_sent = true;
|
||||
info.request->write(*out);
|
||||
|
||||
/// We sent close request, exit
|
||||
if (info.request->xid == close_xid)
|
||||
break;
|
||||
}
|
||||
@ -1342,21 +1343,25 @@ void ZooKeeper::receiveEvent()
|
||||
|
||||
void ZooKeeper::finalize(bool error_send, bool error_receive)
|
||||
{
|
||||
/// If some thread (send/receive) already finalizing session don't try to do it
|
||||
if (finalization_started.exchange(true))
|
||||
return;
|
||||
|
||||
auto expire_session_if_not_expired = [&]
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
|
||||
if (expired)
|
||||
return;
|
||||
expired = true;
|
||||
}
|
||||
|
||||
active_session_metric_increment.destroy();
|
||||
if (!expired)
|
||||
{
|
||||
expired = true;
|
||||
active_session_metric_increment.destroy();
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
if (!error_send)
|
||||
{
|
||||
/// Send close event. This also signals sending thread to wakeup and then stop.
|
||||
/// Send close event. This also signals sending thread to stop.
|
||||
try
|
||||
{
|
||||
close();
|
||||
@ -1364,12 +1369,18 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
|
||||
catch (...)
|
||||
{
|
||||
/// This happens for example, when "Cannot push request to queue within operation timeout".
|
||||
/// Just mark session expired in case of error on close request, otherwise sendThread may not stop.
|
||||
expire_session_if_not_expired();
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
|
||||
/// Send thread will exit after sending close request or on expired flag
|
||||
send_thread.join();
|
||||
}
|
||||
|
||||
/// Set expired flag after we sent close event
|
||||
expire_session_if_not_expired();
|
||||
|
||||
try
|
||||
{
|
||||
/// This will also wakeup the receiving thread.
|
||||
|
@ -187,6 +187,9 @@ private:
|
||||
|
||||
std::atomic<XID> next_xid {1};
|
||||
std::atomic<bool> expired {false};
|
||||
/// Mark session finalization start. Used to avoid simultaneous
|
||||
/// finalization from different threads. One-shot flag.
|
||||
std::atomic<bool> finalization_started {false};
|
||||
std::mutex push_request_mutex;
|
||||
|
||||
using clock = std::chrono::steady_clock;
|
||||
|
Loading…
Reference in New Issue
Block a user