Fix handler

This commit is contained in:
alesapin 2020-11-19 19:31:55 +03:00
parent 8e3f7e4dbd
commit 5925760892
2 changed files with 13 additions and 3 deletions

View File

@ -531,11 +531,19 @@ void TestKeeperStorage::processingThread()
? list_watches ? list_watches
: watches; : watches;
watches_type[info.request->zk_request->getPath()].emplace_back(std::move(info.watch_callback)); watches_type[zk_request->getPath()].emplace_back(std::move(info.watch_callback));
} }
else if (response->error == Coordination::Error::ZNONODE && dynamic_cast<const Coordination::ZooKeeperExistsRequest *>(zk_request.get())) else if (response->error == Coordination::Error::ZNONODE && dynamic_cast<const Coordination::ZooKeeperExistsRequest *>(zk_request.get()))
{ {
watches[info.request->zk_request->getPath()].emplace_back(std::move(info.watch_callback)); watches[zk_request->getPath()].emplace_back(std::move(info.watch_callback));
}
else
{
Coordination::ZooKeeperWatchResponse watch_response;
watch_response.path = zk_request->getPath();
watch_response.xid = -1;
watch_response.error = response->error;
info.watch_callback(std::make_shared<Coordination::ZooKeeperWatchResponse>(watch_response));
} }
} }

View File

@ -120,7 +120,9 @@ void TestKeeperTCPHandler::runImpl()
{ {
if (it->wait_for(0s) == std::future_status::ready) if (it->wait_for(0s) == std::future_status::ready)
{ {
it->get()->write(*out); auto response = it->get();
if (response->error == Coordination::Error::ZOK)
response->write(*out);
it = watch_responses.erase(it); it = watch_responses.erase(it);
} }
else else