diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index 73c7da25a8b..79f9943cb57 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -281,6 +281,13 @@ struct SetResponse : virtual Response size_t bytesSize() const override { return sizeof(stat); } }; +enum class ListRequestType : uint8_t +{ + ALL, + PERSISTENT_ONLY, + EPHEMERAL_ONLY +}; + struct ListRequest : virtual Request { String path; @@ -492,6 +499,7 @@ public: virtual void list( const String & path, + ListRequestType list_request_type, ListCallback callback, WatchCallback watch) = 0; diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 3d2d5fcb667..3af5dfcc177 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -1,3 +1,4 @@ +#include "Common/ZooKeeper/IKeeper.h" #include #include #include @@ -119,12 +120,17 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest } }; -struct TestKeeperListRequest final : ListRequest, TestKeeperRequest +struct TestKeeperListRequest : ListRequest, TestKeeperRequest { ResponsePtr createResponse() const override; std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; +struct TestKeeperFilteredListRequest final : TestKeeperListRequest +{ + ListRequestType list_request_type; +}; + struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest { TestKeeperCheckRequest() = default; @@ -390,8 +396,18 @@ std::pair TestKeeperListRequest::process(TestKeeper::Containe child_it != container.end() && startsWith(child_it->first, path_prefix); ++child_it) { + using enum ListRequestType; if (parentPath(child_it->first) == path) - response.names.emplace_back(baseName(child_it->first)); + { + ListRequestType list_request_type = ALL; + if (const auto * filtered_list = dynamic_cast(this)) + list_request_type = filtered_list->list_request_type; + + const auto is_ephemeral = child_it->second.stat.ephemeralOwner != 0; + if (list_request_type == ALL || (is_ephemeral && list_request_type == EPHEMERAL_ONLY) + || (!is_ephemeral && list_request_type == PERSISTENT_ONLY)) + response.names.emplace_back(baseName(child_it->first)); + } } response.stat = it->second.stat; @@ -768,11 +784,13 @@ void TestKeeper::set( void TestKeeper::list( const String & path, + ListRequestType list_request_type, ListCallback callback, WatchCallback watch) { - TestKeeperListRequest request; + TestKeeperFilteredListRequest request; request.path = path; + request.list_request_type = list_request_type; RequestInfo request_info; request_info.request = std::make_shared(std::move(request)); diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h index 40cac3094f1..6e77a5d38c1 100644 --- a/src/Common/ZooKeeper/TestKeeper.h +++ b/src/Common/ZooKeeper/TestKeeper.h @@ -71,6 +71,7 @@ public: void list( const String & path, + ListRequestType list_request_type, ListCallback callback, WatchCallback watch) override; diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index c8ae6b72c3e..5a0be0f76ff 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -9,6 +9,7 @@ #include #include #include +#include "Common/ZooKeeper/IKeeper.h" #include #include #include @@ -312,9 +313,10 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch) Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings & res, Coordination::Stat * stat, - Coordination::WatchCallback watch_callback) + Coordination::WatchCallback watch_callback, + Coordination::ListRequestType list_request_type) { - auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback); + auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback, list_request_type); if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { @@ -335,26 +337,28 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings } } -Strings ZooKeeper::getChildren( - const std::string & path, Coordination::Stat * stat, const EventPtr & watch) +Strings ZooKeeper::getChildren(const std::string & path, Coordination::Stat * stat, const EventPtr & watch) { Strings res; check(tryGetChildren(path, res, stat, watch), path); return res; } -Strings ZooKeeper::getChildrenWatch( - const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) +Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) { Strings res; check(tryGetChildrenWatch(path, res, stat, watch_callback), path); return res; } -Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings & res, - Coordination::Stat * stat, const EventPtr & watch) +Coordination::Error ZooKeeper::tryGetChildren( + const std::string & path, + Strings & res, + Coordination::Stat * stat, + const EventPtr & watch, + Coordination::ListRequestType list_request_type) { - Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch)); + Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch), list_request_type); if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE)) throw KeeperException(code, path); @@ -362,10 +366,14 @@ Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings return code; } -Coordination::Error ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res, - Coordination::Stat * stat, Coordination::WatchCallback watch_callback) +Coordination::Error ZooKeeper::tryGetChildrenWatch( + const std::string & path, + Strings & res, + Coordination::Stat * stat, + Coordination::WatchCallback watch_callback, + Coordination::ListRequestType list_request_type) { - Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback); + Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback, list_request_type); if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE)) throw KeeperException(code, path); @@ -1046,7 +1054,8 @@ std::future ZooKeeper::asyncTrySetNoThrow(const std:: return future; } -std::future ZooKeeper::asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback) +std::future ZooKeeper::asyncGetChildren( + const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type) { auto promise = std::make_shared>(); auto future = promise->get_future(); @@ -1059,11 +1068,12 @@ std::future ZooKeeper::asyncGetChildren(const std::s promise->set_value(response); }; - impl->list(path, std::move(callback), watch_callback); + impl->list(path, list_request_type, std::move(callback), watch_callback); return future; } -std::future ZooKeeper::asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback) +std::future ZooKeeper::asyncTryGetChildrenNoThrow( + const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type) { auto promise = std::make_shared>(); auto future = promise->get_future(); @@ -1073,7 +1083,7 @@ std::future ZooKeeper::asyncTryGetChildrenNoThrow(co promise->set_value(response); }; - impl->list(path, std::move(callback), watch_callback); + impl->list(path, list_request_type, std::move(callback), watch_callback); return future; } diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 6aebccd2b4e..d2f92b6b4c3 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -194,11 +194,13 @@ public: /// * The node doesn't exist. Coordination::Error tryGetChildren(const std::string & path, Strings & res, Coordination::Stat * stat = nullptr, - const EventPtr & watch = nullptr); + const EventPtr & watch = nullptr, + Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL); Coordination::Error tryGetChildrenWatch(const std::string & path, Strings & res, Coordination::Stat * stat, - Coordination::WatchCallback watch_callback); + Coordination::WatchCallback watch_callback, + Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL); /// Performs several operations in a transaction. /// Throws on every error. @@ -279,9 +281,15 @@ public: FutureExists asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {}); using FutureGetChildren = std::future; - FutureGetChildren asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback = {}); + FutureGetChildren asyncGetChildren( + const std::string & path, + Coordination::WatchCallback watch_callback = {}, + Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL); /// Like the previous one but don't throw any exceptions on future.get() - FutureGetChildren asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {}); + FutureGetChildren asyncTryGetChildrenNoThrow( + const std::string & path, + Coordination::WatchCallback watch_callback = {}, + Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL); using FutureSet = std::future; FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1); @@ -335,7 +343,11 @@ private: const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback); Coordination::Error setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat); Coordination::Error getChildrenImpl( - const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback); + const std::string & path, + Strings & res, + Coordination::Stat * stat, + Coordination::WatchCallback watch_callback, + Coordination::ListRequestType list_request_type); Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses); Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback); Coordination::Error syncImpl(const std::string & path, std::string & returned_path); diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index de2fb630848..837ea5bbad8 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -1,3 +1,4 @@ +#include "Common/ZooKeeper/IKeeper.h" #include #include #include @@ -298,6 +299,32 @@ std::string ZooKeeperListRequest::toStringImpl() const return fmt::format("path = {}", path); } +void ZooKeeperFilteredListRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(has_watch, out); + Coordination::write(static_cast(list_request_type), out); +} + +void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(has_watch, in); + + uint8_t read_request_type{0}; + Coordination::read(read_request_type, in); + list_request_type = static_cast(read_request_type); +} + +std::string ZooKeeperFilteredListRequest::toStringImpl() const +{ + return fmt::format( + "path = {}\n" + "list_request_type = {}", + path, + list_request_type); +} + void ZooKeeperListResponse::readImpl(ReadBuffer & in) { Coordination::read(names, in); diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index c7bfbe95b74..09f797fb47b 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -347,6 +347,18 @@ struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest OpNum getOpNum() const override { return OpNum::SimpleList; } }; +struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest +{ + ListRequestType list_request_type{ListRequestType::ALL}; + + OpNum getOpNum() const override { return OpNum::FilteredList; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; + + size_t bytesSize() const override { return ZooKeeperListRequest::bytesSize() + sizeof(list_request_type); } +}; + struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse { void readImpl(ReadBuffer & in) override; diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.cpp b/src/Common/ZooKeeper/ZooKeeperConstants.cpp index b0a05fe6c8d..5b121ed6138 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.cpp +++ b/src/Common/ZooKeeper/ZooKeeperConstants.cpp @@ -64,6 +64,8 @@ std::string toString(OpNum op_num) return "SetACL"; case OpNum::GetACL: return "GetACL"; + case OpNum::FilteredList: + return "FilteredList"; } int32_t raw_op = static_cast(op_num); throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED); diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.h b/src/Common/ZooKeeper/ZooKeeperConstants.h index 1ed2c442f6c..44f8437f12c 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.h +++ b/src/Common/ZooKeeper/ZooKeeperConstants.h @@ -32,6 +32,10 @@ enum class OpNum : int32_t Check = 13, Multi = 14, Auth = 100, + + // CH Keeper specific operations + FilteredList = 500, + SessionID = 997, /// Special internal request }; diff --git a/src/Common/ZooKeeper/ZooKeeperIO.cpp b/src/Common/ZooKeeper/ZooKeeperIO.cpp index 066aa1a24f6..c84a8624d78 100644 --- a/src/Common/ZooKeeper/ZooKeeperIO.cpp +++ b/src/Common/ZooKeeper/ZooKeeperIO.cpp @@ -28,6 +28,11 @@ void write(int32_t x, WriteBuffer & out) writeBinary(x, out); } +void write(uint8_t x, WriteBuffer & out) +{ + writeBinary(x, out); +} + void write(OpNum x, WriteBuffer & out) { write(static_cast(x), out); @@ -91,6 +96,11 @@ void read(int64_t & x, ReadBuffer & in) x = __builtin_bswap64(x); } +void read(uint8_t & x, ReadBuffer & in) +{ + readBinary(x, in); +} + void read(int32_t & x, ReadBuffer & in) { readBinary(x, in); diff --git a/src/Common/ZooKeeper/ZooKeeperIO.h b/src/Common/ZooKeeper/ZooKeeperIO.h index c2c6149cd11..ec77b46f3d9 100644 --- a/src/Common/ZooKeeper/ZooKeeperIO.h +++ b/src/Common/ZooKeeper/ZooKeeperIO.h @@ -22,6 +22,7 @@ void write(uint64_t x, WriteBuffer & out); void write(int64_t x, WriteBuffer & out); void write(int32_t x, WriteBuffer & out); +void write(uint8_t x, WriteBuffer & out); void write(OpNum x, WriteBuffer & out); void write(bool x, WriteBuffer & out); void write(const std::string & s, WriteBuffer & out); @@ -50,6 +51,7 @@ void read(uint64_t & x, ReadBuffer & in); #endif void read(int64_t & x, ReadBuffer & in); void read(int32_t & x, ReadBuffer & in); +void read(uint8_t & x, ReadBuffer & in); void read(OpNum & x, ReadBuffer & in); void read(bool & x, ReadBuffer & in); void read(int8_t & x, ReadBuffer & in); diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index bd284ed0c91..8fa6f28c29c 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1168,11 +1168,13 @@ void ZooKeeper::set( void ZooKeeper::list( const String & path, + ListRequestType list_request_type, ListCallback callback, WatchCallback watch) { - ZooKeeperListRequest request; + ZooKeeperFilteredListRequest request; request.path = path; + request.list_request_type = list_request_type; RequestInfo request_info; request_info.request = std::make_shared(std::move(request)); diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index c4acaf8d1ee..aa27b0eefe9 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -164,6 +164,7 @@ public: void list( const String & path, + ListRequestType list_request_type, ListCallback callback, WatchCallback watch) override; diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 21265f0bd61..fd1fab5b6b0 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -6,6 +6,7 @@ #include #include #include +#include "Common/ZooKeeper/ZooKeeperCommon.h" #include #include #include @@ -19,6 +20,7 @@ #include #include #include +#include namespace DB { @@ -1161,6 +1163,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc } auto & container = storage.container; + auto node_it = container.find(request.path); if (node_it == container.end()) { @@ -1178,8 +1181,31 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc const auto & children = node_it->value.getChildren(); response.names.reserve(children.size()); + const auto add_child = [&](const auto child) + { + using enum Coordination::ListRequestType; + + auto list_request_type = ALL; + if (auto * filtered_list = dynamic_cast(&request)) + list_request_type = filtered_list->list_request_type; + + if (list_request_type == ALL) + return true; + + auto child_path = (std::filesystem::path(request.path) / child.toView()).generic_string(); + auto child_it = container.find(child_path); + if (child_it == container.end()) + onStorageInconsistency(); + + const auto is_ephemeral = child_it->value.stat.ephemeralOwner != 0; + return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY); + }; + for (const auto child : children) - response.names.push_back(child.toString()); + { + if (add_child(child)) + response.names.push_back(child.toString()); + } response.stat = node_it->value.stat; response.error = Coordination::Error::ZOK; @@ -1623,7 +1649,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc void KeeperStorage::finalize() { if (finalized) - throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR); + throw DB::Exception("KeeperStorage already finalized", ErrorCodes::LOGICAL_ERROR); finalized = true; @@ -1689,6 +1715,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory() registerKeeperRequestProcessor(*this); registerKeeperRequestProcessor(*this); registerKeeperRequestProcessor(*this); + registerKeeperRequestProcessor(*this); registerKeeperRequestProcessor(*this); registerKeeperRequestProcessor(*this); registerKeeperRequestProcessor(*this); diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index ee75f2a0860..bd0d329ef8d 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -1956,6 +1957,84 @@ TEST_P(CoordinationTest, TestUncommittedStateBasicCrud) ASSERT_FALSE(get_committed_data()); } +TEST_P(CoordinationTest, TestListRequestTypes) +{ + using namespace DB; + using namespace Coordination; + + KeeperStorage storage{500, "", true}; + + int64_t zxid = 0; + + static constexpr std::string_view path = "/test"; + + const auto create_path = [&](bool is_ephemeral) + { + const auto create_request = std::make_shared(); + int new_zxid = ++zxid; + create_request->path = path; + create_request->is_sequential = true; + create_request->is_ephemeral = is_ephemeral; + storage.preprocessRequest(create_request, 1, 0, new_zxid); + auto responses = storage.processRequest(create_request, 1, new_zxid); + + EXPECT_GE(responses.size(), 1); + const auto & create_response = dynamic_cast(*responses[0].response); + return create_response.path_created; + }; + + static constexpr size_t persistent_num = 5; + std::unordered_set expected_persistent_children; + for (size_t i = 0; i < persistent_num; ++i) + { + expected_persistent_children.insert(getBaseName(create_path(false)).toString()); + } + ASSERT_EQ(expected_persistent_children.size(), persistent_num); + + static constexpr size_t ephemeral_num = 5; + std::unordered_set expected_ephemeral_children; + for (size_t i = 0; i < ephemeral_num; ++i) + { + expected_ephemeral_children.insert(getBaseName(create_path(true)).toString()); + } + ASSERT_EQ(expected_ephemeral_children.size(), ephemeral_num); + + const auto get_children = [&](const auto list_request_type) + { + const auto list_request = std::make_shared(); + int new_zxid = ++zxid; + list_request->path = parentPath(StringRef{path}).toString(); + list_request->list_request_type = list_request_type; + storage.preprocessRequest(list_request, 1, 0, new_zxid); + auto responses = storage.processRequest(list_request, 1, new_zxid); + + EXPECT_GE(responses.size(), 1); + const auto & list_response = dynamic_cast(*responses[0].response); + return list_response.names; + }; + + const auto persistent_children = get_children(ListRequestType::PERSISTENT_ONLY); + EXPECT_EQ(persistent_children.size(), persistent_num); + for (const auto & child : persistent_children) + { + EXPECT_TRUE(expected_persistent_children.contains(child)) << "Missing persistent child " << child; + } + + const auto ephemeral_children = get_children(ListRequestType::EPHEMERAL_ONLY); + EXPECT_EQ(ephemeral_children.size(), ephemeral_num); + for (const auto & child : ephemeral_children) + { + EXPECT_TRUE(expected_ephemeral_children.contains(child)) << "Missing ephemeral child " << child; + } + + const auto all_children = get_children(ListRequestType::ALL); + EXPECT_EQ(all_children.size(), ephemeral_num + persistent_num); + for (const auto & child : all_children) + { + EXPECT_TRUE(expected_ephemeral_children.contains(child) || expected_persistent_children.contains(child)) << "Missing child " << child; + } +} + INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, CoordinationTest, ::testing::ValuesIn(std::initializer_list{ diff --git a/utils/keeper-bench/Generator.cpp b/utils/keeper-bench/Generator.cpp index d3a8323b81f..5d1d0f8a491 100644 --- a/utils/keeper-bench/Generator.cpp +++ b/utils/keeper-bench/Generator.cpp @@ -62,7 +62,7 @@ void removeRecursive(Coordination::ZooKeeper & zookeeper, const std::string & pa promise->set_value(); }; - zookeeper.list(path, list_callback, nullptr); + zookeeper.list(path, ListRequestType::ALL, list_callback, nullptr); future.get(); while (!children.empty())