From b6654138693e1bcc0e61afb801ac0f05298f9bfc Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 15 Jul 2020 01:53:34 +0300 Subject: [PATCH] Improve performance of TestKeeper --- src/Common/ZooKeeper/TestKeeper.cpp | 114 +++++++++++++++++++--------- 1 file changed, 79 insertions(+), 35 deletions(-) diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index a734d218ff6..6e408750099 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -5,6 +5,7 @@ #include #include +#include namespace Coordination @@ -25,11 +26,14 @@ static String baseName(const String & path) } +using Undo = std::function; + + struct TestKeeperRequest : virtual Request { virtual bool isMutable() const { return false; } virtual ResponsePtr createResponse() const = 0; - virtual ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const = 0; + virtual std::pair process(TestKeeper::Container & container, int64_t zxid) const = 0; virtual void processWatches(TestKeeper::Watches & /*watches*/, TestKeeper::Watches & /*list_watches*/) const {} }; @@ -69,7 +73,7 @@ struct TestKeeperCreateRequest final : CreateRequest, TestKeeperRequest TestKeeperCreateRequest() = default; explicit TestKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {} ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override { @@ -83,7 +87,7 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest explicit TestKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {} bool isMutable() const override { return true; } ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override { @@ -94,14 +98,14 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest { ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest { TestKeeperGetRequest() = default; ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest @@ -110,7 +114,7 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest explicit TestKeeperSetRequest(const SetRequest & base) : SetRequest(base) {} bool isMutable() const override { return true; } ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override { @@ -121,7 +125,7 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest struct TestKeeperListRequest final : ListRequest, TestKeeperRequest { ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest @@ -129,7 +133,7 @@ struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest TestKeeperCheckRequest() = default; explicit TestKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {} ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest @@ -169,13 +173,15 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest } ResponsePtr createResponse() const override; - ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; -ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const +std::pair TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const { CreateResponse response; + Undo undo; + if (container.count(path)) { response.error = Error::ZNODEEXISTS; @@ -219,7 +225,18 @@ ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, } response.path_created = path_created; - container.emplace(std::move(path_created), std::move(created_node)); + container.emplace(path_created, std::move(created_node)); + + undo = [&container, path_created, is_sequential = is_sequential, parent_path = it->first] + { + container.erase(path_created); + auto & undo_parent = container.at(parent_path); + --undo_parent.stat.cversion; + --undo_parent.stat.numChildren; + + if (is_sequential) + --undo_parent.seq_num; + }; ++it->second.stat.cversion; ++it->second.stat.numChildren; @@ -228,12 +245,13 @@ ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, } } - return std::make_shared(response); + return { std::make_shared(response), undo }; } -ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const +std::pair TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const { RemoveResponse response; + Undo undo; auto it = container.find(path); if (it == container.end()) @@ -250,17 +268,26 @@ ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container, } else { + auto prev_node = it->second; container.erase(it); auto & parent = container.at(parentPath(path)); --parent.stat.numChildren; ++parent.stat.cversion; response.error = Error::ZOK; + + undo = [prev_node, &container, path = path] + { + container.emplace(path, prev_node); + auto & undo_parent = container.at(parentPath(path)); + ++undo_parent.stat.numChildren; + --undo_parent.stat.cversion; + }; } - return std::make_shared(response); + return { std::make_shared(response), undo }; } -ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const +std::pair TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const { ExistsResponse response; @@ -275,10 +302,10 @@ ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container, response.error = Error::ZNONODE; } - return std::make_shared(response); + return { std::make_shared(response), {} }; } -ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const +std::pair TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const { GetResponse response; @@ -294,12 +321,13 @@ ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int response.error = Error::ZOK; } - return std::make_shared(response); + return { std::make_shared(response), {} }; } -ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const +std::pair TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const { SetResponse response; + Undo undo; auto it = container.find(path); if (it == container.end()) @@ -308,6 +336,8 @@ ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int } else if (version == -1 || version == it->second.stat.version) { + auto prev_node = it->second; + it->second.data = data; ++it->second.stat.version; it->second.stat.mzxid = zxid; @@ -316,16 +346,22 @@ ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int ++container.at(parentPath(path)).stat.cversion; response.stat = it->second.stat; response.error = Error::ZOK; + + undo = [prev_node, &container, path = path] + { + container.at(path) = prev_node; + --container.at(parentPath(path)).stat.cversion; + }; } else { response.error = Error::ZBADVERSION; } - return std::make_shared(response); + return { std::make_shared(response), undo }; } -ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const +std::pair TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const { ListResponse response; @@ -344,18 +380,22 @@ ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, in path_prefix += '/'; /// Fairly inefficient. - for (auto child_it = container.upper_bound(path_prefix); child_it != container.end() && startsWith(child_it->first, path_prefix); ++child_it) + for (auto child_it = container.upper_bound(path_prefix); + child_it != container.end() && startsWith(child_it->first, path_prefix); + ++child_it) + { if (parentPath(child_it->first) == path) response.names.emplace_back(baseName(child_it->first)); + } response.stat = it->second.stat; response.error = Error::ZOK; } - return std::make_shared(response); + return { std::make_shared(response), {} }; } -ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const +std::pair TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const { CheckResponse response; auto it = container.find(path); @@ -372,38 +412,42 @@ ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, i response.error = Error::ZOK; } - return std::make_shared(response); + return { std::make_shared(response), {} }; } -ResponsePtr TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const +std::pair TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const { MultiResponse response; response.responses.reserve(requests.size()); - - /// Fairly inefficient. - auto container_copy = container; + std::vector undo_actions; try { for (const auto & request : requests) { const TestKeeperRequest & concrete_request = dynamic_cast(*request); - auto cur_response = concrete_request.process(container, zxid); + auto [ cur_response, undo_action ] = concrete_request.process(container, zxid); response.responses.emplace_back(cur_response); if (cur_response->error != Error::ZOK) { response.error = cur_response->error; - container = container_copy; - return std::make_shared(response); + + for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it) + (*it)(); + + return { std::make_shared(response), {} }; } + else + undo_actions.emplace_back(std::move(undo_action)); } response.error = Error::ZOK; - return std::make_shared(response); + return { std::make_shared(response), {} }; } catch (...) { - container = container_copy; + for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it) + (*it)(); throw; } } @@ -476,7 +520,7 @@ void TestKeeper::processingThread() ++zxid; info.request->addRootPath(root_path); - ResponsePtr response = info.request->process(container, zxid); + auto [response, _] = info.request->process(container, zxid); if (response->error == Error::ZOK) info.request->processWatches(watches, list_watches);