mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #14693 from hustnn/zk_exist_fix2
Fix potential memory leak caused by zookeeper exist calling
This commit is contained in:
commit
412a54e356
@ -663,7 +663,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
|
||||
{
|
||||
WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
|
||||
|
||||
auto callback = [state](const Coordination::ExistsResponse & response)
|
||||
auto callback = [state](const Coordination::GetResponse & response)
|
||||
{
|
||||
state->code = int32_t(response.error);
|
||||
if (state->code)
|
||||
@ -683,8 +683,9 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
|
||||
|
||||
while (!condition || !condition())
|
||||
{
|
||||
/// NOTE: if the node doesn't exist, the watch will leak.
|
||||
impl->exists(path, callback, watch);
|
||||
/// Use getData insteand of exists to avoid watch leak.
|
||||
impl->get(path, callback, watch);
|
||||
|
||||
if (!condition)
|
||||
state->event.wait();
|
||||
else if (!state->event.tryWait(1000))
|
||||
|
@ -422,6 +422,18 @@ void ZooKeeperRequest::write(WriteBuffer & out) const
|
||||
}
|
||||
|
||||
|
||||
static void removeRootPath(String & path, const String & root_path)
|
||||
{
|
||||
if (root_path.empty())
|
||||
return;
|
||||
|
||||
if (path.size() <= root_path.size())
|
||||
throw Exception("Received path is not longer than root_path", Error::ZDATAINCONSISTENCY);
|
||||
|
||||
path = path.substr(root_path.size());
|
||||
}
|
||||
|
||||
|
||||
struct ZooKeeperResponse : virtual Response
|
||||
{
|
||||
virtual ~ZooKeeperResponse() override = default;
|
||||
@ -1092,8 +1104,6 @@ void ZooKeeper::sendThread()
|
||||
{
|
||||
info.request->has_watch = true;
|
||||
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
|
||||
std::lock_guard lock(watches_mutex);
|
||||
watches[info.request->getPath()].emplace_back(std::move(info.watch));
|
||||
}
|
||||
|
||||
if (expired)
|
||||
@ -1278,6 +1288,30 @@ void ZooKeeper::receiveEvent()
|
||||
response->removeRootPath(root_path);
|
||||
}
|
||||
|
||||
/// Instead of setting the watch in sendEvent, set it in receiveEvent becuase need to check the response.
|
||||
/// The watch shouldn't be set if the node does not exist and it will never exist like sequential ephemeral nodes.
|
||||
/// By using getData() instead of exists(), a watch won't be set if the node doesn't exist.
|
||||
if (request_info.watch)
|
||||
{
|
||||
bool add_watch = false;
|
||||
/// 3 indicates the ZooKeeperExistsRequest.
|
||||
// For exists, we set the watch on both node exist and nonexist case.
|
||||
// For other case like getData, we only set the watch when node exists.
|
||||
if (request_info.request->getOpNum() == 3)
|
||||
add_watch = (response->error == Error::ZOK || response->error == Error::ZNONODE);
|
||||
else
|
||||
add_watch = response->error == Error::ZOK;
|
||||
|
||||
if (add_watch)
|
||||
{
|
||||
/// The key of wathces should exclude the root_path
|
||||
String req_path = request_info.request->getPath();
|
||||
removeRootPath(req_path, root_path);
|
||||
std::lock_guard lock(watches_mutex);
|
||||
watches[req_path].emplace_back(std::move(request_info.watch));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t actual_length = in->count() - count_before_event;
|
||||
if (length != actual_length)
|
||||
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), Error::ZMARSHALLINGERROR);
|
||||
|
@ -124,7 +124,8 @@ private:
|
||||
|
||||
/// Watch for the node in front of us.
|
||||
--my_node_it;
|
||||
if (!zookeeper.existsWatch(path + "/" + *my_node_it, nullptr, task->getWatchCallback()))
|
||||
std::string get_path_value;
|
||||
if (!zookeeper.tryGetWatch(path + "/" + *my_node_it, get_path_value, nullptr, task->getWatchCallback()))
|
||||
task->schedule();
|
||||
|
||||
success = true;
|
||||
|
Loading…
Reference in New Issue
Block a user