maybe fix livelock in zookeeper client

This commit is contained in:
Alexander Tokmakov 2021-08-26 18:00:27 +03:00
parent 0602d74a11
commit 7ddb4a9ccc
3 changed files with 90 additions and 70 deletions

View File

@ -2,11 +2,21 @@
#include <queue>
#include <type_traits>
#include <atomic>
#include <Poco/Mutex.h>
#include <Poco/Semaphore.h>
#include <common/MoveOrCopyIfThrow.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/** A very simple thread-safe queue of limited size.
* If you try to pop an item from an empty queue, the thread is blocked until the queue becomes nonempty.
@ -17,9 +27,41 @@ class ConcurrentBoundedQueue
{
private:
std::queue<T> queue;
Poco::FastMutex mutex;
mutable Poco::FastMutex mutex;
Poco::Semaphore fill_count;
Poco::Semaphore empty_count;
std::atomic_bool closed = false;
template <typename... Args>
bool tryEmplaceImpl(Args &&... args)
{
bool emplaced = true;
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (closed)
emplaced = false;
else
queue.emplace(std::forward<Args>(args)...);
}
if (emplaced)
fill_count.set();
else
empty_count.set();
return emplaced;
}
void popImpl(T & x)
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
empty_count.set();
}
public:
explicit ConcurrentBoundedQueue(size_t max_fill)
@ -30,91 +72,75 @@ public:
void push(const T & x)
{
empty_count.wait();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.push(x);
}
fill_count.set();
if (!tryEmplaceImpl(x))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "tryPush/tryEmplace must be used with close()");
}
template <typename... Args>
void emplace(Args &&... args)
{
empty_count.wait();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.emplace(std::forward<Args>(args)...);
}
fill_count.set();
if (!tryEmplaceImpl(std::forward<Args>(args)...))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "tryPush/tryEmplace must be used with close()");
}
void pop(T & x)
{
fill_count.wait();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
empty_count.set();
popImpl(x);
}
bool tryPush(const T & x, UInt64 milliseconds = 0)
{
if (empty_count.tryWait(milliseconds))
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.push(x);
}
fill_count.set();
return true;
}
return false;
if (!empty_count.tryWait(milliseconds))
return false;
return tryEmplaceImpl(x);
}
template <typename... Args>
bool tryEmplace(UInt64 milliseconds, Args &&... args)
{
if (empty_count.tryWait(milliseconds))
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
queue.emplace(std::forward<Args>(args)...);
}
fill_count.set();
return true;
}
return false;
if (!empty_count.tryWait(milliseconds))
return false;
return tryEmplaceImpl(std::forward<Args>(args)...);
}
bool tryPop(T & x, UInt64 milliseconds = 0)
{
if (fill_count.tryWait(milliseconds))
{
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
detail::moveOrCopyIfThrow(std::move(queue.front()), x);
queue.pop();
}
empty_count.set();
return true;
}
return false;
if (!fill_count.tryWait(milliseconds))
return false;
popImpl(x);
return true;
}
size_t size()
size_t size() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return queue.size();
}
size_t empty()
size_t empty() const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return queue.empty();
}
/// Forbids to push new elements to queue.
/// Returns false if queue was not closed before call, returns true if queue was already closed.
bool close()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
return closed.exchange(true);
}
bool isClosed() const
{
return closed.load();
}
void clear()
{
while (fill_count.tryWait(0))

View File

@ -539,7 +539,7 @@ void ZooKeeper::sendThread()
try
{
while (!expired)
while (!requests_queue.isClosed())
{
auto prev_bytes_sent = out->count();
@ -571,7 +571,7 @@ void ZooKeeper::sendThread()
info.request->has_watch = true;
}
if (expired)
if (requests_queue.isClosed())
{
break;
}
@ -616,7 +616,7 @@ void ZooKeeper::receiveThread()
try
{
Int64 waited = 0;
while (!expired)
while (!requests_queue.isClosed())
{
auto prev_bytes_received = in->count();
@ -639,7 +639,7 @@ void ZooKeeper::receiveThread()
if (in->poll(max_wait))
{
if (expired)
if (requests_queue.isClosed())
break;
receiveEvent();
@ -839,12 +839,10 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
auto expire_session_if_not_expired = [&]
{
std::lock_guard lock(push_request_mutex);
if (!expired)
{
expired = true;
/// No new requests will appear in queue after close()
bool was_already_closed = requests_queue.close();
if (!was_already_closed)
active_session_metric_increment.destroy();
}
};
try
@ -1017,17 +1015,15 @@ void ZooKeeper::pushRequest(RequestInfo && info)
}
}
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
/// to avoid forgotten operations in the queue when session is expired.
/// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
/// and the queue will be drained in 'finalize'.
std::lock_guard lock(push_request_mutex);
if (expired)
if (requests_queue.isClosed())
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
{
if (requests_queue.isClosed())
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
}
}
catch (...)
{

View File

@ -121,7 +121,7 @@ public:
/// If expired, you can only destroy the object. All other methods will throw exception.
bool isExpired() const override { return expired; }
bool isExpired() const override { return requests_queue.isClosed(); }
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
@ -205,11 +205,9 @@ private:
int64_t session_id = 0;
std::atomic<XID> next_xid {1};
std::atomic<bool> expired {false};
/// Mark session finalization start. Used to avoid simultaneous
/// finalization from different threads. One-shot flag.
std::atomic<bool> finalization_started {false};
std::mutex push_request_mutex;
using clock = std::chrono::steady_clock;
@ -223,7 +221,7 @@ private:
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
RequestsQueue requests_queue{1024};
void pushRequest(RequestInfo && info);
using Operations = std::map<XID, RequestInfo>;