mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #69332 from ClickHouse/issues/68932/remove-recursive
Support removeRecursive natively in keeper
This commit is contained in:
commit
f62cb32c5c
@ -55,7 +55,7 @@ keeper foo bar
|
||||
- `touch '<path>'` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
|
||||
- `get '<path>'` -- Returns the node's value
|
||||
- `rm '<path>' [version]` -- Removes the node only if version matches (default: -1)
|
||||
- `rmr '<path>'` -- Recursively deletes path. Confirmation required
|
||||
- `rmr '<path>' [limit]` -- Recursively deletes path if the subtree size is smaller than the limit. Confirmation required (default limit = 100)
|
||||
- `flwc <command>` -- Executes four-letter-word command
|
||||
- `help` -- Prints this message
|
||||
- `get_direct_children_number '[path]'` -- Get numbers of direct children nodes under a specific path
|
||||
|
@ -506,14 +506,23 @@ bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & nod
|
||||
return false;
|
||||
node->args.push_back(std::move(path));
|
||||
|
||||
ASTPtr remove_nodes_limit;
|
||||
if (ParserUnsignedInteger{}.parse(pos, remove_nodes_limit, expected))
|
||||
node->args.push_back(remove_nodes_limit->as<ASTLiteral &>().value);
|
||||
else
|
||||
node->args.push_back(UInt64(100));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
|
||||
{
|
||||
String path = client->getAbsolutePath(query->args[0].safeGet<String>());
|
||||
UInt64 remove_nodes_limit = query->args[1].safeGet<UInt64>();
|
||||
|
||||
client->askConfirmation(
|
||||
"You are going to recursively delete path " + path, [client, path] { client->zookeeper->removeRecursive(path); });
|
||||
"You are going to recursively delete path " + path,
|
||||
[client, path, remove_nodes_limit] { client->zookeeper->removeRecursive(path, static_cast<UInt32>(remove_nodes_limit)); });
|
||||
}
|
||||
|
||||
bool ReconfigCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
|
||||
|
@ -184,7 +184,7 @@ class RMRCommand : public IKeeperClientCommand
|
||||
|
||||
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
|
||||
|
||||
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
|
||||
String getHelpMessage() const override { return "{} <path> [limit] -- Recursively deletes path if the subtree size is smaller than the limit. Confirmation required (default limit = 100)"; }
|
||||
};
|
||||
|
||||
class ReconfigCommand : public IKeeperClientCommand
|
||||
|
@ -171,6 +171,7 @@ bool isUserError(Error zk_return_code)
|
||||
|
||||
void CreateRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
|
||||
void RemoveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
|
||||
void RemoveRecursiveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
|
||||
void ExistsRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
|
||||
void GetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
|
||||
void SetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
|
||||
|
@ -248,6 +248,23 @@ struct RemoveResponse : virtual Response
|
||||
{
|
||||
};
|
||||
|
||||
struct RemoveRecursiveRequest : virtual Request
|
||||
{
|
||||
String path;
|
||||
|
||||
/// strict limit for number of deleted nodes
|
||||
uint32_t remove_nodes_limit = 1;
|
||||
|
||||
void addRootPath(const String & root_path) override;
|
||||
String getPath() const override { return path; }
|
||||
|
||||
size_t bytesSize() const override { return path.size() + sizeof(remove_nodes_limit); }
|
||||
};
|
||||
|
||||
struct RemoveRecursiveResponse : virtual Response
|
||||
{
|
||||
};
|
||||
|
||||
struct ExistsRequest : virtual Request
|
||||
{
|
||||
String path;
|
||||
@ -430,6 +447,7 @@ struct ErrorResponse : virtual Response
|
||||
|
||||
using CreateCallback = std::function<void(const CreateResponse &)>;
|
||||
using RemoveCallback = std::function<void(const RemoveResponse &)>;
|
||||
using RemoveRecursiveCallback = std::function<void(const RemoveRecursiveResponse &)>;
|
||||
using ExistsCallback = std::function<void(const ExistsResponse &)>;
|
||||
using GetCallback = std::function<void(const GetResponse &)>;
|
||||
using SetCallback = std::function<void(const SetResponse &)>;
|
||||
@ -587,6 +605,11 @@ public:
|
||||
int32_t version,
|
||||
RemoveCallback callback) = 0;
|
||||
|
||||
virtual void removeRecursive(
|
||||
const String & path,
|
||||
uint32_t remove_nodes_limit,
|
||||
RemoveRecursiveCallback callback) = 0;
|
||||
|
||||
virtual void exists(
|
||||
const String & path,
|
||||
ExistsCallback callback,
|
||||
|
@ -90,6 +90,36 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest
|
||||
}
|
||||
};
|
||||
|
||||
struct TestKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, TestKeeperRequest
|
||||
{
|
||||
TestKeeperRemoveRecursiveRequest() = default;
|
||||
explicit TestKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {}
|
||||
ResponsePtr createResponse() const override;
|
||||
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||
|
||||
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
|
||||
{
|
||||
std::vector<std::pair<String, size_t>> deleted;
|
||||
|
||||
auto add_deleted_watches = [&](TestKeeper::Watches & w)
|
||||
{
|
||||
for (const auto & [watch_path, _] : w)
|
||||
if (watch_path.starts_with(path))
|
||||
deleted.emplace_back(watch_path, std::count(watch_path.begin(), watch_path.end(), '/'));
|
||||
};
|
||||
|
||||
add_deleted_watches(node_watches);
|
||||
add_deleted_watches(list_watches);
|
||||
std::sort(deleted.begin(), deleted.end(), [](const auto & lhs, const auto & rhs)
|
||||
{
|
||||
return lhs.second < rhs.second;
|
||||
});
|
||||
|
||||
for (const auto & [watch_path, _] : deleted)
|
||||
processWatchesImpl(watch_path, node_watches, list_watches);
|
||||
}
|
||||
};
|
||||
|
||||
struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
|
||||
{
|
||||
ResponsePtr createResponse() const override;
|
||||
@ -175,6 +205,10 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperRemoveRequest>(*concrete_request_remove));
|
||||
}
|
||||
else if (const auto * concrete_request_remove_recursive = dynamic_cast<const RemoveRecursiveRequest *>(generic_request.get()))
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperRemoveRecursiveRequest>(*concrete_request_remove_recursive));
|
||||
}
|
||||
else if (const auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperSetRequest>(*concrete_request_set));
|
||||
@ -313,6 +347,62 @@ std::pair<ResponsePtr, Undo> TestKeeperRemoveRequest::process(TestKeeper::Contai
|
||||
return { std::make_shared<RemoveResponse>(response), undo };
|
||||
}
|
||||
|
||||
std::pair<ResponsePtr, Undo> TestKeeperRemoveRecursiveRequest::process(TestKeeper::Container & container, int64_t zxid) const
|
||||
{
|
||||
RemoveRecursiveResponse response;
|
||||
response.zxid = zxid;
|
||||
Undo undo;
|
||||
|
||||
auto root_it = container.find(path);
|
||||
if (root_it == container.end())
|
||||
{
|
||||
response.error = Error::ZNONODE;
|
||||
return { std::make_shared<RemoveRecursiveResponse>(response), undo };
|
||||
}
|
||||
|
||||
std::vector<std::pair<std::string, Coordination::TestKeeper::Node>> children;
|
||||
|
||||
for (auto it = std::next(root_it); it != container.end(); ++it)
|
||||
{
|
||||
const auto & [child_path, child_node] = *it;
|
||||
|
||||
if (child_path.starts_with(path))
|
||||
children.emplace_back(child_path, child_node);
|
||||
else
|
||||
break;
|
||||
}
|
||||
|
||||
if (children.size() > remove_nodes_limit)
|
||||
{
|
||||
response.error = Error::ZNOTEMPTY;
|
||||
return { std::make_shared<RemoveRecursiveResponse>(response), undo };
|
||||
}
|
||||
|
||||
auto & parent = container.at(parentPath(path));
|
||||
--parent.stat.numChildren;
|
||||
++parent.stat.cversion;
|
||||
|
||||
for (const auto & [child_path, child_node] : children)
|
||||
{
|
||||
auto child_it = container.find(child_path);
|
||||
chassert(child_it != container.end());
|
||||
container.erase(child_it);
|
||||
}
|
||||
|
||||
response.error = Error::ZOK;
|
||||
undo = [&container, dead = std::move(children), root_path = path]()
|
||||
{
|
||||
for (auto && [child_path, child_node] : dead)
|
||||
container.emplace(child_path, child_node);
|
||||
|
||||
auto & undo_parent = container.at(parentPath(root_path));
|
||||
++undo_parent.stat.numChildren;
|
||||
--undo_parent.stat.cversion;
|
||||
};
|
||||
|
||||
return { std::make_shared<RemoveRecursiveResponse>(response), undo };
|
||||
}
|
||||
|
||||
std::pair<ResponsePtr, Undo> TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t zxid) const
|
||||
{
|
||||
ExistsResponse response;
|
||||
@ -530,6 +620,7 @@ std::pair<ResponsePtr, Undo> TestKeeperMultiRequest::process(TestKeeper::Contain
|
||||
|
||||
ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared<CreateResponse>(); }
|
||||
ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared<RemoveResponse>(); }
|
||||
ResponsePtr TestKeeperRemoveRecursiveRequest::createResponse() const { return std::make_shared<RemoveRecursiveResponse>(); }
|
||||
ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared<ExistsResponse>(); }
|
||||
ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared<GetResponse>(); }
|
||||
ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared<SetResponse>(); }
|
||||
@ -771,6 +862,21 @@ void TestKeeper::remove(
|
||||
pushRequest(std::move(request_info));
|
||||
}
|
||||
|
||||
void TestKeeper::removeRecursive(
|
||||
const String & path,
|
||||
uint32_t remove_nodes_limit,
|
||||
RemoveRecursiveCallback callback)
|
||||
{
|
||||
TestKeeperRemoveRecursiveRequest request;
|
||||
request.path = path;
|
||||
request.remove_nodes_limit = remove_nodes_limit;
|
||||
|
||||
RequestInfo request_info;
|
||||
request_info.request = std::make_shared<TestKeeperRemoveRecursiveRequest>(std::move(request));
|
||||
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveRecursiveResponse &>(response)); };
|
||||
pushRequest(std::move(request_info));
|
||||
}
|
||||
|
||||
void TestKeeper::exists(
|
||||
const String & path,
|
||||
ExistsCallback callback,
|
||||
|
@ -58,6 +58,11 @@ public:
|
||||
int32_t version,
|
||||
RemoveCallback callback) override;
|
||||
|
||||
void removeRecursive(
|
||||
const String & path,
|
||||
uint32_t remove_nodes_limit,
|
||||
RemoveRecursiveCallback callback) override;
|
||||
|
||||
void exists(
|
||||
const String & path,
|
||||
ExistsCallback callback,
|
||||
|
@ -31,6 +31,7 @@ using AsyncResponses = std::vector<std::pair<std::string, std::future<R>>>;
|
||||
|
||||
Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode, bool ignore_if_exists = false);
|
||||
Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version);
|
||||
Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit);
|
||||
Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version);
|
||||
Coordination::RequestPtr makeCheckRequest(const std::string & path, int version);
|
||||
|
||||
|
@ -979,18 +979,47 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab
|
||||
return removed_as_expected;
|
||||
}
|
||||
|
||||
void ZooKeeper::removeRecursive(const std::string & path)
|
||||
void ZooKeeper::removeRecursive(const std::string & path, uint32_t remove_nodes_limit)
|
||||
{
|
||||
removeChildrenRecursive(path);
|
||||
remove(path);
|
||||
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
|
||||
{
|
||||
removeChildrenRecursive(path);
|
||||
remove(path);
|
||||
return;
|
||||
}
|
||||
|
||||
check(tryRemoveRecursive(path, remove_nodes_limit), path);
|
||||
}
|
||||
|
||||
void ZooKeeper::tryRemoveRecursive(const std::string & path)
|
||||
Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit)
|
||||
{
|
||||
tryRemoveChildrenRecursive(path);
|
||||
tryRemove(path);
|
||||
}
|
||||
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
|
||||
{
|
||||
tryRemoveChildrenRecursive(path);
|
||||
return tryRemove(path);
|
||||
}
|
||||
|
||||
auto promise = std::make_shared<std::promise<Coordination::RemoveRecursiveResponse>>();
|
||||
auto future = promise->get_future();
|
||||
|
||||
auto callback = [promise](const Coordination::RemoveRecursiveResponse & response) mutable
|
||||
{
|
||||
promise->set_value(response);
|
||||
};
|
||||
|
||||
impl->removeRecursive(path, remove_nodes_limit, std::move(callback));
|
||||
|
||||
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto response = future.get();
|
||||
return response.error;
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -1619,6 +1648,14 @@ Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version
|
||||
return request;
|
||||
}
|
||||
|
||||
Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit)
|
||||
{
|
||||
auto request = std::make_shared<Coordination::RemoveRecursiveRequest>();
|
||||
request->path = path;
|
||||
request->remove_nodes_limit = remove_nodes_limit;
|
||||
return request;
|
||||
}
|
||||
|
||||
Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version)
|
||||
{
|
||||
auto request = std::make_shared<Coordination::SetRequest>();
|
||||
|
@ -479,15 +479,16 @@ public:
|
||||
|
||||
Int64 getClientID();
|
||||
|
||||
/// Remove the node with the subtree. If someone concurrently adds or removes a node
|
||||
/// in the subtree, the result is undefined.
|
||||
void removeRecursive(const std::string & path);
|
||||
/// Remove the node with the subtree.
|
||||
/// If Keeper supports RemoveRecursive operation then it will be performed atomically.
|
||||
/// Otherwise if someone concurrently adds or removes a node in the subtree, the result is undefined.
|
||||
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
|
||||
|
||||
/// Remove the node with the subtree. If someone concurrently removes a node in the subtree,
|
||||
/// this will not cause errors.
|
||||
/// Same as removeRecursive but in case if Keeper does not supports RemoveRecursive and
|
||||
/// if someone concurrently removes a node in the subtree, this will not cause errors.
|
||||
/// For instance, you can call this method twice concurrently for the same node and the end
|
||||
/// result would be the same as for the single call.
|
||||
void tryRemoveRecursive(const std::string & path);
|
||||
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
|
||||
|
||||
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
|
||||
/// Node defined as RemoveException will not be deleted.
|
||||
|
@ -1,5 +1,5 @@
|
||||
#include "Common/ZooKeeper/IKeeper.h"
|
||||
#include "Common/ZooKeeper/ZooKeeperConstants.h"
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
@ -232,6 +232,27 @@ void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in)
|
||||
Coordination::read(version, in);
|
||||
}
|
||||
|
||||
void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const
|
||||
{
|
||||
Coordination::write(path, out);
|
||||
Coordination::write(remove_nodes_limit, out);
|
||||
}
|
||||
|
||||
void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in)
|
||||
{
|
||||
Coordination::read(path, in);
|
||||
Coordination::read(remove_nodes_limit, in);
|
||||
}
|
||||
|
||||
std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool /*short_format*/) const
|
||||
{
|
||||
return fmt::format(
|
||||
"path = {}\n"
|
||||
"remove_nodes_limit = {}",
|
||||
path,
|
||||
remove_nodes_limit);
|
||||
}
|
||||
|
||||
void ZooKeeperExistsRequest::writeImpl(WriteBuffer & out) const
|
||||
{
|
||||
Coordination::write(path, out);
|
||||
@ -510,6 +531,11 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span<const Coordination::Reque
|
||||
checkOperationType(Write);
|
||||
requests.push_back(std::make_shared<ZooKeeperRemoveRequest>(*concrete_request_remove));
|
||||
}
|
||||
else if (const auto * concrete_request_remove_recursive = dynamic_cast<const RemoveRecursiveRequest *>(generic_request.get()))
|
||||
{
|
||||
checkOperationType(Write);
|
||||
requests.push_back(std::make_shared<ZooKeeperRemoveRecursiveRequest>(*concrete_request_remove_recursive));
|
||||
}
|
||||
else if (const auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
|
||||
{
|
||||
checkOperationType(Write);
|
||||
@ -707,6 +733,7 @@ ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return se
|
||||
ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSyncResponse>()); }
|
||||
ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperAuthResponse>()); }
|
||||
ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveResponse>()); }
|
||||
ZooKeeperResponsePtr ZooKeeperRemoveRecursiveRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperRemoveRecursiveResponse>()); }
|
||||
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperExistsResponse>()); }
|
||||
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
|
||||
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
|
||||
@ -1024,6 +1051,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
|
||||
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);
|
||||
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
|
||||
registerZooKeeperRequest<OpNum::CheckNotExists, ZooKeeperCheckRequest>(*this);
|
||||
registerZooKeeperRequest<OpNum::RemoveRecursive, ZooKeeperRemoveRecursiveRequest>(*this);
|
||||
}
|
||||
|
||||
PathMatchResult matchPath(std::string_view path, std::string_view match_to)
|
||||
|
@ -285,6 +285,31 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
|
||||
size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
};
|
||||
|
||||
struct ZooKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, ZooKeeperRequest
|
||||
{
|
||||
ZooKeeperRemoveRecursiveRequest() = default;
|
||||
explicit ZooKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {}
|
||||
|
||||
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
|
||||
void writeImpl(WriteBuffer & out) const override;
|
||||
void readImpl(ReadBuffer & in) override;
|
||||
std::string toStringImpl(bool short_format) const override;
|
||||
|
||||
ZooKeeperResponsePtr makeResponse() const override;
|
||||
bool isReadRequest() const override { return false; }
|
||||
|
||||
size_t bytesSize() const override { return RemoveRecursiveRequest::bytesSize() + sizeof(xid); }
|
||||
};
|
||||
|
||||
struct ZooKeeperRemoveRecursiveResponse : RemoveRecursiveResponse, ZooKeeperResponse
|
||||
{
|
||||
void readImpl(ReadBuffer &) override {}
|
||||
void writeImpl(WriteBuffer &) const override {}
|
||||
OpNum getOpNum() const override { return OpNum::RemoveRecursive; }
|
||||
|
||||
size_t bytesSize() const override { return RemoveRecursiveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
};
|
||||
|
||||
struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
|
||||
{
|
||||
ZooKeeperExistsRequest() = default;
|
||||
|
@ -29,6 +29,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
|
||||
static_cast<int32_t>(OpNum::GetACL),
|
||||
static_cast<int32_t>(OpNum::FilteredList),
|
||||
static_cast<int32_t>(OpNum::CheckNotExists),
|
||||
static_cast<int32_t>(OpNum::RemoveRecursive),
|
||||
};
|
||||
|
||||
OpNum getOpNum(int32_t raw_op_num)
|
||||
|
@ -40,6 +40,7 @@ enum class OpNum : int32_t
|
||||
FilteredList = 500,
|
||||
CheckNotExists = 501,
|
||||
CreateIfNotExists = 502,
|
||||
RemoveRecursive = 503,
|
||||
|
||||
SessionID = 997, /// Special internal request
|
||||
};
|
||||
|
@ -1347,6 +1347,25 @@ void ZooKeeper::remove(
|
||||
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
|
||||
}
|
||||
|
||||
void ZooKeeper::removeRecursive(
|
||||
const String &path,
|
||||
uint32_t remove_nodes_limit,
|
||||
RemoveRecursiveCallback callback)
|
||||
{
|
||||
if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE))
|
||||
throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server");
|
||||
|
||||
ZooKeeperRemoveRecursiveRequest request;
|
||||
request.path = path;
|
||||
request.remove_nodes_limit = remove_nodes_limit;
|
||||
|
||||
RequestInfo request_info;
|
||||
request_info.request = std::make_shared<ZooKeeperRemoveRecursiveRequest>(std::move(request));
|
||||
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveRecursiveResponse &>(response)); };
|
||||
|
||||
pushRequest(std::move(request_info));
|
||||
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
|
||||
}
|
||||
|
||||
void ZooKeeper::exists(
|
||||
const String & path,
|
||||
|
@ -146,6 +146,11 @@ public:
|
||||
int32_t version,
|
||||
RemoveCallback callback) override;
|
||||
|
||||
void removeRecursive(
|
||||
const String &path,
|
||||
uint32_t remove_nodes_limit,
|
||||
RemoveRecursiveCallback callback) override;
|
||||
|
||||
void exists(
|
||||
const String & path,
|
||||
ExistsCallback callback,
|
||||
|
@ -11,6 +11,7 @@ enum class KeeperApiVersion : uint8_t
|
||||
WITH_FILTERED_LIST,
|
||||
WITH_MULTI_READ,
|
||||
WITH_CHECK_NOT_EXISTS,
|
||||
WITH_REMOVE_RECURSIVE,
|
||||
};
|
||||
|
||||
const String keeper_system_path = "/keeper";
|
||||
|
@ -91,6 +91,12 @@ bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request
|
||||
memory_delta -= remove_req.bytesSize();
|
||||
break;
|
||||
}
|
||||
case Coordination::OpNum::RemoveRecursive:
|
||||
{
|
||||
Coordination::ZooKeeperRemoveRecursiveRequest & remove_req = dynamic_cast<Coordination::ZooKeeperRemoveRecursiveRequest &>(*sub_zk_request);
|
||||
memory_delta -= remove_req.bytesSize();
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ enum class KeeperFeatureFlag : size_t
|
||||
MULTI_READ,
|
||||
CHECK_NOT_EXISTS,
|
||||
CREATE_IF_NOT_EXISTS,
|
||||
REMOVE_RECURSIVE,
|
||||
};
|
||||
|
||||
class KeeperFeatureFlags
|
||||
|
@ -832,6 +832,15 @@ std::shared_ptr<typename Container::Node> KeeperStorage<Container>::UncommittedS
|
||||
return tryGetNodeFromStorage(path);
|
||||
}
|
||||
|
||||
template<typename Container>
|
||||
const typename Container::Node * KeeperStorage<Container>::UncommittedState::getActualNodeView(StringRef path, const Node & storage_node) const
|
||||
{
|
||||
if (auto node_it = nodes.find(path.toView()); node_it != nodes.end())
|
||||
return node_it->second.node.get();
|
||||
|
||||
return &storage_node;
|
||||
}
|
||||
|
||||
template<typename Container>
|
||||
Coordination::ACLs KeeperStorage<Container>::UncommittedState::getACLs(StringRef path) const
|
||||
{
|
||||
@ -1124,7 +1133,7 @@ struct KeeperStorageRequestProcessor
|
||||
}
|
||||
|
||||
virtual KeeperStorageBase::ResponsesForSessions
|
||||
processWatches(KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) const
|
||||
processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) const
|
||||
{
|
||||
return {};
|
||||
}
|
||||
@ -1241,7 +1250,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
|
||||
using KeeperStorageRequestProcessor<Storage>::KeeperStorageRequestProcessor;
|
||||
|
||||
KeeperStorageBase::ResponsesForSessions
|
||||
processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
|
||||
processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
|
||||
{
|
||||
return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
|
||||
}
|
||||
@ -1462,16 +1471,41 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
|
||||
}
|
||||
};
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
template <typename Storage>
|
||||
void addUpdateParentPzxidDelta(Storage & storage, std::vector<typename Storage::Delta> & deltas, int64_t zxid, StringRef path)
|
||||
{
|
||||
auto parent_path = parentNodePath(path);
|
||||
if (!storage.uncommitted_state.getNode(parent_path))
|
||||
return;
|
||||
|
||||
deltas.emplace_back(
|
||||
std::string{parent_path},
|
||||
zxid,
|
||||
typename Storage::UpdateNodeDelta
|
||||
{
|
||||
[zxid](Storage::Node & parent)
|
||||
{
|
||||
parent.pzxid = std::max(parent.pzxid, zxid);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
template<typename Storage>
|
||||
struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor<Storage>
|
||||
{
|
||||
using KeeperStorageRequestProcessor<Storage>::KeeperStorageRequestProcessor;
|
||||
|
||||
bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override
|
||||
{
|
||||
return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local);
|
||||
}
|
||||
|
||||
using KeeperStorageRequestProcessor<Storage>::KeeperStorageRequestProcessor;
|
||||
|
||||
std::vector<typename Storage::Delta>
|
||||
preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
|
||||
{
|
||||
@ -1488,31 +1522,12 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
|
||||
return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
|
||||
}
|
||||
|
||||
const auto update_parent_pzxid = [&]()
|
||||
{
|
||||
auto parent_path = parentNodePath(request.path);
|
||||
if (!storage.uncommitted_state.getNode(parent_path))
|
||||
return;
|
||||
|
||||
new_deltas.emplace_back(
|
||||
std::string{parent_path},
|
||||
zxid,
|
||||
typename Storage::UpdateNodeDelta
|
||||
{
|
||||
[zxid](Storage::Node & parent)
|
||||
{
|
||||
parent.pzxid = std::max(parent.pzxid, zxid);
|
||||
}
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
auto node = storage.uncommitted_state.getNode(request.path);
|
||||
|
||||
if (!node)
|
||||
{
|
||||
if (request.restored_from_zookeeper_log)
|
||||
update_parent_pzxid();
|
||||
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
|
||||
return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}};
|
||||
}
|
||||
else if (request.version != -1 && request.version != node->version)
|
||||
@ -1521,7 +1536,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
|
||||
return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
|
||||
|
||||
if (request.restored_from_zookeeper_log)
|
||||
update_parent_pzxid();
|
||||
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
|
||||
|
||||
new_deltas.emplace_back(
|
||||
std::string{parentNodePath(request.path)},
|
||||
@ -1552,12 +1567,318 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
|
||||
}
|
||||
|
||||
KeeperStorageBase::ResponsesForSessions
|
||||
processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
|
||||
processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
|
||||
{
|
||||
return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
|
||||
}
|
||||
};
|
||||
|
||||
template<typename Storage>
|
||||
struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorageRequestProcessor<Storage>
|
||||
{
|
||||
using KeeperStorageRequestProcessor<Storage>::KeeperStorageRequestProcessor;
|
||||
|
||||
bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override
|
||||
{
|
||||
return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local);
|
||||
}
|
||||
|
||||
std::vector<typename Storage::Delta>
|
||||
preprocess(Storage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest);
|
||||
Coordination::ZooKeeperRemoveRecursiveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRecursiveRequest &>(*this->zk_request);
|
||||
|
||||
std::vector<typename Storage::Delta> new_deltas;
|
||||
|
||||
if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH)
|
||||
{
|
||||
auto error_msg = fmt::format("Trying to delete an internal Keeper path ({}) which is not allowed", request.path);
|
||||
|
||||
handleSystemNodeModification(keeper_context, error_msg);
|
||||
return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
|
||||
}
|
||||
|
||||
auto node = storage.uncommitted_state.getNode(request.path);
|
||||
|
||||
if (!node)
|
||||
{
|
||||
if (request.restored_from_zookeeper_log)
|
||||
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
|
||||
|
||||
return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}};
|
||||
}
|
||||
|
||||
ToDeleteTreeCollector collector(storage, zxid, session_id, request.remove_nodes_limit);
|
||||
auto collect_status = collector.collect(request.path, *node);
|
||||
|
||||
if (collect_status == ToDeleteTreeCollector::CollectStatus::NoAuth)
|
||||
return {typename Storage::Delta{zxid, Coordination::Error::ZNOAUTH}};
|
||||
|
||||
if (collect_status == ToDeleteTreeCollector::CollectStatus::LimitExceeded)
|
||||
return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
|
||||
|
||||
if (request.restored_from_zookeeper_log)
|
||||
addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path);
|
||||
|
||||
auto delete_deltas = collector.extractDeltas();
|
||||
|
||||
for (const auto & delta : delete_deltas)
|
||||
{
|
||||
const auto * remove_delta = std::get_if<typename Storage::RemoveNodeDelta>(&delta.operation);
|
||||
if (remove_delta && remove_delta->ephemeral_owner)
|
||||
storage.unregisterEphemeralPath(remove_delta->ephemeral_owner, delta.path);
|
||||
}
|
||||
|
||||
new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end()));
|
||||
|
||||
digest = storage.calculateNodesDigest(digest, new_deltas);
|
||||
|
||||
return new_deltas;
|
||||
}
|
||||
|
||||
Coordination::ZooKeeperResponsePtr process(Storage & storage, int64_t zxid) const override
|
||||
{
|
||||
Coordination::ZooKeeperResponsePtr response_ptr = this->zk_request->makeResponse();
|
||||
Coordination::ZooKeeperRemoveRecursiveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveRecursiveResponse &>(*response_ptr);
|
||||
|
||||
response.error = storage.commit(zxid);
|
||||
return response_ptr;
|
||||
}
|
||||
|
||||
KeeperStorageBase::ResponsesForSessions
|
||||
processWatches(const Storage & storage, int64_t zxid, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override
|
||||
{
|
||||
/// need to iterate over zxid deltas and update watches for deleted tree.
|
||||
const auto & deltas = storage.uncommitted_state.deltas;
|
||||
|
||||
KeeperStorageBase::ResponsesForSessions responses;
|
||||
for (auto it = deltas.rbegin(); it != deltas.rend() && it->zxid == zxid; ++it)
|
||||
{
|
||||
const auto * remove_delta = std::get_if<typename Storage::RemoveNodeDelta>(&it->operation);
|
||||
if (remove_delta)
|
||||
{
|
||||
auto new_responses = processWatchesImpl(it->path, watches, list_watches, Coordination::Event::DELETED);
|
||||
responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end()));
|
||||
}
|
||||
}
|
||||
|
||||
return responses;
|
||||
}
|
||||
|
||||
private:
|
||||
using SNode = typename Storage::Node;
|
||||
|
||||
class ToDeleteTreeCollector
|
||||
{
|
||||
Storage & storage;
|
||||
int64_t zxid;
|
||||
int64_t session_id;
|
||||
uint32_t limit;
|
||||
|
||||
uint32_t max_level = 0;
|
||||
uint32_t nodes_observed = 1; /// root node
|
||||
std::unordered_map<uint32_t, std::vector<typename Storage::Delta>> by_level_deltas;
|
||||
|
||||
struct Step
|
||||
{
|
||||
String path;
|
||||
std::variant<SNode, const SNode *> node;
|
||||
uint32_t level;
|
||||
};
|
||||
|
||||
enum class CollectStatus
|
||||
{
|
||||
Ok,
|
||||
NoAuth,
|
||||
LimitExceeded,
|
||||
};
|
||||
|
||||
friend struct KeeperStorageRemoveRecursiveRequestProcessor;
|
||||
|
||||
public:
|
||||
ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, int64_t session_id_, uint32_t limit_)
|
||||
: storage(storage_)
|
||||
, zxid(zxid_)
|
||||
, session_id(session_id_)
|
||||
, limit(limit_)
|
||||
{
|
||||
}
|
||||
|
||||
CollectStatus collect(StringRef root_path, const SNode & root_node)
|
||||
{
|
||||
std::deque<Step> steps;
|
||||
|
||||
if (checkLimits(&root_node))
|
||||
return CollectStatus::LimitExceeded;
|
||||
|
||||
steps.push_back(Step{root_path.toString(), &root_node, 0});
|
||||
|
||||
while (!steps.empty())
|
||||
{
|
||||
Step step = std::move(steps.front());
|
||||
steps.pop_front();
|
||||
|
||||
StringRef path = step.path;
|
||||
uint32_t level = step.level;
|
||||
const SNode * node_ptr = nullptr;
|
||||
|
||||
if (auto * rdb = std::get_if<SNode>(&step.node))
|
||||
node_ptr = rdb;
|
||||
else
|
||||
node_ptr = std::get<const SNode *>(step.node);
|
||||
|
||||
chassert(!path.empty());
|
||||
chassert(node_ptr != nullptr);
|
||||
|
||||
const auto & node = *node_ptr;
|
||||
auto actual_node_ptr = storage.uncommitted_state.getActualNodeView(path, node);
|
||||
chassert(actual_node_ptr != nullptr); /// explicitly check that node is not deleted
|
||||
|
||||
if (actual_node_ptr->numChildren() > 0 && !storage.checkACL(path, Coordination::ACL::Delete, session_id, /*is_local=*/false))
|
||||
return CollectStatus::NoAuth;
|
||||
|
||||
if (auto status = visitRocksDBNode(steps, path, level); status != CollectStatus::Ok)
|
||||
return status;
|
||||
|
||||
if (auto status = visitMemNode(steps, path, level); status != CollectStatus::Ok)
|
||||
return status;
|
||||
|
||||
if (auto status = visitRootAndUncommitted(steps, path, node, level); status != CollectStatus::Ok)
|
||||
return status;
|
||||
}
|
||||
|
||||
return CollectStatus::Ok;
|
||||
}
|
||||
|
||||
std::vector<typename Storage::Delta> extractDeltas()
|
||||
{
|
||||
std::vector<typename Storage::Delta> deltas;
|
||||
|
||||
for (ssize_t level = max_level; level >= 0; --level)
|
||||
{
|
||||
auto & level_deltas = by_level_deltas[static_cast<uint32_t>(level)];
|
||||
deltas.insert(deltas.end(), std::make_move_iterator(level_deltas.begin()), std::make_move_iterator(level_deltas.end()));
|
||||
}
|
||||
|
||||
return std::move(deltas);
|
||||
}
|
||||
|
||||
private:
|
||||
CollectStatus visitRocksDBNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
|
||||
{
|
||||
if constexpr (Storage::use_rocksdb)
|
||||
{
|
||||
std::filesystem::path root_fs_path(root_path.toString());
|
||||
auto children = storage.container.getChildren(root_path.toString());
|
||||
|
||||
for (auto && [child_name, child_node] : children)
|
||||
{
|
||||
auto child_path = (root_fs_path / child_name).generic_string();
|
||||
const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node);
|
||||
|
||||
if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction
|
||||
continue;
|
||||
|
||||
if (checkLimits(actual_child_node_ptr))
|
||||
return CollectStatus::LimitExceeded;
|
||||
|
||||
steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1});
|
||||
}
|
||||
}
|
||||
|
||||
return CollectStatus::Ok;
|
||||
}
|
||||
|
||||
CollectStatus visitMemNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
|
||||
{
|
||||
if constexpr (!Storage::use_rocksdb)
|
||||
{
|
||||
auto node_it = storage.container.find(root_path);
|
||||
if (node_it == storage.container.end())
|
||||
return CollectStatus::Ok;
|
||||
|
||||
std::filesystem::path root_fs_path(root_path.toString());
|
||||
const auto & children = node_it->value.getChildren();
|
||||
|
||||
for (const auto & child_name : children)
|
||||
{
|
||||
auto child_path = (root_fs_path / child_name.toView()).generic_string();
|
||||
|
||||
auto child_it = storage.container.find(child_path);
|
||||
chassert(child_it != storage.container.end());
|
||||
const auto & child_node = child_it->value;
|
||||
|
||||
const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node);
|
||||
|
||||
if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction
|
||||
continue;
|
||||
|
||||
if (checkLimits(actual_child_node_ptr))
|
||||
return CollectStatus::LimitExceeded;
|
||||
|
||||
steps.push_back(Step{std::move(child_path), &child_node, level + 1});
|
||||
}
|
||||
}
|
||||
|
||||
return CollectStatus::Ok;
|
||||
}
|
||||
|
||||
CollectStatus visitRootAndUncommitted(std::deque<Step> & steps, StringRef root_path, const SNode & root_node, uint32_t level)
|
||||
{
|
||||
const auto & nodes = storage.uncommitted_state.nodes;
|
||||
|
||||
/// nodes are sorted by paths with level locality
|
||||
auto it = nodes.upper_bound(root_path.toString() + "/");
|
||||
|
||||
for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it)
|
||||
{
|
||||
const auto actual_child_node_ptr = it->second.node.get();
|
||||
|
||||
if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction
|
||||
continue;
|
||||
|
||||
if (checkLimits(actual_child_node_ptr))
|
||||
return CollectStatus::LimitExceeded;
|
||||
|
||||
const String & child_path = it->first;
|
||||
const SNode & child_node = *it->second.node;
|
||||
|
||||
steps.push_back(Step{child_path, &child_node, level + 1});
|
||||
}
|
||||
|
||||
addDelta(root_path, root_node, level);
|
||||
|
||||
return CollectStatus::Ok;
|
||||
}
|
||||
|
||||
void addDelta(StringRef root_path, const SNode & root_node, uint32_t level)
|
||||
{
|
||||
max_level = std::max(max_level, level);
|
||||
|
||||
by_level_deltas[level].emplace_back(
|
||||
parentNodePath(root_path).toString(),
|
||||
zxid,
|
||||
typename Storage::UpdateNodeDelta{
|
||||
[](SNode & parent)
|
||||
{
|
||||
++parent.cversion;
|
||||
parent.decreaseNumChildren();
|
||||
}
|
||||
});
|
||||
|
||||
by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()});
|
||||
}
|
||||
|
||||
bool checkLimits(const SNode * node)
|
||||
{
|
||||
chassert(node != nullptr);
|
||||
nodes_observed += node->numChildren();
|
||||
return nodes_observed > limit;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
template<typename Storage>
|
||||
struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor<Storage>
|
||||
{
|
||||
@ -1709,7 +2030,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
|
||||
}
|
||||
|
||||
KeeperStorageBase::ResponsesForSessions
|
||||
processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override
|
||||
processWatches(const Storage & /*storage*/, int64_t /*zxid*/, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override
|
||||
{
|
||||
return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
|
||||
}
|
||||
@ -2131,6 +2452,10 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
|
||||
check_operation_type(OperationType::Write);
|
||||
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor<Storage>>(sub_zk_request));
|
||||
break;
|
||||
case Coordination::OpNum::RemoveRecursive:
|
||||
check_operation_type(OperationType::Write);
|
||||
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRecursiveRequestProcessor<Storage>>(sub_zk_request));
|
||||
break;
|
||||
case Coordination::OpNum::Set:
|
||||
check_operation_type(OperationType::Write);
|
||||
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor<Storage>>(sub_zk_request));
|
||||
@ -2250,12 +2575,12 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
|
||||
}
|
||||
|
||||
KeeperStorageBase::ResponsesForSessions
|
||||
processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override
|
||||
processWatches(const Storage & storage, int64_t zxid, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override
|
||||
{
|
||||
typename Storage::ResponsesForSessions result;
|
||||
for (const auto & generic_request : concrete_requests)
|
||||
{
|
||||
auto responses = generic_request->processWatches(watches, list_watches);
|
||||
auto responses = generic_request->processWatches(storage, zxid, watches, list_watches);
|
||||
result.insert(result.end(), responses.begin(), responses.end());
|
||||
}
|
||||
return result;
|
||||
@ -2400,6 +2725,7 @@ KeeperStorageRequestProcessorsFactory<Storage>::KeeperStorageRequestProcessorsFa
|
||||
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor<Storage>>(*this);
|
||||
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor<Storage>>(*this);
|
||||
registerKeeperRequestProcessor<Coordination::OpNum::CheckNotExists, KeeperStorageCheckRequestProcessor<Storage>>(*this);
|
||||
registerKeeperRequestProcessor<Coordination::OpNum::RemoveRecursive, KeeperStorageRemoveRecursiveRequestProcessor<Storage>>(*this);
|
||||
}
|
||||
|
||||
|
||||
@ -2718,7 +3044,7 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
|
||||
/// If this requests processed successfully we need to check watches
|
||||
if (response->error == Coordination::Error::ZOK)
|
||||
{
|
||||
auto watch_responses = request_processor->processWatches(watches, list_watches);
|
||||
auto watch_responses = request_processor->processWatches(*this, zxid, watches, list_watches);
|
||||
results.insert(results.end(), watch_responses.begin(), watch_responses.end());
|
||||
}
|
||||
|
||||
|
@ -566,6 +566,7 @@ public:
|
||||
void rollback(int64_t rollback_zxid);
|
||||
|
||||
std::shared_ptr<Node> getNode(StringRef path) const;
|
||||
const Node * getActualNodeView(StringRef path, const Node & storage_node) const;
|
||||
Coordination::ACLs getACLs(StringRef path) const;
|
||||
|
||||
void applyDelta(const Delta & delta);
|
||||
@ -609,7 +610,18 @@ public:
|
||||
using is_transparent = void; // required to make find() work with different type than key_type
|
||||
};
|
||||
|
||||
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
|
||||
struct PathCmp
|
||||
{
|
||||
using is_transparent = std::true_type;
|
||||
|
||||
auto operator()(const std::string_view a,
|
||||
const std::string_view b) const
|
||||
{
|
||||
return a.size() < b.size() || (a.size() == b.size() && a < b);
|
||||
}
|
||||
};
|
||||
|
||||
mutable std::map<std::string, UncommittedNode, PathCmp> nodes;
|
||||
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
|
||||
|
||||
std::list<Delta> deltas;
|
||||
|
@ -3113,6 +3113,8 @@ TYPED_TEST(CoordinationTest, TestFeatureFlags)
|
||||
ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST));
|
||||
ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ));
|
||||
ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS));
|
||||
ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CREATE_IF_NOT_EXISTS));
|
||||
ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE));
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestSystemNodeModify)
|
||||
@ -3374,6 +3376,474 @@ TYPED_TEST(CoordinationTest, TestReapplyingDeltas)
|
||||
ASSERT_TRUE(children1_set == children2_set);
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest)
|
||||
{
|
||||
using namespace DB;
|
||||
using namespace Coordination;
|
||||
|
||||
using Storage = typename TestFixture::Storage;
|
||||
|
||||
ChangelogDirTest rocks("./rocksdb");
|
||||
this->setRocksDBDirectory("./rocksdb");
|
||||
|
||||
Storage storage{500, "", this->keeper_context};
|
||||
|
||||
int32_t zxid = 0;
|
||||
|
||||
const auto create = [&](const String & path, int create_mode)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
|
||||
create_request->path = path;
|
||||
create_request->is_ephemeral = create_mode == zkutil::CreateMode::Ephemeral || create_mode == zkutil::CreateMode::EphemeralSequential;
|
||||
create_request->is_sequential = create_mode == zkutil::CreateMode::PersistentSequential || create_mode == zkutil::CreateMode::EphemeralSequential;
|
||||
|
||||
storage.preprocessRequest(create_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(create_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path;
|
||||
};
|
||||
|
||||
const auto remove = [&](const String & path, int32_t version = -1)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
auto remove_request = std::make_shared<ZooKeeperRemoveRequest>();
|
||||
remove_request->path = path;
|
||||
remove_request->version = version;
|
||||
|
||||
storage.preprocessRequest(remove_request, 1, 0, new_zxid);
|
||||
return storage.processRequest(remove_request, 1, new_zxid);
|
||||
};
|
||||
|
||||
const auto remove_recursive = [&](const String & path, uint32_t remove_nodes_limit = 1)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
auto remove_request = std::make_shared<ZooKeeperRemoveRecursiveRequest>();
|
||||
remove_request->path = path;
|
||||
remove_request->remove_nodes_limit = remove_nodes_limit;
|
||||
|
||||
storage.preprocessRequest(remove_request, 1, 0, new_zxid);
|
||||
return storage.processRequest(remove_request, 1, new_zxid);
|
||||
};
|
||||
|
||||
const auto exists = [&](const String & path)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto exists_request = std::make_shared<ZooKeeperExistsRequest>();
|
||||
exists_request->path = path;
|
||||
|
||||
storage.preprocessRequest(exists_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(exists_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
return responses[0].response->error == Coordination::Error::ZOK;
|
||||
};
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Single Remove Single Node");
|
||||
create("/T1", zkutil::CreateMode::Persistent);
|
||||
|
||||
auto responses = remove("/T1");
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
ASSERT_FALSE(exists("/T1"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Single Remove Tree");
|
||||
create("/T2", zkutil::CreateMode::Persistent);
|
||||
create("/T2/A", zkutil::CreateMode::Persistent);
|
||||
|
||||
auto responses = remove("/T2");
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY);
|
||||
ASSERT_TRUE(exists("/T2"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove Single Node");
|
||||
create("/T3", zkutil::CreateMode::Persistent);
|
||||
|
||||
auto responses = remove_recursive("/T3", 100);
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
ASSERT_FALSE(exists("/T3"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove Tree Small Limit");
|
||||
create("/T5", zkutil::CreateMode::Persistent);
|
||||
create("/T5/A", zkutil::CreateMode::Persistent);
|
||||
create("/T5/B", zkutil::CreateMode::Persistent);
|
||||
create("/T5/A/C", zkutil::CreateMode::Persistent);
|
||||
|
||||
auto responses = remove_recursive("/T5", 2);
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY);
|
||||
ASSERT_TRUE(exists("/T5"));
|
||||
ASSERT_TRUE(exists("/T5/A"));
|
||||
ASSERT_TRUE(exists("/T5/B"));
|
||||
ASSERT_TRUE(exists("/T5/A/C"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove Tree Big Limit");
|
||||
create("/T6", zkutil::CreateMode::Persistent);
|
||||
create("/T6/A", zkutil::CreateMode::Persistent);
|
||||
create("/T6/B", zkutil::CreateMode::Persistent);
|
||||
create("/T6/A/C", zkutil::CreateMode::Persistent);
|
||||
|
||||
auto responses = remove_recursive("/T6", 4);
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
ASSERT_FALSE(exists("/T6"));
|
||||
ASSERT_FALSE(exists("/T6/A"));
|
||||
ASSERT_FALSE(exists("/T6/B"));
|
||||
ASSERT_FALSE(exists("/T6/A/C"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove Ephemeral");
|
||||
create("/T7", zkutil::CreateMode::Ephemeral);
|
||||
ASSERT_EQ(storage.ephemerals.size(), 1);
|
||||
|
||||
auto responses = remove_recursive("/T7", 100);
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
ASSERT_EQ(storage.ephemerals.size(), 0);
|
||||
ASSERT_FALSE(exists("/T7"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove Tree With Ephemeral");
|
||||
create("/T8", zkutil::CreateMode::Persistent);
|
||||
create("/T8/A", zkutil::CreateMode::Persistent);
|
||||
create("/T8/B", zkutil::CreateMode::Ephemeral);
|
||||
create("/T8/A/C", zkutil::CreateMode::Ephemeral);
|
||||
ASSERT_EQ(storage.ephemerals.size(), 1);
|
||||
|
||||
auto responses = remove_recursive("/T8", 4);
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
ASSERT_EQ(storage.ephemerals.size(), 0);
|
||||
ASSERT_FALSE(exists("/T8"));
|
||||
ASSERT_FALSE(exists("/T8/A"));
|
||||
ASSERT_FALSE(exists("/T8/B"));
|
||||
ASSERT_FALSE(exists("/T8/A/C"));
|
||||
}
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest)
|
||||
{
|
||||
using namespace DB;
|
||||
using namespace Coordination;
|
||||
|
||||
using Storage = typename TestFixture::Storage;
|
||||
|
||||
ChangelogDirTest rocks("./rocksdb");
|
||||
this->setRocksDBDirectory("./rocksdb");
|
||||
|
||||
Storage storage{500, "", this->keeper_context};
|
||||
int zxid = 0;
|
||||
|
||||
auto prepare_create_tree = []()
|
||||
{
|
||||
return Coordination::Requests{
|
||||
zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent),
|
||||
zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral),
|
||||
zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral),
|
||||
};
|
||||
};
|
||||
|
||||
const auto exists = [&](const String & path)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto exists_request = std::make_shared<ZooKeeperExistsRequest>();
|
||||
exists_request->path = path;
|
||||
|
||||
storage.preprocessRequest(exists_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(exists_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
return responses[0].response->error == Coordination::Error::ZOK;
|
||||
};
|
||||
|
||||
const auto is_multi_ok = [&](Coordination::ZooKeeperResponsePtr response)
|
||||
{
|
||||
const auto & multi_response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response);
|
||||
|
||||
for (const auto & op_response : multi_response.responses)
|
||||
if (op_response->error != Coordination::Error::ZOK)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Remove In Multi Tx");
|
||||
int new_zxid = ++zxid;
|
||||
auto ops = prepare_create_tree();
|
||||
|
||||
ops.push_back(zkutil::makeRemoveRequest("/A", -1));
|
||||
const auto request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
|
||||
|
||||
storage.preprocessRequest(request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(request, 1, new_zxid);
|
||||
ops.pop_back();
|
||||
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_FALSE(is_multi_ok(responses[0].response));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove In Multi Tx");
|
||||
int new_zxid = ++zxid;
|
||||
auto ops = prepare_create_tree();
|
||||
|
||||
ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4));
|
||||
const auto request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
|
||||
|
||||
storage.preprocessRequest(request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(request, 1, new_zxid);
|
||||
ops.pop_back();
|
||||
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_TRUE(is_multi_ok(responses[0].response));
|
||||
ASSERT_FALSE(exists("/A"));
|
||||
ASSERT_FALSE(exists("/A/C"));
|
||||
ASSERT_FALSE(exists("/A/B"));
|
||||
ASSERT_FALSE(exists("/A/B/D"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove With Regular In Multi Tx");
|
||||
int new_zxid = ++zxid;
|
||||
auto ops = prepare_create_tree();
|
||||
|
||||
ops.push_back(zkutil::makeRemoveRequest("/A/C", -1));
|
||||
ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 3));
|
||||
const auto request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
|
||||
|
||||
storage.preprocessRequest(request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(request, 1, new_zxid);
|
||||
ops.pop_back();
|
||||
ops.pop_back();
|
||||
|
||||
ASSERT_EQ(responses.size(), 1);
|
||||
ASSERT_TRUE(is_multi_ok(responses[0].response));
|
||||
ASSERT_FALSE(exists("/A"));
|
||||
ASSERT_FALSE(exists("/A/C"));
|
||||
ASSERT_FALSE(exists("/A/B"));
|
||||
ASSERT_FALSE(exists("/A/B/D"));
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("Recursive Remove From Committed and Uncommitted states");
|
||||
int create_zxid = ++zxid;
|
||||
auto ops = prepare_create_tree();
|
||||
|
||||
/// First create nodes
|
||||
const auto create_request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
|
||||
storage.preprocessRequest(create_request, 1, 0, create_zxid);
|
||||
auto create_responses = storage.processRequest(create_request, 1, create_zxid);
|
||||
ASSERT_EQ(create_responses.size(), 1);
|
||||
ASSERT_TRUE(is_multi_ok(create_responses[0].response));
|
||||
ASSERT_TRUE(exists("/A"));
|
||||
ASSERT_TRUE(exists("/A/C"));
|
||||
ASSERT_TRUE(exists("/A/B"));
|
||||
ASSERT_TRUE(exists("/A/B/D"));
|
||||
|
||||
/// Remove node A/C as a single remove request.
|
||||
/// Remove all other as remove recursive request.
|
||||
/// In this case we should list storage to understand the tree topology
|
||||
/// but ignore already deleted nodes in uncommitted state.
|
||||
|
||||
int remove_zxid = ++zxid;
|
||||
ops = {
|
||||
zkutil::makeRemoveRequest("/A/C", -1),
|
||||
zkutil::makeRemoveRecursiveRequest("/A", 3),
|
||||
};
|
||||
const auto remove_request = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
|
||||
|
||||
storage.preprocessRequest(remove_request, 1, 0, remove_zxid);
|
||||
auto remove_responses = storage.processRequest(remove_request, 1, remove_zxid);
|
||||
|
||||
ASSERT_EQ(remove_responses.size(), 1);
|
||||
ASSERT_TRUE(is_multi_ok(remove_responses[0].response));
|
||||
ASSERT_FALSE(exists("/A"));
|
||||
ASSERT_FALSE(exists("/A/C"));
|
||||
ASSERT_FALSE(exists("/A/B"));
|
||||
ASSERT_FALSE(exists("/A/B/D"));
|
||||
}
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches)
|
||||
{
|
||||
using namespace DB;
|
||||
using namespace Coordination;
|
||||
|
||||
using Storage = typename TestFixture::Storage;
|
||||
|
||||
ChangelogDirTest rocks("./rocksdb");
|
||||
this->setRocksDBDirectory("./rocksdb");
|
||||
|
||||
Storage storage{500, "", this->keeper_context};
|
||||
int zxid = 0;
|
||||
|
||||
const auto create = [&](const String & path, int create_mode)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
|
||||
create_request->path = path;
|
||||
create_request->is_ephemeral = create_mode == zkutil::CreateMode::Ephemeral || create_mode == zkutil::CreateMode::EphemeralSequential;
|
||||
create_request->is_sequential = create_mode == zkutil::CreateMode::PersistentSequential || create_mode == zkutil::CreateMode::EphemeralSequential;
|
||||
|
||||
storage.preprocessRequest(create_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(create_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path;
|
||||
};
|
||||
|
||||
const auto add_watch = [&](const String & path)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto exists_request = std::make_shared<ZooKeeperExistsRequest>();
|
||||
exists_request->path = path;
|
||||
exists_request->has_watch = true;
|
||||
|
||||
storage.preprocessRequest(exists_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(exists_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
};
|
||||
|
||||
const auto add_list_watch = [&](const String & path)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto list_request = std::make_shared<ZooKeeperListRequest>();
|
||||
list_request->path = path;
|
||||
list_request->has_watch = true;
|
||||
|
||||
storage.preprocessRequest(list_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(list_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
};
|
||||
|
||||
create("/A", zkutil::CreateMode::Persistent);
|
||||
create("/A/B", zkutil::CreateMode::Persistent);
|
||||
create("/A/C", zkutil::CreateMode::Ephemeral);
|
||||
create("/A/B/D", zkutil::CreateMode::Ephemeral);
|
||||
|
||||
add_watch("/A");
|
||||
add_watch("/A/B");
|
||||
add_watch("/A/C");
|
||||
add_watch("/A/B/D");
|
||||
add_list_watch("/A");
|
||||
add_list_watch("/A/B");
|
||||
ASSERT_EQ(storage.watches.size(), 4);
|
||||
ASSERT_EQ(storage.list_watches.size(), 2);
|
||||
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
auto remove_request = std::make_shared<ZooKeeperRemoveRecursiveRequest>();
|
||||
remove_request->path = "/A";
|
||||
remove_request->remove_nodes_limit = 4;
|
||||
|
||||
storage.preprocessRequest(remove_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(remove_request, 1, new_zxid);
|
||||
|
||||
ASSERT_EQ(responses.size(), 7);
|
||||
|
||||
for (size_t i = 0; i < 7; ++i)
|
||||
{
|
||||
ASSERT_EQ(responses[i].response->error, Coordination::Error::ZOK);
|
||||
|
||||
if (const auto * watch_response = dynamic_cast<Coordination::ZooKeeperWatchResponse *>(responses[i].response.get()))
|
||||
ASSERT_EQ(watch_response->type, Coordination::Event::DELETED);
|
||||
}
|
||||
|
||||
ASSERT_EQ(storage.watches.size(), 0);
|
||||
ASSERT_EQ(storage.list_watches.size(), 0);
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestRemoveRecursiveAcls)
|
||||
{
|
||||
using namespace DB;
|
||||
using namespace Coordination;
|
||||
|
||||
using Storage = typename TestFixture::Storage;
|
||||
|
||||
ChangelogDirTest rocks("./rocksdb");
|
||||
this->setRocksDBDirectory("./rocksdb");
|
||||
|
||||
Storage storage{500, "", this->keeper_context};
|
||||
int zxid = 0;
|
||||
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
String user_auth_data = "test_user:test_password";
|
||||
|
||||
const auto auth_request = std::make_shared<ZooKeeperAuthRequest>();
|
||||
auth_request->scheme = "digest";
|
||||
auth_request->data = user_auth_data;
|
||||
|
||||
storage.preprocessRequest(auth_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(auth_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to add auth to session";
|
||||
}
|
||||
|
||||
const auto create = [&](const String & path)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
|
||||
create_request->path = path;
|
||||
create_request->acls = {{.permissions = ACL::Create, .scheme = "auth", .id = ""}};
|
||||
|
||||
storage.preprocessRequest(create_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(create_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path;
|
||||
};
|
||||
|
||||
/// Add nodes with only Create ACL
|
||||
create("/A");
|
||||
create("/A/B");
|
||||
create("/A/C");
|
||||
create("/A/B/D");
|
||||
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
auto remove_request = std::make_shared<ZooKeeperRemoveRecursiveRequest>();
|
||||
remove_request->path = "/A";
|
||||
remove_request->remove_nodes_limit = 4;
|
||||
|
||||
storage.preprocessRequest(remove_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(remove_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZNOAUTH);
|
||||
}
|
||||
}
|
||||
|
||||
/// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
|
||||
/// CoordinationTest,
|
||||
/// ::testing::ValuesIn(std::initializer_list<CompressionParam>{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}}));
|
||||
|
@ -93,6 +93,7 @@ ColumnsDescription ZooKeeperLogElement::getColumnsDescription()
|
||||
{"FilteredList", static_cast<Int16>(Coordination::OpNum::FilteredList)},
|
||||
{"CheckNotExists", static_cast<Int16>(Coordination::OpNum::CheckNotExists)},
|
||||
{"CreateIfNotExists", static_cast<Int16>(Coordination::OpNum::CreateIfNotExists)},
|
||||
{"RemoveRecursive", static_cast<Int16>(Coordination::OpNum::RemoveRecursive)},
|
||||
});
|
||||
|
||||
auto error_enum = getCoordinationErrorCodesEnumType();
|
||||
|
Loading…
Reference in New Issue
Block a user