ClickHouse/libs/libzkutil/include/zkutil/RWLock.h
2016-09-01 13:24:06 +03:00

128 lines
2.7 KiB
C++

#pragma once
#include <zkutil/ZooKeeper.h>
#include <zkutil/Common.h>
#include <DB/Common/Exception.h>
#include <Poco/Event.h>
#include <string>
#include <type_traits>
#include <functional>
namespace zkutil
{
/** Distributed read/write lock for ZooKeeper.
* Such a RWLock object may not be shared among threads.
*/
class RWLock final
{
public:
enum Type
{
Read = 0,
Write
};
enum Mode
{
Blocking = 0,
NonBlocking
};
using CancellationHook = std::function<void()>;
public:
RWLock() = default;
/// Create under the specified ZooKeeper path a queue for lock requests
/// if it doesn't exist yet.
RWLock(GetZooKeeper get_zookeeper_, const std::string & path_);
RWLock(const RWLock &) = delete;
RWLock & operator=(const RWLock &) = delete;
RWLock(RWLock &&) = default;
RWLock & operator=(RWLock &&) = default;
operator bool() const;
/// Register a function that cancels lock operations if requested.
void setCancellationHook(CancellationHook cancellation_hook_);
/// Get a read lock.
void acquireRead(RWLock::Mode mode = RWLock::Blocking);
/// Get a write lock.
void acquireWrite(RWLock::Mode mode = RWLock::Blocking);
/// Check whether we have acquired the lock.
bool ownsLock() const;
/// Release lock.
void release();
public:
template <Type, Mode = Blocking> class Guard;
private:
/// Cancel any ongoing operation if requested.
template <RWLock::Type lock_type> void acquireImpl(RWLock::Mode mode);
void abortIfRequested();
private:
/// Helper that acquires an alive ZooKeeper session.
GetZooKeeper get_zookeeper;
EventPtr event = std::make_shared<Poco::Event>();
/// Function that cancels barrier operations if requested.
CancellationHook cancellation_hook;
/// Path to the lock request queue.
std::string path;
/// Identifier of our request for a lock.
std::string key;
bool owns_lock = false;
};
/** Scoped version of RWLock.
*/
template <RWLock::Type lock_type, RWLock::Mode lock_mode>
class RWLock::Guard final
{
static_assert((lock_type == RWLock::Read) || (lock_type == RWLock::Write), "Invalid RWLock type");
static_assert((lock_mode == RWLock::Blocking) || (lock_mode == RWLock::NonBlocking), "Invalid RWLock mode");
public:
/// Acquire lock.
Guard(RWLock & rw_lock_)
: rw_lock(rw_lock_)
{
if (lock_type == RWLock::Read)
rw_lock.acquireRead(lock_mode);
else if (lock_type == RWLock::Write)
rw_lock.acquireWrite(lock_mode);
}
/// Release lock.
~Guard()
{
if (rw_lock.ownsLock())
{
try
{
rw_lock.release();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
Guard(const Guard &) = delete;
Guard & operator=(const Guard &) = delete;
private:
RWLock & rw_lock;
};
}