add remove recursive entry point

This commit is contained in:
Mikhail Artemenko 2024-09-04 10:33:21 +00:00
parent 1f5082ee8a
commit 6c57adee7c
17 changed files with 109 additions and 20 deletions

View File

@ -238,10 +238,13 @@ struct RemoveRequest : virtual Request
String path;
int32_t version = -1;
/// strict limit for number of deleted nodes
uint32_t remove_nodes_limit = 1;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + sizeof(version); }
size_t bytesSize() const override { return path.size() + sizeof(version) + sizeof(remove_nodes_limit); }
};
struct RemoveResponse : virtual Response
@ -585,6 +588,7 @@ public:
virtual void remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback) = 0;
virtual void exists(

View File

@ -759,6 +759,7 @@ void TestKeeper::create(
void TestKeeper::remove(
const String & path,
int32_t version,
[[maybe_unused]] uint32_t remove_nodes_limit, // TODO(michicosun): enable
RemoveCallback callback)
{
TestKeeperRemoveRequest request;

View File

@ -56,6 +56,7 @@ public:
void remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback) override;
void exists(

View File

@ -979,16 +979,34 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab
return removed_as_expected;
}
void ZooKeeper::removeRecursive(const std::string & path)
void ZooKeeper::removeRecursive(const std::string & path) // TODO(michicosun) rewrite
{
removeChildrenRecursive(path);
remove(path);
auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::RemoveResponse & response) mutable
{
promise->set_value(response);
};
impl->remove(path, -1, /*remove_nodes_limit=*/ 100, std::move(callback));
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path));
check(Coordination::Error::ZOPERATIONTIMEOUT, path);
}
else
{
auto response = future.get();
check(response.error, path);
}
}
void ZooKeeper::tryRemoveRecursive(const std::string & path)
Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path)
{
tryRemoveChildrenRecursive(path);
tryRemove(path);
return tryRemove(path);
}
@ -1367,7 +1385,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::stri
promise->set_value(response);
};
impl->remove(path, version, std::move(callback));
impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
return future;
}
@ -1390,7 +1408,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemove(const std::s
promise->set_value(response);
};
impl->remove(path, version, std::move(callback));
impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
return future;
}
@ -1404,7 +1422,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemoveNoThrow(const
promise->set_value(response);
};
impl->remove(path, version, std::move(callback));
impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
return future;
}

View File

@ -487,7 +487,7 @@ public:
/// this will not cause errors.
/// For instance, you can call this method twice concurrently for the same node and the end
/// result would be the same as for the single call.
void tryRemoveRecursive(const std::string & path);
Coordination::Error tryRemoveRecursive(const std::string & path);
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
/// Node defined as RemoveException will not be deleted.

View File

@ -1,5 +1,5 @@
#include "Common/ZooKeeper/IKeeper.h"
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
@ -232,6 +232,27 @@ void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const
{
ZooKeeperRemoveRequest::writeImpl(out);
Coordination::write(remove_nodes_limit, out);
}
void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in)
{
ZooKeeperRemoveRequest::readImpl(in);
Coordination::read(remove_nodes_limit, in);
}
std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool short_format) const
{
return fmt::format(
"{}\n"
"remove_nodes_limit = {}",
ZooKeeperRemoveRequest::toStringImpl(short_format),
remove_nodes_limit);
}
void ZooKeeperExistsRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
@ -510,6 +531,11 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span<const Coordination::Reque
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperRemoveRequest>(*concrete_request_remove));
}
else if (const auto * concrete_request_remove_recursive = dynamic_cast<const ZooKeeperRemoveRecursiveRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperRemoveRecursiveRequest>(*concrete_request_remove_recursive));
}
else if (const auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
{
checkOperationType(Write);
@ -707,6 +733,7 @@ ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return se
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSyncResponse>()); }
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperAuthResponse>()); }
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveResponse>()); }
ZooKeeperResponsePtr ZooKeeperRemoveRecursiveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveRecursiveResponse>()); }
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperExistsResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
@ -1024,6 +1051,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
registerZooKeeperRequest<OpNum::CheckNotExists, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::RemoveRecursive, ZooKeeperRemoveRecursiveRequest>(*this);
}
PathMatchResult matchPath(std::string_view path, std::string_view match_to)

View File

@ -258,7 +258,7 @@ struct ZooKeeperCreateIfNotExistsResponse : ZooKeeperCreateResponse
using ZooKeeperCreateResponse::ZooKeeperCreateResponse;
};
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
struct ZooKeeperRemoveRequest : RemoveRequest, ZooKeeperRequest
{
ZooKeeperRemoveRequest() = default;
explicit ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {}
@ -276,7 +276,17 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
struct ZooKeeperRemoveRecursiveRequest : ZooKeeperRemoveRequest
{
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
};
struct ZooKeeperRemoveResponse : RemoveResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
@ -285,6 +295,11 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperRemoveRecursiveResponse : ZooKeeperRemoveResponse
{
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
};
struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
{
ZooKeeperExistsRequest() = default;

View File

@ -29,6 +29,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::GetACL),
static_cast<int32_t>(OpNum::FilteredList),
static_cast<int32_t>(OpNum::CheckNotExists),
static_cast<int32_t>(OpNum::RemoveRecursive),
};
OpNum getOpNum(int32_t raw_op_num)

View File

@ -40,6 +40,7 @@ enum class OpNum : int32_t
FilteredList = 500,
CheckNotExists = 501,
CreateIfNotExists = 502,
RemoveRecursive = 503,
SessionID = 997, /// Special internal request
};

View File

@ -1333,14 +1333,27 @@ void ZooKeeper::create(
void ZooKeeper::remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback)
{
ZooKeeperRemoveRequest request;
request.path = path;
request.version = version;
std::shared_ptr<ZooKeeperRemoveRequest> request = nullptr;
if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE))
{
if (remove_nodes_limit > 1)
throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server");
request = std::make_shared<ZooKeeperRemoveRequest>();
}
else
request = std::make_shared<ZooKeeperRemoveRecursiveRequest>();
request->path = path;
request->version = version;
request->remove_nodes_limit = remove_nodes_limit;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperRemoveRequest>(std::move(request));
request_info.request = std::move(request);
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveResponse &>(response)); };
pushRequest(std::move(request_info));

View File

@ -144,6 +144,7 @@ public:
void remove(
const String & path,
int32_t version,
uint32_t remove_nodes_limit,
RemoveCallback callback) override;
void exists(

View File

@ -587,7 +587,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemove(std:
promise->set_value(response);
};
keeper->impl->remove(path, version, std::move(callback));
keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
return future;
}
@ -630,7 +630,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr
}
};
keeper->impl->remove(path, version, std::move(callback));
keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback));
return future;
}

View File

@ -11,6 +11,7 @@ enum class KeeperApiVersion : uint8_t
WITH_FILTERED_LIST,
WITH_MULTI_READ,
WITH_CHECK_NOT_EXISTS,
WITH_REMOVE_RECURSIVE,
};
const String keeper_system_path = "/keeper";

View File

@ -86,6 +86,7 @@ bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request
break;
}
case Coordination::OpNum::Remove:
case Coordination::OpNum::RemoveRecursive:
{
Coordination::ZooKeeperRemoveRequest & remove_req = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*sub_zk_request);
memory_delta -= remove_req.bytesSize();

View File

@ -12,6 +12,7 @@ enum class KeeperFeatureFlag : size_t
MULTI_READ,
CHECK_NOT_EXISTS,
CREATE_IF_NOT_EXISTS,
REMOVE_RECURSIVE,
};
class KeeperFeatureFlags

View File

@ -2128,6 +2128,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor<Storage>>(sub_zk_request));
break;
case Coordination::OpNum::Remove:
case Coordination::OpNum::RemoveRecursive:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor<Storage>>(sub_zk_request));
break;
@ -2400,6 +2401,7 @@ KeeperStorageRequestProcessorsFactory<Storage>::KeeperStorageRequestProcessorsFa
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor<Storage>>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor<Storage>>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::CheckNotExists, KeeperStorageCheckRequestProcessor<Storage>>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::RemoveRecursive, KeeperStorageRemoveRequestProcessor<Storage>>(*this);
}

View File

@ -93,6 +93,7 @@ ColumnsDescription ZooKeeperLogElement::getColumnsDescription()
{"FilteredList", static_cast<Int16>(Coordination::OpNum::FilteredList)},
{"CheckNotExists", static_cast<Int16>(Coordination::OpNum::CheckNotExists)},
{"CreateIfNotExists", static_cast<Int16>(Coordination::OpNum::CreateIfNotExists)},
{"RemoveRecursive", static_cast<Int16>(Coordination::OpNum::RemoveRecursive)},
});
auto error_enum = getCoordinationErrorCodesEnumType();