Merge pull request #53452 from ClickHouse/such_much_watch

Deduplicate same watch callbacks registered multiple times
This commit is contained in:
Alexander Gololobov 2023-08-18 14:43:03 +02:00 committed by GitHub
commit 50061b9cb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 139 additions and 79 deletions

View File

@ -165,6 +165,10 @@ struct WatchResponse : virtual Response
};
using WatchCallback = std::function<void(const WatchResponse &)>;
/// Passing the watch callback as a shared_ptr allows us to
/// - avoid copying the callback
/// - register the same callback only once per path
using WatchCallbackPtr = std::shared_ptr<WatchCallback>;
struct SetACLRequest : virtual Request
{
@ -557,12 +561,12 @@ public:
virtual void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch) = 0;
WatchCallbackPtr watch) = 0;
virtual void get(
const String & path,
GetCallback callback,
WatchCallback watch) = 0;
WatchCallbackPtr watch) = 0;
virtual void set(
const String & path,
@ -574,7 +578,7 @@ public:
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) = 0;
WatchCallbackPtr watch) = 0;
virtual void check(
const String & path,

View File

@ -24,6 +24,9 @@ public:
static void check(Coordination::Error code, const Coordination::Requests & requests, const Coordination::Responses & responses);
KeeperMultiException(Coordination::Error code, const Coordination::Requests & requests, const Coordination::Responses & responses);
private:
KeeperMultiException(Coordination::Error code, size_t failed_op_index_, const Coordination::Requests & requests_, const Coordination::Responses & responses_);
};
size_t getFailedOpIndex(Coordination::Error code, const Coordination::Responses & responses);

View File

@ -42,9 +42,9 @@ static void processWatchesImpl(const String & path, TestKeeper::Watches & watche
auto it = watches.find(watch_response.path);
if (it != watches.end())
{
for (auto & callback : it->second)
for (const auto & callback : it->second)
if (callback)
callback(watch_response);
(*callback)(watch_response);
watches.erase(it);
}
@ -55,9 +55,9 @@ static void processWatchesImpl(const String & path, TestKeeper::Watches & watche
it = list_watches.find(watch_list_response.path);
if (it != list_watches.end())
{
for (auto & callback : it->second)
for (const auto & callback : it->second)
if (callback)
callback(watch_list_response);
(*callback)(watch_list_response);
list_watches.erase(it);
}
@ -587,11 +587,11 @@ void TestKeeper::processingThread()
? list_watches
: watches;
watches_type[info.request->getPath()].emplace_back(std::move(info.watch));
watches_type[info.request->getPath()].insert(info.watch);
}
else if (response->error == Error::ZNONODE && dynamic_cast<const ExistsRequest *>(info.request.get()))
{
watches[info.request->getPath()].emplace_back(std::move(info.watch));
watches[info.request->getPath()].insert(info.watch);
}
}
@ -634,13 +634,13 @@ void TestKeeper::finalize(const String &)
response.state = EXPIRED_SESSION;
response.error = Error::ZSESSIONEXPIRED;
for (auto & callback : path_watch.second)
for (const auto & callback : path_watch.second)
{
if (callback)
{
try
{
callback(response);
(*callback)(response);
}
catch (...)
{
@ -677,7 +677,7 @@ void TestKeeper::finalize(const String &)
response.error = Error::ZSESSIONEXPIRED;
try
{
info.watch(response);
(*info.watch)(response);
}
catch (...)
{
@ -756,7 +756,7 @@ void TestKeeper::remove(
void TestKeeper::exists(
const String & path,
ExistsCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
TestKeeperExistsRequest request;
request.path = path;
@ -771,7 +771,7 @@ void TestKeeper::exists(
void TestKeeper::get(
const String & path,
GetCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
TestKeeperGetRequest request;
request.path = path;
@ -804,7 +804,7 @@ void TestKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
TestKeeperFilteredListRequest request;
request.path = path;

View File

@ -59,12 +59,12 @@ public:
void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void get(
const String & path,
GetCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void set(
const String & path,
@ -76,7 +76,7 @@ public:
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void check(
const String & path,
@ -117,7 +117,7 @@ public:
using Container = std::map<std::string, Node>;
using WatchCallbacks = std::vector<WatchCallback>;
using WatchCallbacks = std::unordered_set<WatchCallbackPtr>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
private:
@ -127,7 +127,7 @@ private:
{
TestKeeperRequestPtr request;
ResponseCallback callback;
WatchCallback watch;
WatchCallbackPtr watch;
clock::time_point time;
};

View File

@ -212,7 +212,7 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type)
{
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback, list_request_type);
@ -250,6 +250,13 @@ Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat
return res;
}
/// Overload that takes the watch callback as a shared pointer (Coordination::WatchCallbackPtr).
/// Forwards all arguments to tryGetChildrenWatch(), hands the resulting error code together with
/// the path to check(), and returns the Strings that tryGetChildrenWatch() filled in.
Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallbackPtr watch_callback, Coordination::ListRequestType list_request_type)
{
    Strings children;
    const Coordination::Error code = tryGetChildrenWatch(path, children, stat, watch_callback, list_request_type);
    check(code, path);
    return children;
}
Coordination::Error ZooKeeper::tryGetChildren(
const std::string & path,
Strings & res,
@ -257,12 +264,9 @@ Coordination::Error ZooKeeper::tryGetChildren(
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch), list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException::fromPath(code, path);
return code;
return tryGetChildrenWatch(path, res, stat,
watch ? std::make_shared<Coordination::WatchCallback>(callbackForEvent(watch)) : Coordination::WatchCallbackPtr{},
list_request_type);
}
Coordination::Error ZooKeeper::tryGetChildrenWatch(
@ -271,6 +275,18 @@ Coordination::Error ZooKeeper::tryGetChildrenWatch(
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
return tryGetChildrenWatch(path, res, stat,
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{},
list_request_type);
}
Coordination::Error ZooKeeper::tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback, list_request_type);
@ -814,7 +830,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
do
{
/// Use getData instead of exists to avoid watch leak.
impl->get(path, callback, watch);
impl->get(path, callback, std::make_shared<Coordination::WatchCallback>(watch));
if (!state->event.tryWait(1000))
continue;
@ -929,7 +945,8 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & p
promise->set_value(response);
};
impl->get(path, std::move(callback), watch_callback);
impl->get(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -943,7 +960,8 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncTryGetNoThrow(const std::
promise->set_value(response);
};
impl->get(path, std::move(callback), watch_callback);
impl->get(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -978,7 +996,8 @@ std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::stri
promise->set_value(response);
};
impl->exists(path, std::move(callback), watch_callback);
impl->exists(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -992,7 +1011,8 @@ std::future<Coordination::ExistsResponse> ZooKeeper::asyncTryExistsNoThrow(const
promise->set_value(response);
};
impl->exists(path, std::move(callback), watch_callback);
impl->exists(path, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
@ -1042,12 +1062,13 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(
promise->set_value(response);
};
impl->list(path, list_request_type, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback),
watch_callback ? std::make_shared<Coordination::WatchCallback>(watch_callback) : Coordination::WatchCallbackPtr{});
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
const std::string & path, Coordination::WatchCallbackPtr watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1243,13 +1264,18 @@ size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::
}
KeeperMultiException::KeeperMultiException(Coordination::Error exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
: KeeperException(exception_code, "Transaction failed: Op #{}, path", failed_op_index),
requests(requests_), responses(responses_), failed_op_index(getFailedOpIndex(exception_code, responses))
/// Builds the exception from an already-computed index of the failed operation.
/// The index is taken as a parameter because the KeeperException base is initialized before this
/// class's own members, so the failed_op_index member must not be read while formatting the base message.
KeeperMultiException::KeeperMultiException(Coordination::Error exception_code, size_t failed_op_index_, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
: KeeperException(exception_code, "Transaction failed: Op #{}, path", failed_op_index_),
requests(requests_), responses(responses_), failed_op_index(failed_op_index_)
{
/// Passes the result of getPathForFirstFailedOp() to addMessage(); members are fully initialized by now.
addMessage(getPathForFirstFailedOp());
}
/// Computes the index of the failed operation with getFailedOpIndex() from the error code and the
/// responses, then delegates to the constructor that receives the index explicitly.
KeeperMultiException::KeeperMultiException(Coordination::Error exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
: KeeperMultiException(exception_code, getFailedOpIndex(exception_code, responses_), requests_, responses_)
{
}
std::string KeeperMultiException::getPathForFirstFailedOp() const
{

View File

@ -333,6 +333,11 @@ public:
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Strings getChildrenWatch(const std::string & path,
Coordination::Stat * stat,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, false>;
using MultiTryGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, true>;
@ -369,6 +374,13 @@ public:
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
template <typename TIter>
MultiTryGetChildrenResponse
tryGetChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
@ -474,7 +486,7 @@ public:
/// Like the previous one, but doesn't throw any exceptions on future.get()
FutureGetChildren asyncTryGetChildrenNoThrow(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::WatchCallbackPtr watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using FutureSet = std::future<Coordination::SetResponse>;
@ -545,7 +557,7 @@ private:
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::WatchCallbackPtr watch_callback,
Coordination::ListRequestType list_request_type);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);

View File

@ -782,9 +782,9 @@ void ZooKeeper::receiveEvent()
}
else
{
for (auto & callback : it->second)
for (const auto & callback : it->second)
if (callback)
callback(watch_response); /// NOTE We may process callbacks not under mutex.
(*callback)(watch_response); /// NOTE We may process callbacks not under mutex.
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, it->second.size());
watches.erase(it);
@ -846,13 +846,17 @@ void ZooKeeper::receiveEvent()
if (add_watch)
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
/// The key of watches should exclude the args.chroot
String req_path = request_info.request->getPath();
removeRootPath(req_path, args.chroot);
std::lock_guard lock(watches_mutex);
watches[req_path].emplace_back(std::move(request_info.watch));
auto & callbacks = watches[req_path];
if (request_info.watch && *request_info.watch)
{
if (callbacks.insert(request_info.watch).second)
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
}
}
}
@ -1002,14 +1006,14 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
response.state = EXPIRED_SESSION;
response.error = Error::ZSESSIONEXPIRED;
for (auto & callback : path_watches.second)
for (const auto & callback : path_watches.second)
{
watch_callback_count += 1;
if (callback)
{
try
{
callback(response);
(*callback)(response);
}
catch (...)
{
@ -1054,7 +1058,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
response.error = Error::ZSESSIONEXPIRED;
try
{
info.watch(response);
(*info.watch)(response);
}
catch (...)
{
@ -1232,7 +1236,7 @@ void ZooKeeper::remove(
void ZooKeeper::exists(
const String & path,
ExistsCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
ZooKeeperExistsRequest request;
request.path = path;
@ -1250,7 +1254,7 @@ void ZooKeeper::exists(
void ZooKeeper::get(
const String & path,
GetCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
ZooKeeperGetRequest request;
request.path = path;
@ -1289,7 +1293,7 @@ void ZooKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
WatchCallbackPtr watch)
{
std::shared_ptr<ZooKeeperListRequest> request{nullptr};
if (!isFeatureEnabled(KeeperFeatureFlag::FILTERED_LIST))
@ -1310,7 +1314,8 @@ void ZooKeeper::list(
RequestInfo request_info;
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ListResponse &>(response)); };
request_info.watch = watch;
if (watch)
request_info.watch = std::move(watch);
request_info.request = std::move(request);
pushRequest(std::move(request_info));

View File

@ -154,12 +154,12 @@ public:
void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void get(
const String & path,
GetCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void set(
const String & path,
@ -171,7 +171,7 @@ public:
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;
WatchCallbackPtr watch) override;
void check(
const String & path,
@ -252,7 +252,7 @@ private:
{
ZooKeeperRequestPtr request;
ResponseCallback callback;
WatchCallback watch;
WatchCallbackPtr watch;
clock::time_point time;
};
@ -267,7 +267,7 @@ private:
Operations operations TSA_GUARDED_BY(operations_mutex);
std::mutex operations_mutex;
using WatchCallbacks = std::vector<WatchCallback>;
using WatchCallbacks = std::unordered_set<WatchCallbackPtr>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
Watches watches TSA_GUARDED_BY(watches_mutex);

View File

@ -5,6 +5,7 @@
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/typeid_cast.h>
#include <iostream>
#include <memory>
#include <base/find_symbols.h>
@ -72,13 +73,15 @@ try
//event.set();
},
[](const WatchResponse & response)
{
if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (get) on /test, Error: " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (get) on /test, path: " << response.path << ", type: " << response.type << '\n';
});
std::make_shared<Coordination::WatchCallback>(
[](const WatchResponse & response)
{
if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (get) on /test, Error: " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (get) on /test, path: " << response.path << ", type: " << response.type << '\n';
})
);
//event.wait();
@ -114,13 +117,15 @@ try
//event.set();
},
[](const WatchResponse & response)
{
if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (list) on /, Error: " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (list) on /, path: " << response.path << ", type: " << response.type << '\n';
});
std::make_shared<Coordination::WatchCallback>(
[](const WatchResponse & response)
{
if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (list) on /, Error: " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (list) on /, path: " << response.path << ", type: " << response.type << '\n';
})
);
//event.wait();
@ -136,13 +141,15 @@ try
//event.set();
},
[](const WatchResponse & response)
{
if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (exists) on /test, Error: " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (exists) on /test, path: " << response.path << ", type: " << response.type << '\n';
});
std::make_shared<Coordination::WatchCallback>(
[](const WatchResponse & response)
{
if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (exists) on /test, Error: " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (exists) on /test, path: " << response.path << ", type: " << response.type << '\n';
})
);
//event.wait();

View File

@ -866,7 +866,7 @@ ActiveDataPartSet getPartNamesToMutate(
}
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallbackPtr watch_callback)
{
std::lock_guard lock(update_mutations_mutex);

View File

@ -335,7 +335,7 @@ public:
/// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task.
/// If watch_callback is not empty, will call it when new mutations appear in ZK.
void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallbackPtr watch_callback = {});
/// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr
/// if it could not be found. Called during KILL MUTATION query execution.

View File

@ -338,6 +338,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
mutations_updating_task->deactivate();
mutations_watch_callback = std::make_shared<Coordination::WatchCallback>(mutations_updating_task->getWatchCallback());
merge_selecting_task = getContext()->getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
@ -3217,7 +3219,7 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask()
{
try
{
queue.updateMutations(getZooKeeper(), mutations_updating_task->getWatchCallback());
queue.updateMutations(getZooKeeper(), mutations_watch_callback);
}
catch (const Coordination::Exception & e)
{

View File

@ -497,6 +497,7 @@ private:
BackgroundSchedulePool::TaskHolder queue_updating_task;
BackgroundSchedulePool::TaskHolder mutations_updating_task;
Coordination::WatchCallbackPtr mutations_watch_callback;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task;