More changes

This commit is contained in:
Antonio Andelic 2024-08-14 16:38:59 +02:00
parent c61fc591c4
commit e968984d17
10 changed files with 217 additions and 232 deletions

View File

@ -180,12 +180,6 @@ void SetACLRequest::addRootPath(const String & root_path) { Coordination::addRoo
void GetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void SyncRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void MultiRequest::addRootPath(const String & root_path)
{
for (auto & request : requests)
request->addRootPath(root_path);
}
void CreateResponse::removeRootPath(const String & root_path) { Coordination::removeRootPath(path_created, root_path); }
void WatchResponse::removeRootPath(const String & root_path) { Coordination::removeRootPath(path, root_path); }

View File

@ -391,11 +391,17 @@ struct ReconfigResponse : virtual Response
size_t bytesSize() const override { return value.size() + sizeof(stat); }
};
template <typename T>
struct MultiRequest : virtual Request
{
Requests requests;
std::vector<T> requests;
void addRootPath(const String & root_path) override
{
for (auto & request : requests)
request->addRootPath(root_path);
}
void addRootPath(const String & root_path) override;
String getPath() const override { return {}; }
size_t bytesSize() const override

View File

@ -154,7 +154,7 @@ struct TestKeeperReconfigRequest final : ReconfigRequest, TestKeeperRequest
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
struct TestKeeperMultiRequest final : MultiRequest<RequestPtr>, TestKeeperRequest
{
explicit TestKeeperMultiRequest(const Requests & generic_requests)
: TestKeeperMultiRequest(std::span(generic_requests))

View File

@ -882,30 +882,30 @@ size_t ZooKeeperMultiResponse::sizeImpl() const
return total_size + Coordination::size(op_num) + Coordination::size(done) + Coordination::size(error_read);
}
ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperHeartbeatResponse>()); }
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSyncResponse>()); }
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperAuthResponse>()); }
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveResponse>()); }
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperExistsResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
ZooKeeperResponsePtr ZooKeeperReconfigRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperReconfigResponse>()); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return std::make_shared<ZooKeeperHeartbeatResponse>(); }
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return std::make_shared<ZooKeeperSyncResponse>(); }
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return std::make_shared<ZooKeeperAuthResponse>(); }
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return std::make_shared<ZooKeeperRemoveResponse>(); }
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return std::make_shared<ZooKeeperExistsResponse>(); }
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return std::make_shared<ZooKeeperGetResponse>(); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return std::make_shared<ZooKeeperSetResponse>(); }
ZooKeeperResponsePtr ZooKeeperReconfigRequest::makeResponse() const { return std::make_shared<ZooKeeperReconfigResponse>(); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::make_shared<ZooKeeperListResponse>(); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return std::make_shared<ZooKeeperSimpleListResponse>(); }
ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const
{
if (not_exists)
return setTime(std::make_shared<ZooKeeperCreateIfNotExistsResponse>());
return setTime(std::make_shared<ZooKeeperCreateResponse>());
return std::make_shared<ZooKeeperCreateIfNotExistsResponse>();
return std::make_shared<ZooKeeperCreateResponse>();
}
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const
{
if (not_exists)
return setTime(std::make_shared<ZooKeeperCheckNotExistsResponse>());
return std::make_shared<ZooKeeperCheckNotExistsResponse>();
return setTime(std::make_shared<ZooKeeperCheckResponse>());
return std::make_shared<ZooKeeperCheckResponse>();
}
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
@ -916,11 +916,12 @@ ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
else
response = std::make_shared<ZooKeeperMultiReadResponse>(requests);
return setTime(std::move(response));
return std::move(response);
}
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCloseResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetACLResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetACLResponse>()); }
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperSetACLResponse>(); }
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperGetACLResponse>(); }
void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
{
@ -1122,40 +1123,6 @@ std::shared_ptr<ZooKeeperRequest> ZooKeeperRequest::read(ReadBuffer & in)
return request;
}
ZooKeeperRequest::~ZooKeeperRequest()
{
if (!request_created_time_ns)
return;
UInt64 elapsed_ns = clock_gettime_ns() - request_created_time_ns;
constexpr UInt64 max_request_time_ns = 1000000000ULL; /// 1 sec
if (max_request_time_ns < elapsed_ns)
{
LOG_TEST(getLogger(__PRETTY_FUNCTION__), "Processing of request xid={} took {} ms", xid, elapsed_ns / 1000000UL);
}
}
ZooKeeperResponsePtr ZooKeeperRequest::setTime(ZooKeeperResponsePtr response) const
{
if (request_created_time_ns)
{
response->response_created_time_ns = clock_gettime_ns();
}
return response;
}
ZooKeeperResponse::~ZooKeeperResponse()
{
if (!response_created_time_ns)
return;
UInt64 elapsed_ns = clock_gettime_ns() - response_created_time_ns;
constexpr UInt64 max_request_time_ns = 1000000000ULL; /// 1 sec
if (max_request_time_ns < elapsed_ns)
{
LOG_TEST(getLogger(__PRETTY_FUNCTION__), "Processing of response xid={} took {} ms", xid, elapsed_ns / 1000000UL);
}
}
ZooKeeperRequestPtr ZooKeeperRequestFactory::get(OpNum op_num) const
{
auto it = op_num_to_request.find(op_num);
@ -1177,7 +1144,6 @@ void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
factory.registerRequest(num, []
{
auto res = std::make_shared<RequestT>();
res->request_created_time_ns = clock_gettime_ns();
if constexpr (num == OpNum::MultiRead)
res->operation_type = ZooKeeperMultiRequest::OperationType::Read;

View File

@ -23,11 +23,8 @@ struct ZooKeeperResponse : virtual Response
{
XID xid = 0;
UInt64 response_created_time_ns = 0;
ZooKeeperResponse() = default;
ZooKeeperResponse(const ZooKeeperResponse &) = default;
~ZooKeeperResponse() override;
virtual void readImpl(ReadBuffer &) = 0;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual size_t sizeImpl() const = 0;
@ -50,13 +47,11 @@ struct ZooKeeperRequest : virtual Request
bool restored_from_zookeeper_log = false;
UInt64 request_created_time_ns = 0;
UInt64 thread_id = 0;
String query_id;
ZooKeeperRequest() = default;
ZooKeeperRequest(const ZooKeeperRequest &) = default;
~ZooKeeperRequest() override;
virtual OpNum getOpNum() const = 0;
@ -73,7 +68,6 @@ struct ZooKeeperRequest : virtual Request
static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in);
virtual ZooKeeperResponsePtr makeResponse() const = 0;
ZooKeeperResponsePtr setTime(ZooKeeperResponsePtr response) const;
virtual bool isReadRequest() const = 0;
virtual void createLogElements(LogElements & elems) const;
@ -538,7 +532,7 @@ struct ZooKeeperGetACLResponse final : GetACLResponse, ZooKeeperResponse
size_t bytesSize() const override { return GetACLResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
struct ZooKeeperMultiRequest final : MultiRequest<ZooKeeperRequestPtr>, ZooKeeperRequest
{
OpNum getOpNum() const override;
ZooKeeperMultiRequest() = default;
@ -571,12 +565,14 @@ private:
struct ZooKeeperMultiResponse : MultiResponse, ZooKeeperResponse
{
explicit ZooKeeperMultiResponse(const Requests & requests)
ZooKeeperMultiResponse() = default;
explicit ZooKeeperMultiResponse(const std::vector<ZooKeeperRequestPtr> & requests)
{
responses.reserve(requests.size());
for (const auto & request : requests)
responses.emplace_back(dynamic_cast<const ZooKeeperRequest &>(*request).makeResponse());
responses.emplace_back(request->makeResponse());
}
explicit ZooKeeperMultiResponse(const Responses & responses_)

View File

@ -72,7 +72,7 @@ namespace
writeBinary(node.getData(), out);
/// Serialize ACL
writeBinary(node.stats.acl_id, out);
writeBinary(node.acl_id, out);
/// Write is_sequential for backwards compatibility
if (version < SnapshotVersion::V6)
writeBinary(false, out);
@ -109,7 +109,7 @@ namespace
if (version >= SnapshotVersion::V1)
{
readBinary(node.stats.acl_id, in);
readBinary(node.acl_id, in);
}
else if (version == SnapshotVersion::V0)
{
@ -125,14 +125,14 @@ namespace
readBinary(acl.id, in);
acls.push_back(acl);
}
node.stats.acl_id = acl_map.convertACLs(acls);
node.acl_id = acl_map.convertACLs(acls);
}
/// Some strange ACLID during deserialization from ZooKeeper
if (node.stats.acl_id == std::numeric_limits<uint64_t>::max())
node.stats.acl_id = 0;
if (node.acl_id == std::numeric_limits<uint64_t>::max())
node.acl_id = 0;
acl_map.addUsage(node.stats.acl_id);
acl_map.addUsage(node.acl_id);
if (version < SnapshotVersion::V6)
{
@ -455,9 +455,12 @@ void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<S
{
for (const auto & itr : storage.container)
{
auto parent_path = parentNodePath(itr.key);
storage.container.updateValue(
parent_path, [path = itr.key](typename Storage::Node & value) { value.addChild(getBaseNodeName(path)); });
if (itr.key != "/")
{
auto parent_path = parentNodePath(itr.key);
storage.container.updateValue(
parent_path, [path = itr.key](typename Storage::Node & value) { value.addChild(getBaseNodeName(path)); });
}
}
for (const auto & itr : storage.container)

View File

@ -52,7 +52,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
namespace
@ -298,6 +297,7 @@ KeeperMemNode & KeeperMemNode::operator=(const KeeperMemNode & other)
return *this;
stats = other.stats;
acl_id = other.acl_id;
if (stats.data_size != 0)
{
@ -321,6 +321,7 @@ KeeperMemNode & KeeperMemNode::operator=(KeeperMemNode && other) noexcept
return *this;
stats = other.stats;
acl_id = other.acl_id;
data = std::move(other.data);
@ -403,6 +404,7 @@ UInt64 KeeperMemNode::getDigest(const std::string_view path) const
void KeeperMemNode::shallowCopy(const KeeperMemNode & other)
{
stats = other.stats;
acl_id = other.acl_id;
if (stats.data_size != 0)
{
data = std::unique_ptr<char[]>(new char[stats.data_size]);
@ -522,20 +524,26 @@ void KeeperStorage<Container>::UncommittedState::applyDelta(const Delta & delta)
chassert(!delta.path.empty());
UncommittedNode * uncommitted_node = nullptr;
std::string_view node_path;
if (auto it = nodes.find(delta.path); it != nodes.end())
{
uncommitted_node = &it->second;
node_path = it->first;
}
else
{
if (auto storage_node = tryGetNodeFromStorage(delta.path))
{
auto [emplaced_it, _] = nodes.emplace(delta.path, UncommittedNode{.node = std::move(storage_node)});
node_path = emplaced_it->first;
zxid_to_nodes[0].insert(node_path);
uncommitted_node = &emplaced_it->second;
}
else
{
auto [emplaced_it, _] = nodes.emplace(delta.path, UncommittedNode{.node = nullptr});
node_path = emplaced_it->first;
zxid_to_nodes[0].insert(node_path);
uncommitted_node = &emplaced_it->second;
}
}
@ -545,10 +553,6 @@ void KeeperStorage<Container>::UncommittedState::applyDelta(const Delta & delta)
{
auto & [node, acls, applied_zxids] = *uncommitted_node;
int64_t last_applied_zxid = 0;
if (!applied_zxids.empty())
last_applied_zxid = applied_zxids.back();
if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
{
chassert(!node);
@ -579,8 +583,8 @@ void KeeperStorage<Container>::UncommittedState::applyDelta(const Delta & delta)
acls = operation.new_acls;
}
if (last_applied_zxid != delta.zxid)
last_applied_zxid = applied_zxids.emplace_back(delta.zxid);
applied_zxids.insert(delta.zxid);
zxid_to_nodes[delta.zxid].insert(node_path);
},
delta.operation);
}
@ -671,8 +675,8 @@ void KeeperStorage<Container>::UncommittedState::rollbackDelta(const Delta & del
acls = operation.old_acls;
}
if (applied_zxids.back() == delta.zxid)
applied_zxids.pop_back();
applied_zxids.erase(delta.zxid);
zxid_to_nodes.erase(delta.zxid);
},
delta.operation);
}
@ -708,14 +712,21 @@ void KeeperStorage<Container>::UncommittedState::addDeltas(std::list<Delta> new_
template<typename Container>
void KeeperStorage<Container>::UncommittedState::cleanup(int64_t commit_zxid)
{
for (auto it = nodes.begin(); it != nodes.end();)
for (const auto & [transaction_zxid, transaction_nodes] : zxid_to_nodes)
{
auto & applied_zxids = it->second.applied_zxids;
std::erase_if(applied_zxids, [commit_zxid](auto current_zxid) { return current_zxid <= commit_zxid; });
if (applied_zxids.empty())
it = nodes.erase(it);
else
++it;
if (transaction_zxid > commit_zxid)
break;
std::cout << transaction_zxid << std::endl;
for (const auto node : transaction_nodes)
{
std::cout << node << std::endl;
auto node_it = nodes.find(node);
chassert(node_it != nodes.end());
node_it->second.applied_zxids.erase(transaction_zxid);
if (node_it->second.applied_zxids.empty())
nodes.erase(node_it);
}
}
for (auto it = session_and_auth.begin(); it != session_and_auth.end();)
@ -815,7 +826,10 @@ std::shared_ptr<typename Container::Node> KeeperStorage<Container>::UncommittedS
std::shared_ptr<KeeperStorage::Node> node = tryGetNodeFromStorage(path);
if (node)
nodes.emplace(std::string{path}, UncommittedNode{node});
{
auto [node_it, _] = nodes.emplace(std::string{path}, UncommittedNode{.node = node});
zxid_to_nodes[0].insert(node_it->first);
}
return node;
}
@ -826,7 +840,7 @@ Coordination::ACLs KeeperStorage<Container>::UncommittedState::getACLs(StringRef
if (auto node_it = nodes.find(path.toView()); node_it != nodes.end())
{
if (!node_it->second.acls.has_value())
node_it->second.acls.emplace(storage.acl_map.convertNumber(node_it->second.node->stats.acl_id));
node_it->second.acls.emplace(storage.acl_map.convertNumber(node_it->second.node->acl_id));
return *node_it->second.acls;
}
@ -835,8 +849,9 @@ Coordination::ACLs KeeperStorage<Container>::UncommittedState::getACLs(StringRef
if (node)
{
auto [it, inserted] = nodes.emplace(std::string{path}, UncommittedNode{node});
it->second.acls = storage.acl_map.convertNumber(node->stats.acl_id);
auto [it, inserted] = nodes.emplace(std::string{path}, UncommittedNode{.node = node});
zxid_to_nodes[0].insert(it->first);
it->second.acls = storage.acl_map.convertNumber(node->acl_id);
return *it->second.acls;
}
@ -1000,12 +1015,12 @@ Coordination::Error KeeperStorage<Container>::commit(std::list<Delta> deltas)
if (operation.version != -1 && operation.version != node_it->value.stats.aversion)
onStorageInconsistency();
acl_map.removeUsage(node_it->value.stats.acl_id);
acl_map.removeUsage(node_it->value.acl_id);
uint64_t acl_id = acl_map.convertACLs(operation.new_acls);
acl_map.addUsage(acl_id);
container.updateValue(path, [acl_id](Node & node) { node.stats.acl_id = acl_id; });
container.updateValue(path, [acl_id](Node & node) { node.acl_id = acl_id; });
return Coordination::Error::ZOK;
}
@ -1063,7 +1078,7 @@ bool KeeperStorage<Container>::createNode(
uint64_t acl_id = acl_map.convertACLs(node_acls);
acl_map.addUsage(acl_id);
created_node.stats.acl_id = acl_id;
created_node.acl_id = acl_id;
created_node.copyStats(stat);
created_node.setData(data);
@ -1105,7 +1120,7 @@ bool KeeperStorage<Container>::removeNode(const std::string & path, int32_t vers
KeeperStorage::Node prev_node;
prev_node.shallowCopy(node_it->value);
acl_map.removeUsage(node_it->value.stats.acl_id);
acl_map.removeUsage(node_it->value.acl_id);
if constexpr (use_rocksdb)
container.erase(path);
@ -1170,12 +1185,6 @@ auto callOnConcreteRequestType(const Coordination::ZooKeeperRequest & zk_request
}
}
template <typename F>
auto callOnConcreteRequestType(const Coordination::Request & request, F function)
{
return callOnConcreteRequestType(dynamic_cast<const Coordination::ZooKeeperRequest &>(request), function);
}
namespace
{
@ -1189,7 +1198,7 @@ Coordination::ACLs getNodeACLs(Storage & storage, StringRef path, bool is_local)
if (node_it == storage.container.end())
return {};
return storage.acl_map.convertNumber(node_it->value.stats.acl_id);
return storage.acl_map.convertNumber(node_it->value.acl_id);
}
return storage.uncommitted_state.getACLs(path);
@ -1306,8 +1315,8 @@ template <typename Storage>
Coordination::ZooKeeperResponsePtr
process(const Coordination::ZooKeeperSyncRequest & zk_request, Storage & /* storage */, std::list<KeeperStorageBase::Delta> /* deltas */)
{
auto response = zk_request.makeResponse();
dynamic_cast<Coordination::ZooKeeperSyncResponse &>(*response).path = zk_request.path;
auto response = std::make_shared<Coordination::ZooKeeperSyncResponse>();
response->path = zk_request.path;
return response;
}
@ -1437,14 +1446,16 @@ std::list<KeeperStorageBase::Delta> preprocess(
template <typename Storage>
Coordination::ZooKeeperResponsePtr process(const Coordination::ZooKeeperCreateRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
std::shared_ptr<Coordination::ZooKeeperCreateResponse> response = zk_request.not_exists
? std::make_shared<Coordination::ZooKeeperCreateIfNotExistsResponse>()
: std::make_shared<Coordination::ZooKeeperCreateResponse>();
if (deltas.empty())
{
response.path_created = zk_request.getPath();
response.error = Coordination::Error::ZOK;
return response_ptr;
chassert(zk_request.not_exists);
response->path_created = zk_request.getPath();
response->error = Coordination::Error::ZOK;
return response;
}
std::string created_path;
@ -1459,13 +1470,13 @@ Coordination::ZooKeeperResponsePtr process(const Coordination::ZooKeeperCreateRe
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
response.path_created = std::move(created_path);
response.error = Coordination::Error::ZOK;
return response_ptr;
response->path_created = std::move(created_path);
response->error = Coordination::Error::ZOK;
return response;
}
/// CREATE Request ///
@ -1500,25 +1511,26 @@ std::list<KeeperStorageBase::Delta> preprocess(
}
template <bool local, typename Storage>
Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperGetRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas) {
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr);
Coordination::ZooKeeperResponsePtr
processImpl(const Coordination::ZooKeeperGetRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
auto response = std::make_shared<Coordination::ZooKeeperGetResponse>();
if constexpr (!local)
{
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
}
if (zk_request.path == Coordination::keeper_config_path)
{
response.data = serializeClusterConfig(
response->data = serializeClusterConfig(
storage.keeper_context->getDispatcher()->getStateMachine().getClusterConfig());
response.error = Coordination::Error::ZOK;
return response_ptr;
response->error = Coordination::Error::ZOK;
return response;
}
auto & container = storage.container;
@ -1526,19 +1538,19 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperGetR
if (node_it == container.end())
{
if constexpr (local)
response.error = Coordination::Error::ZNONODE;
response->error = Coordination::Error::ZNONODE;
else
onStorageInconsistency();
}
else
{
node_it->value.setResponseStat(response.stat);
node_it->value.setResponseStat(response->stat);
auto data = node_it->value.getData();
response.data = std::string(data);
response.error = Coordination::Error::ZOK;
response->data = std::string(data);
response->error = Coordination::Error::ZOK;
}
return response_ptr;
return response;
}
template <typename Storage>
@ -1654,13 +1666,13 @@ std::list<KeeperStorageBase::Delta> preprocess(
}
template <typename Storage>
Coordination::ZooKeeperResponsePtr process(const Coordination::ZooKeeperRemoveRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
Coordination::ZooKeeperResponsePtr
process(const Coordination::ZooKeeperRemoveRequest & /*zk_request*/, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
auto response = std::make_shared<Coordination::ZooKeeperRemoveResponse>();
response.error = storage.commit(std::move(deltas));
return response_ptr;
response->error = storage.commit(std::move(deltas));
return response;
}
/// REMOVE Request ///
@ -1691,16 +1703,17 @@ std::list<KeeperStorageBase::Delta> preprocess(
}
template <bool local, typename Storage>
Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperExistsRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas) {
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr);
Coordination::ZooKeeperResponsePtr
processImpl(const Coordination::ZooKeeperExistsRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
auto response = std::make_shared<Coordination::ZooKeeperExistsResponse>();
if constexpr (!local)
{
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
}
@ -1709,17 +1722,17 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperExis
if (node_it == container.end())
{
if constexpr (local)
response.error = Coordination::Error::ZNONODE;
response->error = Coordination::Error::ZNONODE;
else
onStorageInconsistency();
}
else
{
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;
node_it->value.setResponseStat(response->stat);
response->error = Coordination::Error::ZOK;
}
return response_ptr;
return response;
}
template <typename Storage>
@ -1808,23 +1821,22 @@ Coordination::ZooKeeperResponsePtr process(const Coordination::ZooKeeperSetReque
{
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr);
auto response = std::make_shared<Coordination::ZooKeeperSetResponse>();
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
auto node_it = container.find(zk_request.path);
if (node_it == container.end())
onStorageInconsistency();
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;
node_it->value.setResponseStat(response->stat);
response->error = Coordination::Error::ZOK;
return response_ptr;
return response;
}
/// SET Request ///
@ -1856,15 +1868,14 @@ std::list<KeeperStorageBase::Delta> preprocess(
template <bool local, typename Storage>
Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperListRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
auto response = std::make_shared<Coordination::ZooKeeperListResponse>();
if constexpr (!local)
{
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
}
@ -1874,7 +1885,7 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList
if (node_it == container.end())
{
if constexpr (local)
response.error = Coordination::Error::ZNONODE;
response->error = Coordination::Error::ZNONODE;
else
onStorageInconsistency();
}
@ -1892,7 +1903,7 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList
return node_it->value.getChildren();
};
const auto & children = get_children();
response.names.reserve(children.size());
response->names.reserve(children.size());
const auto add_child = [&](const auto & child)
{
@ -1929,17 +1940,17 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperList
if (add_child(child))
{
if constexpr (Storage::use_rocksdb)
response.names.push_back(child.first);
response->names.push_back(child.first);
else
response.names.push_back(child.toString());
response->names.push_back(child.toString());
}
}
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;
node_it->value.setResponseStat(response->stat);
response->error = Coordination::Error::ZOK;
}
return response_ptr;
return response;
}
template <typename Storage>
@ -2002,22 +2013,23 @@ std::list<KeeperStorageBase::Delta> preprocess(
template <bool local, typename Storage>
Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperCheckRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr);
std::shared_ptr<Coordination::ZooKeeperCheckResponse> response = zk_request.not_exists
? std::make_shared<Coordination::ZooKeeperCheckNotExistsResponse>()
: std::make_shared<Coordination::ZooKeeperCheckResponse>();
if constexpr (!local)
{
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
response->error = error_code;
else
onStorageInconsistency();
};
@ -2030,7 +2042,7 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperChec
if (node_it != container.end() && (zk_request.version == -1 || zk_request.version == node_it->value.stats.version))
on_error(Coordination::Error::ZNODEEXISTS);
else
response.error = Coordination::Error::ZOK;
response->error = Coordination::Error::ZOK;
}
else
{
@ -2039,10 +2051,10 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperChec
else if (zk_request.version != -1 && zk_request.version != node_it->value.stats.version)
on_error(Coordination::Error::ZBADVERSION);
else
response.error = Coordination::Error::ZOK;
response->error = Coordination::Error::ZOK;
}
return response_ptr;
return response;
}
template <typename Storage>
@ -2151,8 +2163,13 @@ template <typename Storage>
Coordination::ZooKeeperResponsePtr
process(const Coordination::ZooKeeperMultiRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
std::shared_ptr<Coordination::ZooKeeperMultiResponse> response;
if (zk_request.getOpNum() == Coordination::OpNum::Multi)
response = std::make_shared<Coordination::ZooKeeperMultiWriteResponse>();
else
response = std::make_shared<Coordination::ZooKeeperMultiReadResponse>();
response->responses.reserve(zk_request.requests.size());
const auto & subrequests = zk_request.requests;
@ -2162,41 +2179,40 @@ process(const Coordination::ZooKeeperMultiRequest & zk_request, Storage & storag
{
for (size_t i = 0; i < subrequests.size(); ++i)
{
response.responses[i] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[i]->error = failed_multi->error_codes[i];
response->responses.push_back(std::make_shared<Coordination::ZooKeeperErrorResponse>());
response->responses[i]->error = failed_multi->error_codes[i];
}
return response_ptr;
return response;
}
for (size_t i = 0; i < subrequests.size(); ++i)
for (const auto & multi_subrequest : subrequests)
{
std::list<KeeperStorageBase::Delta> subdeltas = getSubdeltas(deltas);
response.responses[i] = callOnConcreteRequestType(
*subrequests[i], [&](const auto & subrequest) { return process(subrequest, storage, std::move(subdeltas)); });
response->responses.push_back(callOnConcreteRequestType(
*multi_subrequest, [&](const auto & subrequest) { return process(subrequest, storage, std::move(subdeltas)); }));
}
response.error = Coordination::Error::ZOK;
return response_ptr;
response->error = Coordination::Error::ZOK;
return response;
}
template <typename Storage>
Coordination::ZooKeeperResponsePtr processLocal(const Coordination::ZooKeeperMultiRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
ProfileEvents::increment(ProfileEvents::KeeperMultiReadRequest);
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
auto response = std::make_shared<Coordination::ZooKeeperMultiReadResponse>();
response->responses.reserve(zk_request.requests.size());
const auto & subrequests = zk_request.requests;
for (size_t i = 0; i < subrequests.size(); ++i)
for (const auto & multi_subrequest : zk_request.requests)
{
std::list<KeeperStorageBase::Delta> subdeltas = getSubdeltas(deltas);
response.responses[i] = callOnConcreteRequestType(
*subrequests[i], [&](const auto & subrequest) { return processLocal(subrequest, storage, std::move(subdeltas)); });
response->responses.push_back(callOnConcreteRequestType(
*multi_subrequest, [&](const auto & subrequest) { return processLocal(subrequest, storage, std::move(subdeltas)); }));
}
response.error = Coordination::Error::ZOK;
return response_ptr;
response->error = Coordination::Error::ZOK;
return response;
}
template <typename Storage>
@ -2227,8 +2243,6 @@ std::list<KeeperStorageBase::Delta> preprocess(
uint64_t & /*digest*/,
const KeeperContext & /*keeper_context*/)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
if (zk_request.scheme != "digest" || std::count(zk_request.data.begin(), zk_request.data.end(), ':') != 1)
return {typename Storage::Delta{zxid, Coordination::Error::ZAUTHFAILED}};
@ -2254,15 +2268,14 @@ std::list<KeeperStorageBase::Delta> preprocess(
template <typename Storage>
Coordination::ZooKeeperResponsePtr
process(const Coordination::ZooKeeperAuthRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
process(const Coordination::ZooKeeperAuthRequest & /*zk_request*/, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperAuthResponse & auth_response = dynamic_cast<Coordination::ZooKeeperAuthResponse &>(*response_ptr);
auto response = std::make_shared<Coordination::ZooKeeperAuthResponse>();
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
auth_response.error = result;
response->error = result;
return response_ptr;
return response;
}
/// AUTH Request ///
@ -2319,7 +2332,7 @@ std::list<KeeperStorageBase::Delta> preprocess(
std::list<KeeperStorageBase::Delta> new_deltas{
{zk_request.path,
zxid,
KeeperStorageBase::SetACLDelta{std::move(node_acls), uncommitted_state.getACLs(zk_request.path), zk_request.version}},
KeeperStorageBase::SetACLDelta{uncommitted_state.getACLs(zk_request.path), std::move(node_acls), zk_request.version}},
{zk_request.path, zxid, std::move(update_stat_delta)}};
digest = storage.calculateNodesDigest(digest, new_deltas);
@ -2331,22 +2344,21 @@ template <typename Storage>
Coordination::ZooKeeperResponsePtr
process(const Coordination::ZooKeeperSetACLRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperSetACLResponse & response = dynamic_cast<Coordination::ZooKeeperSetACLResponse &>(*response_ptr);
auto response = std::make_shared<Coordination::ZooKeeperSetACLResponse>();
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
auto node_it = storage.container.find(zk_request.path);
if (node_it == storage.container.end())
onStorageInconsistency();
node_it->value.setResponseStat(response.stat);
response.error = Coordination::Error::ZOK;
node_it->value.setResponseStat(response->stat);
response->error = Coordination::Error::ZOK;
return response_ptr;
return response;
}
/// SETACL Request ///
@ -2376,15 +2388,14 @@ std::list<KeeperStorageBase::Delta> preprocess(
template <bool local, typename Storage>
Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperGetACLRequest & zk_request, Storage & storage, std::list<KeeperStorageBase::Delta> deltas)
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request.makeResponse();
Coordination::ZooKeeperGetACLResponse & response = dynamic_cast<Coordination::ZooKeeperGetACLResponse &>(*response_ptr);
auto response = std::make_shared<Coordination::ZooKeeperGetACLResponse>();
if constexpr (!local)
{
if (const auto result = storage.commit(std::move(deltas)); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
response->error = result;
return response;
}
}
@ -2393,17 +2404,17 @@ Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperGetA
if (node_it == container.end())
{
if constexpr (local)
response.error = Coordination::Error::ZNONODE;
response->error = Coordination::Error::ZNONODE;
else
onStorageInconsistency();
}
else
{
node_it->value.setResponseStat(response.stat);
response.acl = storage.acl_map.convertNumber(node_it->value.stats.acl_id);
node_it->value.setResponseStat(response->stat);
response->acl = storage.acl_map.convertNumber(node_it->value.acl_id);
}
return response_ptr;
return response;
}
template <typename Storage>
@ -2594,6 +2605,8 @@ void KeeperStorage<Container>::preprocessRequest(
uncommitted_state.applyDeltas(new_deltas);
uncommitted_state.addDeltas(std::move(new_deltas));
uncommitted_state.cleanup(getZXID());
});
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
@ -2645,8 +2658,6 @@ void KeeperStorage<Container>::preprocessRequest(
};
callOnConcreteRequestType(*zk_request, preprocess_request);
uncommitted_state.cleanup(getZXID());
}
template<typename Container>
@ -2761,6 +2772,7 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
{
Coordination::ZooKeeperResponsePtr response;
uncommitted_state.tryGetNodeFromStorage("/node_with_acl");
if (is_local)
{
chassert(zk_request->isReadRequest());
@ -2781,6 +2793,7 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
std::lock_guard lock(storage_mutex);
response = process(concrete_zk_request, *this, std::move(deltas));
}
uncommitted_state.tryGetNodeFromStorage("/node_with_acl");
/// Watches for this requests are added to the watches lists
if (zk_request->has_watch)

View File

@ -1,6 +1,7 @@
#pragma once
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <Coordination/ACLMap.h>
#include <Coordination/SessionExpiryQueue.h>
@ -41,8 +42,6 @@ struct NodeStats
uint32_t data_size{0};
uint64_t acl_id = 0; /// 0 -- no ACL by default
///
void copyStats(const Coordination::Stat & stat);
bool isEphemeral() const
@ -146,6 +145,8 @@ private:
struct KeeperRocksNodeInfo
{
NodeStats stats;
uint64_t acl_id = 0; /// 0 -- no ACL by default
/// dummy interface for test
void addChild(StringRef) {}
auto getChildren() const
@ -181,6 +182,7 @@ struct KeeperRocksNode : public KeeperRocksNodeInfo
void shallowCopy(const KeeperRocksNode & other)
{
stats = other.stats;
acl_id = other.acl_id;
if (stats.data_size != 0)
{
data = std::unique_ptr<char[]>(new char[stats.data_size]);
@ -221,6 +223,8 @@ struct KeeperMemNode
std::unique_ptr<char[]> data{nullptr};
mutable uint64_t cached_digest = 0;
uint64_t acl_id = 0; /// 0 -- no ACL by default
KeeperMemNode() = default;
KeeperMemNode & operator=(const KeeperMemNode & other);
@ -505,11 +509,12 @@ public:
std::unordered_map<int64_t, std::list<std::pair<int64_t, std::shared_ptr<AuthID>>>> session_and_auth;
std::unordered_set<int64_t> closed_sessions;
using ZxidToNodes = std::map<int64_t, std::unordered_set<std::string_view>>;
struct UncommittedNode
{
std::shared_ptr<Node> node{nullptr};
std::optional<Coordination::ACLs> acls{};
std::vector<int64_t> applied_zxids{};
std::unordered_set<uint64_t> applied_zxids{};
};
struct Hash
@ -536,6 +541,7 @@ public:
};
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
mutable ZxidToNodes zxid_to_nodes;
mutable std::mutex deltas_mutex;
std::list<Delta> deltas TSA_GUARDED_BY(deltas_mutex);

View File

@ -105,7 +105,7 @@ int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log
String data;
Coordination::read(data, in);
node.setData(data);
Coordination::read(node.stats.acl_id, in);
Coordination::read(node.acl_id, in);
/// Deserialize stat
Coordination::read(node.stats.czxid, in);
@ -136,7 +136,7 @@ int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log
if (ephemeral_owner != 0)
storage.ephemerals[ephemeral_owner].insert(path);
storage.acl_map.addUsage(node.stats.acl_id);
storage.acl_map.addUsage(node.acl_id);
}
Coordination::read(path, in);
count++;

View File

@ -1,6 +1,7 @@
#include <chrono>
#include <gtest/gtest.h>
#include "base/defines.h"
#include "config.h"
#if USE_NURAFT
@ -1540,7 +1541,7 @@ void addNode(Storage & storage, const std::string & path, const std::string & da
using Node = typename Storage::Node;
Node node{};
node.setData(data);
node.setEphemeralOwner(ephemeral_owner);
node.stats.setEphemeralOwner(ephemeral_owner);
storage.container.insertOrReplace(path, node);
auto child_it = storage.container.find(path);
auto child_path = DB::getBaseNodeName(child_it->key);
@ -1549,7 +1550,7 @@ void addNode(Storage & storage, const std::string & path, const std::string & da
[&](auto & parent)
{
parent.addChild(child_path);
parent.increaseNumChildren();
parent.stats.increaseNumChildren();
});
}
@ -1570,7 +1571,7 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotSimple)
addNode(storage, "/hello1", "world", 1);
addNode(storage, "/hello2", "somedata", 3);
storage.session_id_counter = 5;
storage.zxid = 2;
TSA_SUPPRESS_WARNING_FOR_WRITE(storage.zxid) = 2;
storage.ephemerals[3] = {"/hello2"};
storage.ephemerals[1] = {"/hello1"};
storage.getSessionID(130);
@ -1601,7 +1602,7 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotSimple)
EXPECT_EQ(restored_storage->container.getValue("/hello1").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->getZXID(), 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
@ -2534,7 +2535,7 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotDifferentCompressions)
addNode(storage, "/hello1", "world", 1);
addNode(storage, "/hello2", "somedata", 3);
storage.session_id_counter = 5;
storage.zxid = 2;
TSA_SUPPRESS_WARNING_FOR_WRITE(storage.zxid) = 2;
storage.ephemerals[3] = {"/hello2"};
storage.ephemerals[1] = {"/hello1"};
storage.getSessionID(130);
@ -2561,7 +2562,7 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotDifferentCompressions)
EXPECT_EQ(restored_storage->container.getValue("/hello1").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->getZXID(), 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
@ -2755,7 +2756,7 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotEqual)
for (size_t j = 0; j < 3333; ++j)
storage.getSessionID(130 * j);
DB::KeeperStorageSnapshot<Storage> snapshot(&storage, storage.zxid);
DB::KeeperStorageSnapshot<Storage> snapshot(&storage, storage.getZXID());
auto buf = manager.serializeSnapshotToBuffer(snapshot);
@ -3257,7 +3258,7 @@ TYPED_TEST(CoordinationTest, TestCheckNotExistsRequest)
create_path("/test_node");
auto node_it = storage.container.find("/test_node");
ASSERT_NE(node_it, storage.container.end());
auto node_version = node_it->value.version;
auto node_version = node_it->value.stats.version;
{
SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS");