mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #62920 from ClickHouse/some_keeper_changes
Add some functions to zookeeper client
This commit is contained in:
commit
8a82277a03
@ -518,7 +518,8 @@ bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat,
|
||||
return code != Coordination::Error::ZNONODE;
|
||||
}
|
||||
|
||||
Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
|
||||
Coordination::Error ZooKeeper::getImpl(
|
||||
const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallbackPtr watch_callback)
|
||||
{
|
||||
auto future_result = asyncTryGetNoThrow(path, watch_callback);
|
||||
|
||||
@ -541,6 +542,11 @@ Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & r
|
||||
}
|
||||
}
|
||||
|
||||
Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
|
||||
{
|
||||
return getImpl(path, res, stat, watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
|
||||
}
|
||||
|
||||
std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
|
||||
{
|
||||
Coordination::Error code = Coordination::Error::ZOK;
|
||||
@ -561,6 +567,17 @@ std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * s
|
||||
throw KeeperException(code, "Can't get data for node '{}': node doesn't exist", path);
|
||||
}
|
||||
|
||||
|
||||
std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallbackPtr watch_callback)
|
||||
{
|
||||
Coordination::Error code = Coordination::Error::ZOK;
|
||||
std::string res;
|
||||
if (tryGetWatch(path, res, stat, watch_callback, &code))
|
||||
return res;
|
||||
else
|
||||
throw KeeperException(code, "Can't get data for node '{}': node doesn't exist", path);
|
||||
}
|
||||
|
||||
bool ZooKeeper::tryGet(
|
||||
const std::string & path,
|
||||
std::string & res,
|
||||
@ -571,6 +588,25 @@ bool ZooKeeper::tryGet(
|
||||
return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
|
||||
}
|
||||
|
||||
bool ZooKeeper::tryGetWatch(
|
||||
const std::string & path,
|
||||
std::string & res,
|
||||
Coordination::Stat * stat,
|
||||
Coordination::WatchCallbackPtr watch_callback,
|
||||
Coordination::Error * return_code)
|
||||
{
|
||||
Coordination::Error code = getImpl(path, res, stat, watch_callback);
|
||||
|
||||
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
|
||||
throw KeeperException::fromPath(code, path);
|
||||
|
||||
if (return_code)
|
||||
*return_code = code;
|
||||
|
||||
return code == Coordination::Error::ZOK;
|
||||
|
||||
}
|
||||
|
||||
bool ZooKeeper::tryGetWatch(
|
||||
const std::string & path,
|
||||
std::string & res,
|
||||
@ -589,6 +625,7 @@ bool ZooKeeper::tryGetWatch(
|
||||
return code == Coordination::Error::ZOK;
|
||||
}
|
||||
|
||||
|
||||
Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::string & data,
|
||||
int32_t version, Coordination::Stat * stat)
|
||||
{
|
||||
@ -1062,6 +1099,11 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & p
|
||||
}
|
||||
|
||||
std::future<Coordination::GetResponse> ZooKeeper::asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallback watch_callback)
|
||||
{
|
||||
return asyncTryGetNoThrow(path, watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
|
||||
}
|
||||
|
||||
std::future<Coordination::GetResponse> ZooKeeper::asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallbackPtr watch_callback)
|
||||
{
|
||||
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
|
||||
auto future = promise->get_future();
|
||||
@ -1071,8 +1113,7 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncTryGetNoThrow(const std::
|
||||
promise->set_value(response);
|
||||
};
|
||||
|
||||
impl->get(path, std::move(callback),
|
||||
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
|
||||
impl->get(path, std::move(callback), watch_callback);
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -306,6 +306,7 @@ public:
|
||||
|
||||
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
|
||||
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
|
||||
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallbackPtr watch_callback);
|
||||
|
||||
using MultiGetResponse = MultiReadResponses<Coordination::GetResponse, false>;
|
||||
using MultiTryGetResponse = MultiReadResponses<Coordination::GetResponse, true>;
|
||||
@ -338,6 +339,13 @@ public:
|
||||
Coordination::WatchCallback watch_callback,
|
||||
Coordination::Error * code = nullptr);
|
||||
|
||||
bool tryGetWatch(
|
||||
const std::string & path,
|
||||
std::string & res,
|
||||
Coordination::Stat * stat,
|
||||
Coordination::WatchCallbackPtr watch_callback,
|
||||
Coordination::Error * code = nullptr);
|
||||
|
||||
template <typename TIter>
|
||||
MultiTryGetResponse tryGet(TIter start, TIter end)
|
||||
{
|
||||
@ -520,6 +528,8 @@ public:
|
||||
/// Like the previous one but don't throw any exceptions on future.get()
|
||||
FutureGet asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
|
||||
|
||||
FutureGet asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallbackPtr watch_callback = {});
|
||||
|
||||
using FutureExists = std::future<Coordination::ExistsResponse>;
|
||||
FutureExists asyncExists(const std::string & path, Coordination::WatchCallback watch_callback = {});
|
||||
/// Like the previous one but don't throw any exceptions on future.get()
|
||||
@ -625,6 +635,8 @@ private:
|
||||
Coordination::Error removeImpl(const std::string & path, int32_t version);
|
||||
Coordination::Error getImpl(
|
||||
const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
|
||||
Coordination::Error getImpl(
|
||||
const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallbackPtr watch_callback);
|
||||
Coordination::Error setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat);
|
||||
Coordination::Error getChildrenImpl(
|
||||
const std::string & path,
|
||||
|
@ -22,13 +22,16 @@ ZooKeeperLock::ZooKeeperLock(
|
||||
const ZooKeeperPtr & zookeeper_,
|
||||
const std::string & lock_prefix_,
|
||||
const std::string & lock_name_,
|
||||
const std::string & lock_message_)
|
||||
const std::string & lock_message_,
|
||||
bool throw_if_lost_)
|
||||
: zookeeper(zookeeper_)
|
||||
, lock_path(fs::path(lock_prefix_) / lock_name_)
|
||||
, lock_message(lock_message_)
|
||||
, throw_if_lost(throw_if_lost_)
|
||||
, log(getLogger("zkutil::Lock"))
|
||||
{
|
||||
zookeeper->createIfNotExists(lock_prefix_, "");
|
||||
LOG_TRACE(log, "Trying to create zookeeper lock on path {} for session {}", lock_path, zookeeper->getClientID());
|
||||
}
|
||||
|
||||
ZooKeeperLock::~ZooKeeperLock()
|
||||
@ -45,7 +48,7 @@ ZooKeeperLock::~ZooKeeperLock()
|
||||
|
||||
bool ZooKeeperLock::isLocked() const
|
||||
{
|
||||
return locked;
|
||||
return locked && !zookeeper->expired();
|
||||
}
|
||||
|
||||
const std::string & ZooKeeperLock::getLockPath() const
|
||||
@ -56,7 +59,10 @@ const std::string & ZooKeeperLock::getLockPath() const
|
||||
void ZooKeeperLock::unlock()
|
||||
{
|
||||
if (!locked)
|
||||
{
|
||||
LOG_TRACE(log, "Lock on path {} for session {} is not locked, exiting", lock_path, zookeeper->getClientID());
|
||||
return;
|
||||
}
|
||||
|
||||
locked = false;
|
||||
|
||||
@ -71,12 +77,19 @@ void ZooKeeperLock::unlock()
|
||||
bool result = zookeeper->exists(lock_path, &stat);
|
||||
|
||||
if (result && stat.ephemeralOwner == zookeeper->getClientID())
|
||||
{
|
||||
zookeeper->remove(lock_path, -1);
|
||||
LOG_TRACE(log, "Lock on path {} for session {} is unlocked", lock_path, zookeeper->getClientID());
|
||||
}
|
||||
else if (result)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lock is lost, it has another owner. Path: {}, message: {}, owner: {}, our id: {}",
|
||||
lock_path, lock_message, stat.ephemeralOwner, zookeeper->getClientID());
|
||||
else if (throw_if_lost)
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lock is lost, node does not exist. Path: {}, message: {}, our id: {}",
|
||||
lock_path, lock_message, zookeeper->getClientID());
|
||||
else
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lock is lost, node does not exist. Path: {}, message: {}", lock_path, lock_message);
|
||||
LOG_INFO(log, "Lock is lost, node does not exist. Path: {}, message: {}, our id: {}",
|
||||
lock_path, lock_message, zookeeper->getClientID());
|
||||
}
|
||||
|
||||
bool ZooKeeperLock::tryLock()
|
||||
@ -96,9 +109,9 @@ bool ZooKeeperLock::tryLock()
|
||||
}
|
||||
|
||||
std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
|
||||
const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
|
||||
const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message, bool throw_if_lost)
|
||||
{
|
||||
return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message);
|
||||
return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message, throw_if_lost);
|
||||
}
|
||||
|
||||
|
||||
|
@ -32,7 +32,8 @@ public:
|
||||
const ZooKeeperPtr & zookeeper_,
|
||||
const std::string & lock_prefix_,
|
||||
const std::string & lock_name_,
|
||||
const std::string & lock_message_ = "");
|
||||
const std::string & lock_message_ = "",
|
||||
bool throw_if_lost_ = true);
|
||||
|
||||
~ZooKeeperLock();
|
||||
|
||||
@ -46,12 +47,13 @@ private:
|
||||
|
||||
std::string lock_path;
|
||||
std::string lock_message;
|
||||
bool throw_if_lost{true};
|
||||
LoggerPtr log;
|
||||
bool locked = false;
|
||||
|
||||
};
|
||||
|
||||
std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
|
||||
const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message);
|
||||
const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message, bool throw_if_lost = true);
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user