Merge pull request #38338 from ClickHouse/zookeeper-add-extra-list-argument

Extend ZooKeeper list request with support for filtering persistent or ephemeral nodes only
This commit is contained in:
alesapin 2022-07-05 12:53:51 +02:00 committed by GitHub
commit bd8a3ee841
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 243 additions and 28 deletions

View File

@ -281,6 +281,13 @@ struct SetResponse : virtual Response
size_t bytesSize() const override { return sizeof(stat); }
};
enum class ListRequestType : uint8_t
{
ALL,
PERSISTENT_ONLY,
EPHEMERAL_ONLY
};
struct ListRequest : virtual Request
{
String path;
@ -492,6 +499,7 @@ public:
virtual void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) = 0;

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/ZooKeeper/TestKeeper.h>
#include <Common/setThreadName.h>
#include <Common/StringUtils/StringUtils.h>
@ -119,12 +120,17 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
}
};
struct TestKeeperListRequest final : ListRequest, TestKeeperRequest
struct TestKeeperListRequest : ListRequest, TestKeeperRequest
{
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperFilteredListRequest final : TestKeeperListRequest
{
ListRequestType list_request_type;
};
struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
{
TestKeeperCheckRequest() = default;
@ -390,8 +396,18 @@ std::pair<ResponsePtr, Undo> TestKeeperListRequest::process(TestKeeper::Containe
child_it != container.end() && startsWith(child_it->first, path_prefix);
++child_it)
{
using enum ListRequestType;
if (parentPath(child_it->first) == path)
response.names.emplace_back(baseName(child_it->first));
{
ListRequestType list_request_type = ALL;
if (const auto * filtered_list = dynamic_cast<const TestKeeperFilteredListRequest *>(this))
list_request_type = filtered_list->list_request_type;
const auto is_ephemeral = child_it->second.stat.ephemeralOwner != 0;
if (list_request_type == ALL || (is_ephemeral && list_request_type == EPHEMERAL_ONLY)
|| (!is_ephemeral && list_request_type == PERSISTENT_ONLY))
response.names.emplace_back(baseName(child_it->first));
}
}
response.stat = it->second.stat;
@ -768,11 +784,13 @@ void TestKeeper::set(
void TestKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
{
TestKeeperListRequest request;
TestKeeperFilteredListRequest request;
request.path = path;
request.list_request_type = list_request_type;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperListRequest>(std::move(request));

View File

@ -71,6 +71,7 @@ public:
void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;

View File

@ -9,6 +9,7 @@
#include <base/find_symbols.h>
#include <base/sort.h>
#include <base/getFQDNOrHostName.h>
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
#include <Common/isLocalAddress.h>
@ -312,9 +313,10 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback)
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback);
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback, list_request_type);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{
@ -335,26 +337,28 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings
}
}
Strings ZooKeeper::getChildren(
const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
Strings ZooKeeper::getChildren(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
{
Strings res;
check(tryGetChildren(path, res, stat, watch), path);
return res;
}
Strings ZooKeeper::getChildrenWatch(
const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{
Strings res;
check(tryGetChildrenWatch(path, res, stat, watch_callback), path);
return res;
}
Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat, const EventPtr & watch)
Coordination::Error ZooKeeper::tryGetChildren(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch));
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch), list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
@ -362,10 +366,14 @@ Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings
return code;
}
Coordination::Error ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
Coordination::Error ZooKeeper::tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback);
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback, list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
@ -1046,7 +1054,8 @@ std::future<Coordination::SetResponse> ZooKeeper::asyncTrySetNoThrow(const std::
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback)
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1059,11 +1068,12 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::s
promise->set_value(response);
};
impl->list(path, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback), watch_callback);
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback)
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1073,7 +1083,7 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(co
promise->set_value(response);
};
impl->list(path, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback), watch_callback);
return future;
}

View File

@ -194,11 +194,13 @@ public:
/// * The node doesn't exist.
Coordination::Error tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr);
const EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback);
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
/// Performs several operations in a transaction.
/// Throws on every error.
@ -279,9 +281,15 @@ public:
FutureExists asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
using FutureGetChildren = std::future<Coordination::ListResponse>;
FutureGetChildren asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback = {});
FutureGetChildren asyncGetChildren(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
/// Like the previous one but don't throw any exceptions on future.get()
FutureGetChildren asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
FutureGetChildren asyncTryGetChildrenNoThrow(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using FutureSet = std::future<Coordination::SetResponse>;
FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1);
@ -335,7 +343,11 @@ private:
const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
Coordination::Error setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat);
Coordination::Error getChildrenImpl(
const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
Coordination::Error syncImpl(const std::string & path, std::string & returned_path);

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
@ -298,6 +299,32 @@ std::string ZooKeeperListRequest::toStringImpl() const
return fmt::format("path = {}", path);
}
void ZooKeeperFilteredListRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(has_watch, out);
Coordination::write(static_cast<uint8_t>(list_request_type), out);
}
void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
Coordination::read(has_watch, in);
uint8_t read_request_type{0};
Coordination::read(read_request_type, in);
list_request_type = static_cast<ListRequestType>(read_request_type);
}
std::string ZooKeeperFilteredListRequest::toStringImpl() const
{
return fmt::format(
"path = {}\n"
"list_request_type = {}",
path,
list_request_type);
}
void ZooKeeperListResponse::readImpl(ReadBuffer & in)
{
Coordination::read(names, in);

View File

@ -347,6 +347,18 @@ struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest
OpNum getOpNum() const override { return OpNum::SimpleList; }
};
struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
{
ListRequestType list_request_type{ListRequestType::ALL};
OpNum getOpNum() const override { return OpNum::FilteredList; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
size_t bytesSize() const override { return ZooKeeperListRequest::bytesSize() + sizeof(list_request_type); }
};
struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;

View File

@ -64,6 +64,8 @@ std::string toString(OpNum op_num)
return "SetACL";
case OpNum::GetACL:
return "GetACL";
case OpNum::FilteredList:
return "FilteredList";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);

View File

@ -32,6 +32,10 @@ enum class OpNum : int32_t
Check = 13,
Multi = 14,
Auth = 100,
// CH Keeper specific operations
FilteredList = 500,
SessionID = 997, /// Special internal request
};

View File

@ -28,6 +28,11 @@ void write(int32_t x, WriteBuffer & out)
writeBinary(x, out);
}
void write(uint8_t x, WriteBuffer & out)
{
writeBinary(x, out);
}
void write(OpNum x, WriteBuffer & out)
{
write(static_cast<int32_t>(x), out);
@ -91,6 +96,11 @@ void read(int64_t & x, ReadBuffer & in)
x = __builtin_bswap64(x);
}
void read(uint8_t & x, ReadBuffer & in)
{
readBinary(x, in);
}
void read(int32_t & x, ReadBuffer & in)
{
readBinary(x, in);

View File

@ -22,6 +22,7 @@ void write(uint64_t x, WriteBuffer & out);
void write(int64_t x, WriteBuffer & out);
void write(int32_t x, WriteBuffer & out);
void write(uint8_t x, WriteBuffer & out);
void write(OpNum x, WriteBuffer & out);
void write(bool x, WriteBuffer & out);
void write(const std::string & s, WriteBuffer & out);
@ -50,6 +51,7 @@ void read(uint64_t & x, ReadBuffer & in);
#endif
void read(int64_t & x, ReadBuffer & in);
void read(int32_t & x, ReadBuffer & in);
void read(uint8_t & x, ReadBuffer & in);
void read(OpNum & x, ReadBuffer & in);
void read(bool & x, ReadBuffer & in);
void read(int8_t & x, ReadBuffer & in);

View File

@ -1168,11 +1168,13 @@ void ZooKeeper::set(
void ZooKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
{
ZooKeeperListRequest request;
ZooKeeperFilteredListRequest request;
request.path = path;
request.list_request_type = list_request_type;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperListRequest>(std::move(request));

View File

@ -164,6 +164,7 @@ public:
void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;

View File

@ -6,6 +6,7 @@
#include <boost/algorithm/string.hpp>
#include <Poco/Base64Encoder.h>
#include <Poco/SHA1Engine.h>
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/SipHash.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/StringUtils/StringUtils.h>
@ -19,6 +20,7 @@
#include <mutex>
#include <functional>
#include <base/defines.h>
#include <filesystem>
namespace DB
{
@ -1161,6 +1163,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
}
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
@ -1178,8 +1181,31 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
const auto & children = node_it->value.getChildren();
response.names.reserve(children.size());
const auto add_child = [&](const auto child)
{
using enum Coordination::ListRequestType;
auto list_request_type = ALL;
if (auto * filtered_list = dynamic_cast<Coordination::ZooKeeperFilteredListRequest *>(&request))
list_request_type = filtered_list->list_request_type;
if (list_request_type == ALL)
return true;
auto child_path = (std::filesystem::path(request.path) / child.toView()).generic_string();
auto child_it = container.find(child_path);
if (child_it == container.end())
onStorageInconsistency();
const auto is_ephemeral = child_it->value.stat.ephemeralOwner != 0;
return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY);
};
for (const auto child : children)
response.names.push_back(child.toString());
{
if (add_child(child))
response.names.push_back(child.toString());
}
response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK;
@ -1623,7 +1649,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc
void KeeperStorage::finalize()
{
if (finalized)
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
throw DB::Exception("KeeperStorage already finalized", ErrorCodes::LOGICAL_ERROR);
finalized = true;
@ -1689,6 +1715,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
registerKeeperRequestProcessor<Coordination::OpNum::Set, KeeperStorageSetRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::List, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SimpleList, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::FilteredList, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Check, KeeperStorageCheckRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Multi, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);

View File

@ -27,6 +27,7 @@
#include <Coordination/Changelog.h>
#include <filesystem>
#include <Common/SipHash.h>
#include <Coordination/pathUtils.h>
#include <Coordination/SnapshotableHashTable.h>
@ -1956,6 +1957,84 @@ TEST_P(CoordinationTest, TestUncommittedStateBasicCrud)
ASSERT_FALSE(get_committed_data());
}
TEST_P(CoordinationTest, TestListRequestTypes)
{
using namespace DB;
using namespace Coordination;
KeeperStorage storage{500, "", true};
int64_t zxid = 0;
static constexpr std::string_view path = "/test";
const auto create_path = [&](bool is_ephemeral)
{
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
int new_zxid = ++zxid;
create_request->path = path;
create_request->is_sequential = true;
create_request->is_ephemeral = is_ephemeral;
storage.preprocessRequest(create_request, 1, 0, new_zxid);
auto responses = storage.processRequest(create_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
const auto & create_response = dynamic_cast<ZooKeeperCreateResponse &>(*responses[0].response);
return create_response.path_created;
};
static constexpr size_t persistent_num = 5;
std::unordered_set<std::string> expected_persistent_children;
for (size_t i = 0; i < persistent_num; ++i)
{
expected_persistent_children.insert(getBaseName(create_path(false)).toString());
}
ASSERT_EQ(expected_persistent_children.size(), persistent_num);
static constexpr size_t ephemeral_num = 5;
std::unordered_set<std::string> expected_ephemeral_children;
for (size_t i = 0; i < ephemeral_num; ++i)
{
expected_ephemeral_children.insert(getBaseName(create_path(true)).toString());
}
ASSERT_EQ(expected_ephemeral_children.size(), ephemeral_num);
const auto get_children = [&](const auto list_request_type)
{
const auto list_request = std::make_shared<ZooKeeperFilteredListRequest>();
int new_zxid = ++zxid;
list_request->path = parentPath(StringRef{path}).toString();
list_request->list_request_type = list_request_type;
storage.preprocessRequest(list_request, 1, 0, new_zxid);
auto responses = storage.processRequest(list_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
const auto & list_response = dynamic_cast<ZooKeeperListResponse &>(*responses[0].response);
return list_response.names;
};
const auto persistent_children = get_children(ListRequestType::PERSISTENT_ONLY);
EXPECT_EQ(persistent_children.size(), persistent_num);
for (const auto & child : persistent_children)
{
EXPECT_TRUE(expected_persistent_children.contains(child)) << "Missing persistent child " << child;
}
const auto ephemeral_children = get_children(ListRequestType::EPHEMERAL_ONLY);
EXPECT_EQ(ephemeral_children.size(), ephemeral_num);
for (const auto & child : ephemeral_children)
{
EXPECT_TRUE(expected_ephemeral_children.contains(child)) << "Missing ephemeral child " << child;
}
const auto all_children = get_children(ListRequestType::ALL);
EXPECT_EQ(all_children.size(), ephemeral_num + persistent_num);
for (const auto & child : all_children)
{
EXPECT_TRUE(expected_ephemeral_children.contains(child) || expected_persistent_children.contains(child)) << "Missing child " << child;
}
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,
::testing::ValuesIn(std::initializer_list<CompressionParam>{

View File

@ -62,7 +62,7 @@ void removeRecursive(Coordination::ZooKeeper & zookeeper, const std::string & pa
promise->set_value();
};
zookeeper.list(path, list_callback, nullptr);
zookeeper.list(path, ListRequestType::ALL, list_callback, nullptr);
future.get();
while (!children.empty())