Add processLocal for read requests

This commit is contained in:
Antonio Andelic 2022-05-09 08:32:25 +00:00
parent 7f6fa9fe83
commit ad7226e151
3 changed files with 312 additions and 184 deletions

View File

@ -357,7 +357,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
{ {
/// Pure local request, just process it with storage /// Pure local request, just process it with storage
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt); auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (const auto & response : responses) for (const auto & response : responses)
if (!responses_queue.push(response)) if (!responses_queue.push(response))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id); throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id);

View File

@ -446,6 +446,14 @@ struct KeeperStorageRequestProcessor
{ {
return {}; return {};
} }
// process the request using locally committed data
virtual Coordination::ZooKeeperResponsePtr
processLocal(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/) const
{
throw Exception{DB::ErrorCodes::LOGICAL_ERROR, "Cannot process the request locally"};
}
virtual KeeperStorage::ResponsesForSessions virtual KeeperStorage::ResponsesForSessions
processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const
{ {
@ -499,11 +507,11 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
auto path = zk_request->getPath(); auto path = zk_request->getPath();
auto parent_path = parentPath(path); auto parent_path = parentPath(path);
auto it = container.find(parent_path); auto node_it = container.find(parent_path);
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -587,8 +595,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr); Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
auto result = storage.commit(zxid, session_id); if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
if (result != Coordination::Error::ZOK)
{ {
response.error = result; response.error = result;
return response_ptr; return response_ptr;
@ -615,11 +622,11 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{ {
auto & container = storage.container; auto & container = storage.container;
auto it = container.find(zk_request->getPath()); auto node_it = container.find(zk_request->getPath());
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -640,28 +647,56 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
return {}; return {};
} }
Coordination::ZooKeeperResponsePtr template <bool local>
process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
{ {
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr); Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr);
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request); Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
auto it = container.find(request.path); if constexpr (local)
if (it == container.end())
{ {
response.error = Coordination::Error::ZNONODE; if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
fail();
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
} }
else else
{ {
response.stat = it->value.stat; response.stat = node_it->value.stat;
response.data = it->value.getData(); response.data = node_it->value.getData();
response.error = Coordination::Error::ZOK; response.error = Coordination::Error::ZOK;
} }
return response_ptr; return response_ptr;
} }
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
}; };
struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor
@ -669,11 +704,11 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{ {
auto & container = storage.container; auto & container = storage.container;
auto it = container.find(parentPath(zk_request->getPath())); auto node_it = container.find(parentPath(zk_request->getPath()));
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -740,13 +775,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr); Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
auto result = storage.commit(zxid, session_id); response.error = storage.commit(zxid, session_id);
if (result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
return response_ptr; return response_ptr;
} }
@ -772,28 +801,54 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
return {}; return {};
} }
Coordination::ZooKeeperResponsePtr template <bool local>
process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */, int64_t /* time */) const override Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
{ {
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr); Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr);
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request); Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
auto it = container.find(request.path); if constexpr (local)
if (it != container.end())
{ {
response.stat = it->value.stat; if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
response.error = Coordination::Error::ZOK; {
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
fail();
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
} }
else else
{ {
response.error = Coordination::Error::ZNONODE; response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK;
} }
return response_ptr; return response_ptr;
} }
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
}; };
struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProcessor struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProcessor
@ -801,11 +856,11 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{ {
auto & container = storage.container; auto & container = storage.container;
auto it = container.find(zk_request->getPath()); auto node_it = container.find(zk_request->getPath());
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -857,18 +912,17 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr); Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr);
Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request); Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request);
auto result = storage.commit(zxid, session_id); if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
if (result != Coordination::Error::ZOK)
{ {
response.error = result; response.error = result;
return response_ptr; return response_ptr;
} }
auto it = container.find(request.path); auto node_it = container.find(request.path);
if (it == container.end()) if (node_it == container.end())
fail(); fail();
response.stat = it->value.stat; response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK; response.error = Coordination::Error::ZOK;
return response_ptr; return response_ptr;
@ -886,11 +940,11 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{ {
auto & container = storage.container; auto & container = storage.container;
auto it = container.find(zk_request->getPath()); auto node_it = container.find(zk_request->getPath());
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -910,18 +964,36 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
return {}; return {};
} }
Coordination::ZooKeeperResponsePtr
process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
{ {
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr); Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request); Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
auto it = container.find(request.path); if constexpr (local)
if (it == container.end())
{ {
response.error = Coordination::Error::ZNONODE; if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
fail();
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
} }
else else
{ {
@ -929,18 +1001,28 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
if (path_prefix.empty()) if (path_prefix.empty())
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
const auto & children = it->value.getChildren(); const auto & children = node_it->value.getChildren();
response.names.reserve(children.size()); response.names.reserve(children.size());
for (const auto child : children) for (const auto child : children)
response.names.push_back(child.toString()); response.names.push_back(child.toString());
response.stat = it->value.stat; response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK; response.error = Coordination::Error::ZOK;
} }
return response_ptr; return response_ptr;
} }
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
}; };
struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor
@ -948,11 +1030,11 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{ {
auto & container = storage.container; auto & container = storage.container;
auto it = container.find(zk_request->getPath()); auto node_it = container.find(zk_request->getPath());
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -976,22 +1058,39 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
return {}; return {};
} }
Coordination::ZooKeeperResponsePtr template <bool local>
process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
{ {
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr); Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr);
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request); Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
auto it = container.find(request.path);
if (it == container.end()) if constexpr (local)
{ {
response.error = Coordination::Error::ZNONODE; if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
} }
else if (request.version != -1 && request.version != it->value.stat.version) }
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{ {
response.error = Coordination::Error::ZBADVERSION; if constexpr (local)
response.error = error_code;
else
fail();
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
}
else if (request.version != -1 && request.version != node_it->value.stat.version)
{
on_error(Coordination::Error::ZBADVERSION);
} }
else else
{ {
@ -1000,6 +1099,16 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
return response_ptr; return response_ptr;
} }
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
}; };
@ -1008,11 +1117,11 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{ {
auto & container = storage.container; auto & container = storage.container;
auto it = container.find(zk_request->getPath()); auto node_it = container.find(zk_request->getPath());
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -1053,17 +1162,16 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperSetACLResponse & response = dynamic_cast<Coordination::ZooKeeperSetACLResponse &>(*response_ptr); Coordination::ZooKeeperSetACLResponse & response = dynamic_cast<Coordination::ZooKeeperSetACLResponse &>(*response_ptr);
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request); Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
auto result = storage.commit(zxid, session_id); if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
if (result != Coordination::Error::ZOK)
{ {
response.error = result; response.error = result;
return response_ptr; return response_ptr;
} }
auto it = storage.container.find(request.path); auto node_it = storage.container.find(request.path);
if (it == storage.container.end()) if (node_it == storage.container.end())
fail(); fail();
response.stat = it->value.stat; response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK; response.error = Coordination::Error::ZOK;
return response_ptr; return response_ptr;
@ -1075,11 +1183,11 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{ {
auto & container = storage.container; auto & container = storage.container;
auto it = container.find(zk_request->getPath()); auto node_it = container.find(zk_request->getPath());
if (it == container.end()) if (node_it == container.end())
return true; return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); const auto & node_acls = storage.acl_map.convertNumber(node_it->value.acl_id);
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -1100,26 +1208,54 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
return {}; return {};
} }
Coordination::ZooKeeperResponsePtr template <bool local>
process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
{ {
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetACLResponse & response = dynamic_cast<Coordination::ZooKeeperGetACLResponse &>(*response_ptr); Coordination::ZooKeeperGetACLResponse & response = dynamic_cast<Coordination::ZooKeeperGetACLResponse &>(*response_ptr);
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request); Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
auto & container = storage.container;
auto it = container.find(request.path); if constexpr (local)
if (it == container.end())
{ {
response.error = Coordination::Error::ZNONODE; if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
fail();
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
} }
else else
{ {
response.stat = it->value.stat; response.stat = node_it->value.stat;
response.acl = storage.acl_map.convertNumber(it->value.acl_id); response.acl = storage.acl_map.convertNumber(node_it->value.acl_id);
} }
return response_ptr; return response_ptr;
} }
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
}; };
struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor
@ -1165,6 +1301,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{ {
// manually add deltas so that the result of previous request in the transaction is used in the next request
auto & saved_deltas = storage.current_nodes.deltas; auto & saved_deltas = storage.current_nodes.deltas;
std::vector<Coordination::Error> response_errors; std::vector<Coordination::Error> response_errors;
@ -1211,10 +1348,9 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
return response_ptr; return response_ptr;
} }
size_t i = 0; for (size_t i = 0; i < concrete_requests.size(); ++i)
for (const auto & concrete_request : concrete_requests)
{ {
auto cur_response = concrete_request->process(storage, zxid, session_id, time); auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time);
while (!deltas.empty()) while (!deltas.empty())
{ {
@ -1228,8 +1364,39 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
} }
response.responses[i] = cur_response; response.responses[i] = cur_response;
}
++i; response.error = Coordination::Error::ZOK;
return response_ptr;
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time);
response.responses[i] = cur_response;
if (cur_response->error != Coordination::Error::ZOK)
{
for (size_t j = 0; j <= i; ++j)
{
auto response_error = response.responses[j]->error;
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = response_error;
}
for (size_t j = i + 1; j < response.responses.size(); ++j)
{
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY;
}
return response_ptr;
}
} }
response.error = Coordination::Error::ZOK; response.error = Coordination::Error::ZOK;
@ -1389,7 +1556,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
int64_t session_id, int64_t session_id,
int64_t time, int64_t time,
std::optional<int64_t> new_last_zxid, std::optional<int64_t> new_last_zxid,
bool check_acl) bool check_acl,
bool is_local)
{ {
KeeperStorage::ResponsesForSessions results; KeeperStorage::ResponsesForSessions results;
if (new_last_zxid) if (new_last_zxid)
@ -1462,7 +1630,11 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
} }
else else
{ {
if (is_local)
response = request_processor->processLocal(*this, zxid, session_id, time);
else
response = request_processor->process(*this, zxid, session_id, time); response = request_processor->process(*this, zxid, session_id, time);
std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; }); std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
} }

View File

@ -1,14 +1,14 @@
#pragma once #pragma once
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/ACLMap.h>
#include <Coordination/SnapshotableHashTable.h>
#include <IO/WriteBufferFromString.h>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include <Coordination/ACLMap.h>
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/SnapshotableHashTable.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <absl/container/flat_hash_set.h> #include <absl/container/flat_hash_set.h>
@ -29,7 +29,6 @@ struct KeeperStorageSnapshot;
class KeeperStorage class KeeperStorage
{ {
public: public:
struct Node struct Node
{ {
uint64_t acl_id = 0; /// 0 -- no ACL by default uint64_t acl_id = 0; /// 0 -- no ACL by default
@ -41,26 +40,18 @@ public:
Node() : size_bytes(sizeof(Node)) { } Node() : size_bytes(sizeof(Node)) { }
/// Object memory size /// Object memory size
uint64_t sizeInBytes() const uint64_t sizeInBytes() const { return size_bytes; }
{
return size_bytes;
}
void setData(String new_data); void setData(String new_data);
const auto & getData() const noexcept const auto & getData() const noexcept { return data; }
{
return data;
}
void addChild(StringRef child_path); void addChild(StringRef child_path);
void removeChild(StringRef child_path); void removeChild(StringRef child_path);
const auto & getChildren() const noexcept const auto & getChildren() const noexcept { return children; }
{
return children;
}
private: private:
String data; String data;
ChildrenSet children{}; ChildrenSet children{};
@ -85,10 +76,7 @@ public:
std::string scheme; std::string scheme;
std::string id; std::string id;
bool operator==(const AuthID & other) const bool operator==(const AuthID & other) const { return scheme == other.scheme && id == other.id; }
{
return scheme == other.scheme && id == other.id;
}
}; };
using RequestsForSessions = std::vector<RequestForSession>; using RequestsForSessions = std::vector<RequestForSession>;
@ -152,27 +140,18 @@ public:
{ {
}; };
using Operation = std::variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>; using Operation
= std::variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
struct Delta struct Delta
{ {
Delta(String path_, int64_t zxid_, Operation operation_) Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { }
: path(std::move(path_))
, zxid(zxid_)
, operation(std::move(operation_))
{}
Delta(int64_t zxid_, Coordination::Error error) Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { }
: Delta("", zxid_, ErrorDelta{error})
{}
Delta(int64_t zxid_, SubDeltaEnd subdelta) Delta(int64_t zxid_, SubDeltaEnd subdelta) : Delta("", zxid_, subdelta) { }
: Delta("", zxid_, subdelta)
{}
Delta(int64_t zxid_, FailedMultiDelta failed_multi) Delta(int64_t zxid_, FailedMultiDelta failed_multi) : Delta("", zxid_, failed_multi) { }
: Delta("", zxid_, failed_multi)
{}
String path; String path;
int64_t zxid; int64_t zxid;
@ -206,7 +185,14 @@ public:
Coordination::Error commit(int64_t zxid, int64_t session_id); Coordination::Error commit(int64_t zxid, int64_t session_id);
bool createNode(const std::string & path, String data, const Coordination::Stat & stat, bool is_sequental, bool is_ephemeral, Coordination::ACLs node_acls, int64_t session_id); bool createNode(
const std::string & path,
String data,
const Coordination::Stat & stat,
bool is_sequental,
bool is_ephemeral,
Coordination::ACLs node_acls,
int64_t session_id);
bool removeNode(const std::string & path, int32_t version); bool removeNode(const std::string & path, int32_t version);
/// Mapping session_id -> set of ephemeral nodes paths /// Mapping session_id -> set of ephemeral nodes paths
@ -233,10 +219,7 @@ public:
void clearDeadWatches(int64_t session_id); void clearDeadWatches(int64_t session_id);
/// Get current zxid /// Get current zxid
int64_t getZXID() const int64_t getZXID() const { return zxid; }
{
return zxid;
}
const String superdigest; const String superdigest;
@ -260,7 +243,13 @@ public:
/// Process user request and return response. /// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper. /// check_acl = false only when converting data from ZooKeeper.
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional<int64_t> new_last_zxid, bool check_acl = true); ResponsesForSessions processRequest(
const Coordination::ZooKeeperRequestPtr & request,
int64_t session_id,
int64_t time,
std::optional<int64_t> new_last_zxid,
bool check_acl = true,
bool is_local = false);
void preprocessRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid); void preprocessRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid);
void finalize(); void finalize();
@ -268,70 +257,37 @@ public:
/// Set of methods for creating snapshots /// Set of methods for creating snapshots
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version. /// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
void enableSnapshotMode(size_t up_to_version) void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); }
{
container.enableSnapshotMode(up_to_version);
}
/// Turn off snapshot mode. /// Turn off snapshot mode.
void disableSnapshotMode() void disableSnapshotMode() { container.disableSnapshotMode(); }
{
container.disableSnapshotMode();
}
Container::const_iterator getSnapshotIteratorBegin() const Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); }
{
return container.begin();
}
/// Clear outdated data from internal container. /// Clear outdated data from internal container.
void clearGarbageAfterSnapshot() void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); }
{
container.clearOutdatedNodes();
}
/// Get all active sessions /// Get all active sessions
const SessionAndTimeout & getActiveSessions() const const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; }
{
return session_and_timeout;
}
/// Get all dead sessions /// Get all dead sessions
std::vector<int64_t> getDeadSessions() const std::vector<int64_t> getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); }
{
return session_expiry_queue.getExpiredSessions();
}
/// Introspection functions mostly used in 4-letter commands /// Introspection functions mostly used in 4-letter commands
uint64_t getNodesCount() const uint64_t getNodesCount() const { return container.size(); }
{
return container.size();
}
uint64_t getApproximateDataSize() const uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); }
{
return container.getApproximateDataSize();
}
uint64_t getArenaDataSize() const uint64_t getArenaDataSize() const { return container.keyArenaSize(); }
{
return container.keyArenaSize();
}
uint64_t getTotalWatchesCount() const; uint64_t getTotalWatchesCount() const;
uint64_t getWatchedPathsCount() const uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); }
{
return watches.size() + list_watches.size();
}
uint64_t getSessionsWithWatchesCount() const; uint64_t getSessionsWithWatchesCount() const;
uint64_t getSessionWithEphemeralNodesCount() const uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); }
{
return ephemerals.size();
}
uint64_t getTotalEphemeralNodesCount() const; uint64_t getTotalEphemeralNodesCount() const;
void dumpWatches(WriteBufferFromOwnString & buf) const; void dumpWatches(WriteBufferFromOwnString & buf) const;