Rewriting ZooKeeper library [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-19 20:45:30 +03:00
parent 7ca8b82ed1
commit 72ededeab5
4 changed files with 78 additions and 22 deletions

View File

@ -785,13 +785,24 @@ void ZooKeeper::receiveEvent()
auto it = watches.find(watch_response.path); auto it = watches.find(watch_response.path);
if (it == watches.end()) if (it == watches.end())
throw Exception("Received event for unknown watch"); {
/// This is Ok.
/// Because watches are identified by path.
/// And there may exist many watches for single path.
/// And watch is added to the list of watches on client side
/// slightly before than it is registered by the server.
/// And that's why new watch may be already fired by old event,
/// but then the server will actually register new watch
/// and will send event again later.
}
else
{
for (auto & callback : it->second)
if (callback)
callback(watch_response);
for (auto & callback : it->second) watches.erase(it);
if (callback) }
callback(watch_response);
watches.erase(it);
}; };
std::cerr << "Received watch\n"; std::cerr << "Received watch\n";
@ -981,8 +992,8 @@ void ZooKeeper::SetResponse::readImpl(ReadBuffer & in)
void ZooKeeper::ListResponse::readImpl(ReadBuffer & in) void ZooKeeper::ListResponse::readImpl(ReadBuffer & in)
{ {
ZooKeeperImpl::read(stat, in);
ZooKeeperImpl::read(names, in); ZooKeeperImpl::read(names, in);
ZooKeeperImpl::read(stat, in);
} }
ZooKeeper::MultiResponse::MultiResponse(const Requests & requests) ZooKeeper::MultiResponse::MultiResponse(const Requests & requests)

View File

@ -275,8 +275,8 @@ public:
struct ListResponse final : Response struct ListResponse final : Response
{ {
Stat stat;
std::vector<String> names; std::vector<String> names;
Stat stat;
void readImpl(ReadBuffer &) override; void readImpl(ReadBuffer &) override;
}; };

View File

@ -14,7 +14,6 @@ try
} }
ZooKeeper zk(argv[1], "", 5000); ZooKeeper zk(argv[1], "", 5000);
Strings children;
std::cout << "create path" << std::endl; std::cout << "create path" << std::endl;
zk.create("/test", "old", zkutil::CreateMode::Persistent); zk.create("/test", "old", zkutil::CreateMode::Persistent);
@ -28,6 +27,12 @@ try
watch->wait(); watch->wait();
std::cout << "watch happened" << std::endl; std::cout << "watch happened" << std::endl;
std::cout << "remove path" << std::endl; std::cout << "remove path" << std::endl;
std::cout << "list path" << std::endl;
Strings children = zk.getChildren("/");
for (const auto & name : children)
std::cerr << "\t" << name << "\n";
zk.remove("/test"); zk.remove("/test");
Ops ops; Ops ops;

View File

@ -34,45 +34,85 @@ try
Strings children; Strings children;
std::cout << "create path" << '\n'; std::cout << "create\n";
zk.create("/test", "old", false, false, {}, [](const ZooKeeper::CreateResponse & response) zk.create("/test", "old", false, false, {}, [](const ZooKeeper::CreateResponse & response)
{ {
if (response.error) if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; std::cerr << "Error (create) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else else
std::cerr << "Created path: " << response.path_created << '\n'; std::cerr << "Created path: " << response.path_created << '\n';
}); });
std::cout << "get path" << '\n'; std::cout << "get\n";
zk.get("/test", zk.get("/test",
[](const ZooKeeper::GetResponse & response) [](const ZooKeeper::GetResponse & response)
{ {
if (response.error) if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; std::cerr << "Error (get) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else else
std::cerr << "Value: " << response.data << '\n'; std::cerr << "Value: " << response.data << '\n';
}, },
/* [](const ZooKeeper::WatchResponse & response) [](const ZooKeeper::WatchResponse & response)
{ {
if (response.error) if (response.error)
std::cerr << "Watch, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; std::cerr << "Watch (get) on /test, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else else
std::cerr << "Watch, path: " << response.path << ", type: " << response.type << '\n'; std::cerr << "Watch (get) on /test, path: " << response.path << ", type: " << response.type << '\n';
}*/ {}); });
std::cout << "set path" << '\n'; std::cout << "set\n";
zk.set("/test", "new", -1, [](const ZooKeeper::SetResponse & response) zk.set("/test", "new", -1, [](const ZooKeeper::SetResponse & response)
{ {
if (response.error) if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; std::cerr << "Error (set) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else else
std::cerr << "Set\n"; std::cerr << "Set\n";
}); });
std::cout << "remove path" << '\n'; std::cout << "list\n";
zk.list("/",
[](const ZooKeeper::ListResponse & response)
{
if (response.error)
std::cerr << "Error (list) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
{
std::cerr << "Children:\n";
for (const auto & name : response.names)
std::cerr << name << "\n";
}
},
[](const ZooKeeper::WatchResponse & response)
{
if (response.error)
std::cerr << "Watch (list) on /, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Watch (list) on /, path: " << response.path << ", type: " << response.type << '\n';
});
std::cout << "exists\n";
zk.exists("/test",
[](const ZooKeeper::ExistsResponse & response)
{
if (response.error)
std::cerr << "Error (exists) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Exists\n";
},
[](const ZooKeeper::WatchResponse & response)
{
if (response.error)
std::cerr << "Watch (exists) on /test, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
std::cerr << "Watch (exists) on /test, path: " << response.path << ", type: " << response.type << '\n';
});
std::cout << "remove\n";
zk.remove("/test", -1, [](const ZooKeeper::RemoveResponse & response) zk.remove("/test", -1, [](const ZooKeeper::RemoveResponse & response)
{ {
@ -82,7 +122,7 @@ try
std::cerr << "Removed\n"; std::cerr << "Removed\n";
}); });
std::cout << "multi" << '\n'; std::cout << "multi\n";
ZooKeeper::Requests ops; ZooKeeper::Requests ops;
@ -109,7 +149,7 @@ try
zk.multi(ops, [](const ZooKeeper::MultiResponse & response) zk.multi(ops, [](const ZooKeeper::MultiResponse & response)
{ {
if (response.error) if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n'; std::cerr << "Error (multi) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else else
{ {
for (const auto & elem : response.responses) for (const auto & elem : response.responses)