Trying to fix races one more time

This commit is contained in:
alesapin 2020-12-16 15:58:00 +03:00
parent ea4d11cb73
commit b347fa322c
2 changed files with 29 additions and 15 deletions

View File

@ -445,6 +445,8 @@ struct ContextShared
/// Stop trace collector if any
trace_collector.reset();
/// Stop zookeeper connection
zookeeper.reset();
/// Stop test_keeper storage
test_keeper_storage.reset();
}

View File

@ -419,28 +419,40 @@ Coordination::OpNum TestKeeperTCPHandler::receiveRequest()
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->xid = xid;
request->readImpl(*in);
int response_fd = poll_wrapper->getResponseFD();
auto promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
zkutil::ResponseCallback callback = [response_fd, promise] (const Coordination::ZooKeeperResponsePtr & response)
if (opnum != Coordination::OpNum::Close)
{
promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &RESPONSE_BYTE, sizeof(RESPONSE_BYTE));
};
if (request->has_watch)
{
auto watch_promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
zkutil::ResponseCallback watch_callback = [response_fd, watch_promise] (const Coordination::ZooKeeperResponsePtr & response)
int response_fd = poll_wrapper->getResponseFD();
zkutil::ResponseCallback callback = [response_fd, promise] (const Coordination::ZooKeeperResponsePtr & response)
{
watch_promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &WATCH_RESPONSE_BYTE, sizeof(WATCH_RESPONSE_BYTE));
promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &RESPONSE_BYTE, sizeof(RESPONSE_BYTE));
};
test_keeper_storage->putRequest(request, session_id, callback, watch_callback);
responses.push(promise->get_future());
watch_responses.emplace_back(watch_promise->get_future());
if (request->has_watch)
{
auto watch_promise = std::make_shared<std::promise<Coordination::ZooKeeperResponsePtr>>();
zkutil::ResponseCallback watch_callback = [response_fd, watch_promise] (const Coordination::ZooKeeperResponsePtr & response)
{
watch_promise->set_value(response);
[[maybe_unused]] int result = write(response_fd, &WATCH_RESPONSE_BYTE, sizeof(WATCH_RESPONSE_BYTE));
};
test_keeper_storage->putRequest(request, session_id, callback, watch_callback);
responses.push(promise->get_future());
watch_responses.emplace_back(watch_promise->get_future());
}
else
{
test_keeper_storage->putRequest(request, session_id, callback);
responses.push(promise->get_future());
}
}
else
{
zkutil::ResponseCallback callback = [promise] (const Coordination::ZooKeeperResponsePtr & response)
{
promise->set_value(response);
};
test_keeper_storage->putRequest(request, session_id, callback);
responses.push(promise->get_future());
}