intro new request type

This commit is contained in:
Mikhail Artemenko 2024-09-05 16:14:50 +00:00
parent 6455e1dfa1
commit c3cc2a3fb1
12 changed files with 235 additions and 100 deletions

View File

@ -171,6 +171,7 @@ bool isUserError(Error zk_return_code)
void CreateRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void RemoveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void RemoveRecursiveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void ExistsRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void GetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void SetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }

View File

@ -238,16 +238,30 @@ struct RemoveRequest : virtual Request
String path;
int32_t version = -1;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + sizeof(version); }
};
struct RemoveResponse : virtual Response
{
};
struct RemoveRecursiveRequest : virtual Request
{
String path;
/// strict limit for number of deleted nodes
uint32_t remove_nodes_limit = 1;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + sizeof(version) + sizeof(remove_nodes_limit); }
size_t bytesSize() const override { return path.size() + sizeof(remove_nodes_limit); }
};
struct RemoveResponse : virtual Response
struct RemoveRecursiveResponse : virtual Response
{
};
@ -433,6 +447,7 @@ struct ErrorResponse : virtual Response
using CreateCallback = std::function<void(const CreateResponse &)>;
using RemoveCallback = std::function<void(const RemoveResponse &)>;
using RemoveRecursiveCallback = std::function<void(const RemoveRecursiveResponse &)>;
using ExistsCallback = std::function<void(const ExistsResponse &)>;
using GetCallback = std::function<void(const GetResponse &)>;
using SetCallback = std::function<void(const SetResponse &)>;
@ -588,9 +603,13 @@ public:
virtual void remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback) = 0;
virtual void removeRecursive(
const String & path,
uint32_t remove_nodes_limit,
RemoveRecursiveCallback callback) = 0;
virtual void exists(
const String & path,
ExistsCallback callback,

View File

@ -759,7 +759,6 @@ void TestKeeper::create(
void TestKeeper::remove(
const String & path,
int32_t version,
[[maybe_unused]] uint32_t remove_nodes_limit, // TODO(michicosun): enable
RemoveCallback callback)
{
TestKeeperRemoveRequest request;
@ -772,6 +771,14 @@ void TestKeeper::remove(
pushRequest(std::move(request_info));
}
void TestKeeper::removeRecursive(
[[maybe_unused]] const String & path,
[[maybe_unused]] uint32_t remove_nodes_limit,
[[maybe_unused]] RemoveRecursiveCallback callback)
{
/// TODO(michicosun) implement
}
void TestKeeper::exists(
const String & path,
ExistsCallback callback,

View File

@ -56,9 +56,13 @@ public:
void remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback) override;
void removeRecursive(
const String & path,
uint32_t remove_nodes_limit,
RemoveRecursiveCallback callback) override;
void exists(
const String & path,
ExistsCallback callback,

View File

@ -981,16 +981,16 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab
void ZooKeeper::removeRecursive(const std::string & path) // TODO(michicosun) rewrite
{
auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>();
auto promise = std::make_shared<std::promise<Coordination::RemoveRecursiveResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::RemoveResponse & response) mutable
auto callback = [promise](const Coordination::RemoveRecursiveResponse & response) mutable
{
promise->set_value(response);
};
impl->remove(path, -1, /*remove_nodes_limit=*/ 100, std::move(callback));
impl->removeRecursive(path, 100, std::move(callback));
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path));
@ -1385,7 +1385,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::stri
promise->set_value(response);
};
impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
impl->remove(path, version, std::move(callback));
return future;
}
@ -1408,7 +1408,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemove(const std::s
promise->set_value(response);
};
impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
impl->remove(path, version, std::move(callback));
return future;
}
@ -1422,7 +1422,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemoveNoThrow(const
promise->set_value(response);
};
impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
impl->remove(path, version, std::move(callback));
return future;
}

View File

@ -232,24 +232,24 @@ void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const
void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperRemoveRequest::writeImpl(out);
Coordination::write(path, out);
Coordination::write(remove_nodes_limit, out);
}
void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in)
void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in)
{
ZooKeeperRemoveRequest::readImpl(in);
Coordination::read(path, in);
Coordination::read(remove_nodes_limit, in);
}
std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool short_format) const
std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
"{}\n"
"path = {}\n"
"remove_nodes_limit = {}",
ZooKeeperRemoveRequest::toStringImpl(short_format),
path,
remove_nodes_limit);
}
@ -531,7 +531,7 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span<const Coordination::Reque
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperRemoveRequest>(*concrete_request_remove));
}
else if (const auto * concrete_request_remove_recursive = dynamic_cast<const ZooKeeperRemoveRecursiveRequest *>(generic_request.get()))
else if (const auto * concrete_request_remove_recursive = dynamic_cast<const RemoveRecursiveRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperRemoveRecursiveRequest>(*concrete_request_remove_recursive));

View File

@ -258,7 +258,7 @@ struct ZooKeeperCreateIfNotExistsResponse : ZooKeeperCreateResponse
using ZooKeeperCreateResponse::ZooKeeperCreateResponse;
};
struct ZooKeeperRemoveRequest : RemoveRequest, ZooKeeperRequest
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
{
ZooKeeperRemoveRequest() = default;
explicit ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {}
@ -276,17 +276,7 @@ struct ZooKeeperRemoveRequest : RemoveRequest, ZooKeeperRequest
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperRemoveRecursiveRequest : ZooKeeperRemoveRequest
{
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
};
struct ZooKeeperRemoveResponse : RemoveResponse, ZooKeeperResponse
struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
@ -295,9 +285,29 @@ struct ZooKeeperRemoveResponse : RemoveResponse, ZooKeeperResponse
size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperRemoveRecursiveResponse : ZooKeeperRemoveResponse
struct ZooKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, ZooKeeperRequest
{
ZooKeeperRemoveRecursiveRequest() = default;
explicit ZooKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {}
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return RemoveRecursiveRequest::bytesSize() + sizeof(xid); }
};
struct ZooKeeperRemoveRecursiveResponse : RemoveRecursiveResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
size_t bytesSize() const override { return RemoveRecursiveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest

View File

@ -1333,33 +1333,39 @@ void ZooKeeper::create(
void ZooKeeper::remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback)
{
std::shared_ptr<ZooKeeperRemoveRequest> request = nullptr;
if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE))
{
if (remove_nodes_limit > 1)
throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server");
request = std::make_shared<ZooKeeperRemoveRequest>();
}
else
request = std::make_shared<ZooKeeperRemoveRecursiveRequest>();
request->path = path;
request->version = version;
request->remove_nodes_limit = remove_nodes_limit;
ZooKeeperRemoveRequest request;
request.path = path;
request.version = version;
RequestInfo request_info;
request_info.request = std::move(request);
request_info.request = std::make_shared<ZooKeeperRemoveRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
}
void ZooKeeper::removeRecursive(
const String &path,
uint32_t remove_nodes_limit,
RemoveRecursiveCallback callback)
{
if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE))
throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server");
ZooKeeperRemoveRecursiveRequest request;
request.path = path;
request.remove_nodes_limit = remove_nodes_limit;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperRemoveRecursiveRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveRecursiveResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
}
void ZooKeeper::exists(
const String & path,

View File

@ -144,9 +144,13 @@ public:
void remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback) override;
void removeRecursive(
const String &path,
uint32_t remove_nodes_limit,
RemoveRecursiveCallback callback) override;
void exists(
const String & path,
ExistsCallback callback,

View File

@ -587,7 +587,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemove(std:
promise->set_value(response);
};
keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
keeper->impl->remove(path, version, std::move(callback));
return future;
}
@ -630,7 +630,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr
}
};
keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
keeper->impl->remove(path, version, std::move(callback));
return future;
}

View File

@ -86,12 +86,17 @@ bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request
break;
}
case Coordination::OpNum::Remove:
case Coordination::OpNum::RemoveRecursive:
{
Coordination::ZooKeeperRemoveRequest & remove_req = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*sub_zk_request);
memory_delta -= remove_req.bytesSize();
break;
}
case Coordination::OpNum::RemoveRecursive:
{
Coordination::ZooKeeperRemoveRecursiveRequest & remove_req = dynamic_cast<Coordination::ZooKeeperRemoveRecursiveRequest &>(*sub_zk_request);
memory_delta -= remove_req.bytesSize();
break;
}
default:
break;
}

View File

@ -1462,16 +1462,41 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
}
};
namespace
{
template <typename Storage>
void addUpdateParentPzxidDelta(Storage & storage, std::vector<typename Storage::Delta> & deltas, int64_t zxid, StringRef path)
{
auto parent_path = parentNodePath(path);
if (!storage.uncommitted_state.getNode(parent_path))
return;
deltas.emplace_back(
std::string{parent_path},
zxid,
typename Storage::UpdateNodeDelta
{
[zxid](Storage::Node & parent)
{
parent.pzxid = std::max(parent.pzxid, zxid);
}
}
);
}
}
template<typename Storage>
struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor<Storage>
{
using KeeperStorageRequestProcessor<Storage>::KeeperStorageRequestProcessor;
bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override
{
return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local);
}
using KeeperStorageRequestProcessor<Storage>::KeeperStorageRequestProcessor;
std::vector<typename Storage::Delta>
preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
{
@ -1488,62 +1513,35 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
const auto update_parent_pzxid = [&]()
{
auto parent_path = parentNodePath(request.path);
if (!storage.uncommitted_state.getNode(parent_path))
return;
new_deltas.emplace_back(
std::string{parent_path},
zxid,
typename Storage::UpdateNodeDelta
{
[zxid](Storage::Node & parent)
{
parent.pzxid = std::max(parent.pzxid, zxid);
}
}
);
};
auto node = storage.uncommitted_state.getNode(request.path);
if (!node)
{
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}};
}
if (request.version != -1 && request.version != node->version)
else if (request.version != -1 && request.version != node->version)
return {typename Storage::Delta{zxid, Coordination::Error::ZBADVERSION}};
ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit);
bool limit_exceeded = collector.collect(request.path, *node);
if (limit_exceeded)
else if (node->numChildren() != 0)
return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
auto delete_deltas = collector.extractDeltas();
new_deltas.emplace_back(
std::string{parentNodePath(request.path)},
zxid,
typename Storage::UpdateNodeDelta{[](typename Storage::Node & parent)
{
++parent.cversion;
parent.decreaseNumChildren();
}});
for (const auto & delta : delete_deltas)
std::visit(
Overloaded{
[&](const typename Storage::RemoveNodeDelta & remove_delta)
{
if (remove_delta.ephemeral_owner)
storage.unregisterEphemeralPath(remove_delta.ephemeral_owner, delta.path);
},
[](auto && /* delta */) {},
},
delta.operation);
new_deltas.emplace_back(request.path, zxid, typename Storage::RemoveNodeDelta{request.version, node->ephemeralOwner()});
new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end()));
if (node->isEphemeral())
storage.unregisterEphemeralPath(node->ephemeralOwner(), request.path);
digest = storage.calculateNodesDigest(digest, new_deltas);
@ -1564,6 +1562,84 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
}
};
template<typename Storage>
struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorageRequestProcessor<Storage>
{
using KeeperStorageRequestProcessor<Storage>::KeeperStorageRequestProcessor;
bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override
{
return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local);
}
std::vector<typename Storage::Delta>
preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
{
ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest);
Coordination::ZooKeeperRemoveRecursiveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRecursiveRequest &>(*this->zk_request);
std::vector<typename Storage::Delta> new_deltas;
if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
{
auto error_msg = fmt::format("Trying to delete an internal Keeper path ({}) which is not allowed", request.path);
handleSystemNodeModification(keeper_context, error_msg);
return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
auto node = storage.uncommitted_state.getNode(request.path);
if (!node)
{
if (request.restored_from_zookeeper_log)
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}};
}
ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit);
bool limit_exceeded = collector.collect(request.path, *node);
if (limit_exceeded)
return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
if (request.restored_from_zookeeper_log)
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
auto delete_deltas = collector.extractDeltas();
for (const auto & delta : delete_deltas)
{
const auto * remove_delta = std::get_if<typename Storage::RemoveNodeDelta>(&delta.operation);
if (remove_delta && remove_delta->ephemeral_owner)
storage.unregisterEphemeralPath(remove_delta->ephemeral_owner, delta.path);
}
new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end()));
digest = storage.calculateNodesDigest(digest, new_deltas);
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(Storage & storage, int64_t zxid) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = this->zk_request->makeResponse();
Coordination::ZooKeeperRemoveRecursiveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveRecursiveResponse &>(*response_ptr);
response.error = storage.commit(zxid);
return response_ptr;
}
KeeperStorageBase::ResponsesForSessions
processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
{
/// TODO(michicosun) rewrite
return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
}
private:
class ToDeleteTreeCollector
@ -2270,10 +2346,13 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor<Storage>>(sub_zk_request));
break;
case Coordination::OpNum::Remove:
case Coordination::OpNum::RemoveRecursive:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor<Storage>>(sub_zk_request));
break;
case Coordination::OpNum::RemoveRecursive:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRecursiveRequestProcessor<Storage>>(sub_zk_request));
break;
case Coordination::OpNum::Set:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor<Storage>>(sub_zk_request));
@ -2543,7 +2622,7 @@ KeeperStorageRequestProcessorsFactory<Storage>::KeeperStorageRequestProcessorsFa
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor<Storage>>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor<Storage>>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::CheckNotExists, KeeperStorageCheckRequestProcessor<Storage>>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::RemoveRecursive, KeeperStorageRemoveRequestProcessor<Storage>>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::RemoveRecursive, KeeperStorageRemoveRecursiveRequestProcessor<Storage>>(*this);
}