Improve performance of TestKeeper

This commit is contained in:
Alexey Milovidov 2020-07-15 01:53:34 +03:00
parent ffe2cecb1b
commit b665413869

View File

@ -5,6 +5,7 @@
#include <sstream>
#include <iomanip>
#include <functional>
namespace Coordination
@ -25,11 +26,14 @@ static String baseName(const String & path)
}
using Undo = std::function<void()>;
struct TestKeeperRequest : virtual Request
{
virtual bool isMutable() const { return false; }
virtual ResponsePtr createResponse() const = 0;
virtual ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const = 0;
virtual std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const = 0;
virtual void processWatches(TestKeeper::Watches & /*watches*/, TestKeeper::Watches & /*list_watches*/) const {}
};
@ -69,7 +73,7 @@ struct TestKeeperCreateRequest final : CreateRequest, TestKeeperRequest
TestKeeperCreateRequest() = default;
explicit TestKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {}
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
{
@ -83,7 +87,7 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest
explicit TestKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {}
bool isMutable() const override { return true; }
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
{
@ -94,14 +98,14 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest
struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
{
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest
{
TestKeeperGetRequest() = default;
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
@ -110,7 +114,7 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
explicit TestKeeperSetRequest(const SetRequest & base) : SetRequest(base) {}
bool isMutable() const override { return true; }
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
{
@ -121,7 +125,7 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
struct TestKeeperListRequest final : ListRequest, TestKeeperRequest
{
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
@ -129,7 +133,7 @@ struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
TestKeeperCheckRequest() = default;
explicit TestKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {}
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
@ -169,13 +173,15 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
}
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const
std::pair<ResponsePtr, Undo> TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
CreateResponse response;
Undo undo;
if (container.count(path))
{
response.error = Error::ZNODEEXISTS;
@ -219,7 +225,18 @@ ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container,
}
response.path_created = path_created;
container.emplace(std::move(path_created), std::move(created_node));
container.emplace(path_created, std::move(created_node));
undo = [&container, path_created, is_sequential = is_sequential, parent_path = it->first]
{
container.erase(path_created);
auto & undo_parent = container.at(parent_path);
--undo_parent.stat.cversion;
--undo_parent.stat.numChildren;
if (is_sequential)
--undo_parent.seq_num;
};
++it->second.stat.cversion;
++it->second.stat.numChildren;
@ -228,12 +245,13 @@ ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container,
}
}
return std::make_shared<CreateResponse>(response);
return { std::make_shared<CreateResponse>(response), undo };
}
ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const
{
RemoveResponse response;
Undo undo;
auto it = container.find(path);
if (it == container.end())
@ -250,17 +268,26 @@ ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container,
}
else
{
auto prev_node = it->second;
container.erase(it);
auto & parent = container.at(parentPath(path));
--parent.stat.numChildren;
++parent.stat.cversion;
response.error = Error::ZOK;
undo = [prev_node, &container, path = path]
{
container.emplace(path, prev_node);
auto & undo_parent = container.at(parentPath(path));
++undo_parent.stat.numChildren;
--undo_parent.stat.cversion;
};
}
return std::make_shared<RemoveResponse>(response);
return { std::make_shared<RemoveResponse>(response), undo };
}
ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const
{
ExistsResponse response;
@ -275,10 +302,10 @@ ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container,
response.error = Error::ZNONODE;
}
return std::make_shared<ExistsResponse>(response);
return { std::make_shared<ExistsResponse>(response), {} };
}
ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const
{
GetResponse response;
@ -294,12 +321,13 @@ ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int
response.error = Error::ZOK;
}
return std::make_shared<GetResponse>(response);
return { std::make_shared<GetResponse>(response), {} };
}
ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const
std::pair<ResponsePtr, Undo> TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
SetResponse response;
Undo undo;
auto it = container.find(path);
if (it == container.end())
@ -308,6 +336,8 @@ ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int
}
else if (version == -1 || version == it->second.stat.version)
{
auto prev_node = it->second;
it->second.data = data;
++it->second.stat.version;
it->second.stat.mzxid = zxid;
@ -316,16 +346,22 @@ ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int
++container.at(parentPath(path)).stat.cversion;
response.stat = it->second.stat;
response.error = Error::ZOK;
undo = [prev_node, &container, path = path]
{
container.at(path) = prev_node;
--container.at(parentPath(path)).stat.cversion;
};
}
else
{
response.error = Error::ZBADVERSION;
}
return std::make_shared<SetResponse>(response);
return { std::make_shared<SetResponse>(response), undo };
}
ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const
{
ListResponse response;
@ -344,18 +380,22 @@ ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, in
path_prefix += '/';
/// Fairly inefficient.
for (auto child_it = container.upper_bound(path_prefix); child_it != container.end() && startsWith(child_it->first, path_prefix); ++child_it)
for (auto child_it = container.upper_bound(path_prefix);
child_it != container.end() && startsWith(child_it->first, path_prefix);
++child_it)
{
if (parentPath(child_it->first) == path)
response.names.emplace_back(baseName(child_it->first));
}
response.stat = it->second.stat;
response.error = Error::ZOK;
}
return std::make_shared<ListResponse>(response);
return { std::make_shared<ListResponse>(response), {} };
}
ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const
std::pair<ResponsePtr, Undo> TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const
{
CheckResponse response;
auto it = container.find(path);
@ -372,38 +412,42 @@ ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, i
response.error = Error::ZOK;
}
return std::make_shared<CheckResponse>(response);
return { std::make_shared<CheckResponse>(response), {} };
}
ResponsePtr TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
std::pair<ResponsePtr, Undo> TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
MultiResponse response;
response.responses.reserve(requests.size());
/// Fairly inefficient.
auto container_copy = container;
std::vector<Undo> undo_actions;
try
{
for (const auto & request : requests)
{
const TestKeeperRequest & concrete_request = dynamic_cast<const TestKeeperRequest &>(*request);
auto cur_response = concrete_request.process(container, zxid);
auto [ cur_response, undo_action ] = concrete_request.process(container, zxid);
response.responses.emplace_back(cur_response);
if (cur_response->error != Error::ZOK)
{
response.error = cur_response->error;
container = container_copy;
return std::make_shared<MultiResponse>(response);
for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it)
(*it)();
return { std::make_shared<MultiResponse>(response), {} };
}
else
undo_actions.emplace_back(std::move(undo_action));
}
response.error = Error::ZOK;
return std::make_shared<MultiResponse>(response);
return { std::make_shared<MultiResponse>(response), {} };
}
catch (...)
{
container = container_copy;
for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it)
(*it)();
throw;
}
}
@ -476,7 +520,7 @@ void TestKeeper::processingThread()
++zxid;
info.request->addRootPath(root_path);
ResponsePtr response = info.request->process(container, zxid);
auto [response, _] = info.request->process(container, zxid);
if (response->error == Error::ZOK)
info.request->processWatches(watches, list_watches);