Address review comments

This commit is contained in:
Antonio Andelic 2022-06-15 12:48:30 +00:00
parent d148299c20
commit 7e1f64002d
5 changed files with 59 additions and 45 deletions

View File

@ -442,14 +442,14 @@ void KeeperDispatcher::finishSession(int64_t session_id)
void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{
for (const auto & [session_id, time, request, zxid, nodes_hash] : requests_for_sessions)
for (const auto & request_for_session : requests_for_sessions)
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = zxid;
auto response = request_for_session.request->makeResponse();
response->xid = request_for_session.request->xid;
response->zxid = 0;
response->error = error;
if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response}))
if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{request_for_session.session_id, response}))
throw Exception(ErrorCodes::SYSTEM_ERROR,
"Could not push error response xid {} zxid {} error message {} to responses queue",
response->xid,

View File

@ -387,6 +387,8 @@ void KeeperServer::shutdown()
namespace
{
// Serialize the request with all the necessary information for the leader
// we don't know ZXID and digest yet so we don't serialize it
nuraft::ptr<nuraft::buffer> getZooKeeperRequestMessage(const KeeperStorage::RequestForSession & request_for_session)
{
DB::WriteBufferFromNuraftBuffer write_buf;
@ -396,6 +398,7 @@ nuraft::ptr<nuraft::buffer> getZooKeeperRequestMessage(const KeeperStorage::Requ
return write_buf.getBuffer();
}
// Serialize the request for the log entry
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestForSession & request_for_session)
{
DB::WriteBufferFromNuraftBuffer write_buf;
@ -540,6 +543,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
{
switch (type)
{
// This event is called before a single log is appended to the entry on the leader node
case nuraft::cb_func::PreAppendLog:
{
// we are relying on the fact that request are being processed under a mutex

View File

@ -190,6 +190,7 @@ void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
request_for_session.zxid,
true /* check_acl */,
request_for_session.digest);
if (digest_enabled && request_for_session.digest)
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false);
}
@ -282,6 +283,9 @@ void KeeperStateMachine::commit_config(const uint64_t /* log_idx */, nuraft::ptr
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
{
auto request_for_session = parseRequest(data);
// If we received a log from an older node, use the log_idx as the zxid
// log_idx will always be larger or equal to the zxid so we can safely do this
// (log_idx is increased for all logs, while zxid is only increased for requests)
if (!request_for_session.zxid)
request_for_session.zxid = log_idx;

View File

@ -159,7 +159,7 @@ KeeperStorage::ResponsesForSessions processWatchesImpl(
}
// When this function is updated, update CURRENT_DIGEST_VERSION!!
UInt64 calculateDigest(std::string_view path, std::string_view data, const Coordination::Stat & stat)
uint64_t calculateDigest(std::string_view path, std::string_view data, const Coordination::Stat & stat)
{
SipHash hash;
@ -243,7 +243,7 @@ struct Overloaded : Ts...
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNodeFromStorage(StringRef path) const
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::tryGetNodeFromStorage(StringRef path) const
{
if (auto node_it = storage.container.find(path); node_it != storage.container.end())
{
@ -261,7 +261,7 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
assert(!delta.path.empty());
if (!nodes.contains(delta.path))
{
if (auto storage_node = getNodeFromStorage(delta.path))
if (auto storage_node = tryGetNodeFromStorage(delta.path))
nodes.emplace(delta.path, UncommittedNode{.node = std::move(storage_node)});
else
nodes.emplace(delta.path, UncommittedNode{.node = nullptr});
@ -403,7 +403,7 @@ std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(St
if (auto node_it = nodes.find(std::string{path}); node_it != nodes.end())
return node_it->second.node;
return getNodeFromStorage(path);
return tryGetNodeFromStorage(path);
}
Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const
@ -733,10 +733,10 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
auto parent_path = parentPath(request.path);
auto parent_node = storage.uncommitted_state.getNode(parent_path);
if (parent_node == nullptr)
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
else if (parent_node->stat.ephemeralOwner != 0)
return {{zxid, Coordination::Error::ZNOCHILDRENFOREPHEMERALS}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNOCHILDRENFOREPHEMERALS}};
std::string path_created = request.path;
if (request.is_sequential)
@ -751,35 +751,35 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
}
if (storage.uncommitted_state.getNode(path_created))
return {{zxid, Coordination::Error::ZNODEEXISTS}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
if (getBaseName(path_created).size == 0)
return {{zxid, Coordination::Error::ZBADARGUMENTS}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, storage.session_and_auth[session_id], node_acls))
return {{zxid, Coordination::Error::ZINVALIDACL}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZINVALIDACL}};
if (request.is_ephemeral)
storage.ephemerals[session_id].emplace(path_created);
int32_t parent_cversion = request.parent_cversion;
new_deltas.emplace_back(
std::string{parent_path},
zxid,
KeeperStorage::UpdateNodeDelta{[parent_cversion, zxid](KeeperStorage::Node & node)
{
++node.seq_num;
if (parent_cversion == -1)
++node.stat.cversion;
else if (parent_cversion > node.stat.cversion)
node.stat.cversion = parent_cversion;
auto parent_update = [parent_cversion, zxid](KeeperStorage::Node & node)
{
/// Increment sequential number even if node is not sequential
++node.seq_num;
if (parent_cversion == -1)
++node.stat.cversion;
else if (parent_cversion > node.stat.cversion)
node.stat.cversion = parent_cversion;
if (zxid > node.stat.pzxid)
node.stat.pzxid = zxid;
++node.stat.numChildren;
}});
if (zxid > node.stat.pzxid)
node.stat.pzxid = zxid;
++node.stat.numChildren;
};
new_deltas.emplace_back(std::string{parent_path}, zxid, KeeperStorage::UpdateNodeDelta{std::move(parent_update)});
Coordination::Stat stat;
stat.czxid = zxid;
@ -842,7 +842,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -935,12 +935,12 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
}
else if (request.version != -1 && request.version != node->stat.version)
return {{zxid, Coordination::Error::ZBADVERSION}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
else if (node->stat.numChildren != 0)
return {{zxid, Coordination::Error::ZNOTEMPTY}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
@ -990,7 +990,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -1056,12 +1056,12 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
std::vector<KeeperStorage::Delta> new_deltas;
if (!storage.uncommitted_state.getNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
auto node = storage.uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {{zxid, Coordination::Error::ZBADVERSION}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
new_deltas.emplace_back(
request.path,
@ -1138,7 +1138,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -1213,11 +1213,11 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
auto node = storage.uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {{zxid, Coordination::Error::ZBADVERSION}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
return {};
}
@ -1292,19 +1292,19 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
auto & uncommitted_state = storage.uncommitted_state;
if (!uncommitted_state.getNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
auto node = uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.aversion)
return {{zxid, Coordination::Error::ZBADVERSION}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
auto & session_auth_ids = storage.session_and_auth[session_id];
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls))
return {{zxid, Coordination::Error::ZINVALIDACL}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZINVALIDACL}};
std::vector<KeeperStorage::Delta> new_deltas
{
@ -1365,7 +1365,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
return {{zxid, Coordination::Error::ZNONODE}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -1478,7 +1478,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
response_errors.push_back(Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
return {{zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors)}}};
return {KeeperStorage::Delta{zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors)}}};
}
}
new_deltas.emplace_back(zxid, KeeperStorage::SubDeltaEnd{});
@ -1589,7 +1589,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
if (auth_request.scheme != "digest" || std::count(auth_request.data.begin(), auth_request.data.end(), ':') != 1)
return {{zxid, Coordination::Error::ZAUTHFAILED}};
return {KeeperStorage::Delta{zxid, Coordination::Error::ZAUTHFAILED}};
std::vector<KeeperStorage::Delta> new_deltas;
auto auth_digest = generateDigest(auth_request.data);

View File

@ -52,9 +52,15 @@ public:
const auto & getChildren() const noexcept { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const Node & other);
private:
@ -240,7 +246,7 @@ public:
return false;
}
std::shared_ptr<Node> getNodeFromStorage(StringRef path) const;
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
struct UncommittedNode
{