Add clean up for faults injected before zk request

+ for multi-op request, it create responses in multi response
+ some polishing
This commit is contained in:
Igor Nikonov 2022-10-27 21:31:53 +00:00
parent f773436de5
commit a601c166ad
2 changed files with 69 additions and 27 deletions

View File

@ -45,7 +45,7 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
Coordination::Error e = zookeeper_->tryMulti(ops, responses);
if (e != Coordination::Error::ZOK)
{
if (!responses.empty() && responses.front()->error == Coordination::Error::ZNODEEXISTS)
if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),

View File

@ -1,9 +1,9 @@
#pragma once
#include <random>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/randomSeed.h>
namespace DB
@ -58,10 +58,9 @@ class ZooKeeperWithFaultInjection
double fault_injection_probability,
UInt64 fault_injection_seed,
std::string name_,
Poco::Logger * logger_ = nullptr)
Poco::Logger * logger_)
: keeper(keeper_), name(std::move(name_)), logger(logger_), seed(fault_injection_seed)
{
if (fault_injection_probability > .0)
fault_policy = std::make_unique<RandomFaultInjection>(fault_injection_probability, fault_injection_seed);
if (unlikely(logger))
@ -166,16 +165,23 @@ public:
method,
!requests.empty() ? requests.front()->getPath() : "",
[&]() { return keeper->tryMulti(requests, responses); },
[&](const Coordination::Error & err)
[&](const Coordination::Error & original_error)
{
if (err == Coordination::Error::ZOK)
faultInjectionCleanup(method, requests, responses);
if (original_error == Coordination::Error::ZOK)
faultInjectionPostAction(method, requests, responses);
},
[&]()
{
responses.clear();
for (size_t i = 0; i < requests.size(); ++i)
responses.emplace_back(std::make_shared<Coordination::ZooKeeperErrorResponse>());
});
/// collect ephemeral nodes to clean up
/// collect ephemeral nodes when no fault was injected (to clean up on demand)
if (unlikely(fault_policy) && Coordination::Error::ZOK == error)
{
doForEachEphemeralNode(
doForEachCreatedEphemeralNode(
method, requests, responses, [&](const String & path_created) { ephemeral_nodes.push_back(path_created); });
}
return error;
@ -190,16 +196,22 @@ public:
method,
!requests.empty() ? requests.front()->getPath() : "",
[&]() { return keeper->tryMultiNoThrow(requests, responses); },
[&](const Coordination::Error & err)
[&](const Coordination::Error & original_error)
{
if (err == Coordination::Error::ZOK)
faultInjectionCleanup(method, requests, responses);
if (original_error == Coordination::Error::ZOK)
faultInjectionPostAction(method, requests, responses);
},
[&]()
{
responses.clear();
for (size_t i = 0; i < requests.size(); ++i)
responses.emplace_back(std::make_shared<Coordination::ZooKeeperErrorResponse>());
});
/// collect ephemeral nodes to clean up
/// collect ephemeral nodes when no fault was injected (to clean up later)
if (unlikely(fault_policy) && Coordination::Error::ZOK == error)
{
doForEachEphemeralNode(
doForEachCreatedEphemeralNode(
method, requests, responses, [&](const String & path_created) { ephemeral_nodes.push_back(path_created); });
}
return error;
@ -256,7 +268,7 @@ public:
}
});
/// collect ephemeral nodes to clean up
/// collect ephemeral nodes when no fault was injected (to clean up later)
if (unlikely(fault_policy))
{
if (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral)
@ -268,11 +280,20 @@ public:
Coordination::Responses multi(const Coordination::Requests & requests)
{
return access(
"multi",
constexpr auto method = "multi";
auto result = access(
method,
!requests.empty() ? requests.front()->getPath() : "",
[&]() { return keeper->multi(requests); },
[&](Coordination::Responses const & responses) { faultInjectionCleanup("multi", requests, responses); });
[&](Coordination::Responses & responses) { faultInjectionPostAction(method, requests, responses); });
/// collect ephemeral nodes to clean up
if (unlikely(fault_policy))
{
doForEachCreatedEphemeralNode(
method, requests, result, [&](const String & path_created) { ephemeral_nodes.push_back(path_created); });
}
return result;
}
void createAncestors(const std::string & path)
@ -305,6 +326,19 @@ public:
}
private:
void faultInjectionBefore(std::function<void()> fault_cleanup)
{
try
{
if (unlikely(fault_policy))
fault_policy->beforeOperation();
}
catch (const zkutil::KeeperException &)
{
fault_cleanup();
throw;
}
}
void faultInjectionAfter(std::function<void()> fault_cleanup)
{
try
@ -319,7 +353,7 @@ private:
}
}
void doForEachEphemeralNode(
void doForEachCreatedEphemeralNode(
const char * method, const Coordination::Requests & requests, const Coordination::Responses & responses, auto && action)
{
if (responses.empty())
@ -353,15 +387,15 @@ private:
}
}
void faultInjectionCleanup(const char * method, const Coordination::Requests & requests, const Coordination::Responses & responses)
void faultInjectionPostAction(const char * method, const Coordination::Requests & requests, Coordination::Responses & responses)
{
doForEachEphemeralNode(method, requests, responses, [&](const String & path_created) { keeper->remove(path_created); });
doForEachCreatedEphemeralNode(method, requests, responses, [&](const String & path_created) { keeper->remove(path_created); });
}
template <typename T>
struct FaultCleanupTypeImpl
{
using Type = std::function<void(const T &)>;
using Type = std::function<void(T &)>;
};
template <>
@ -379,8 +413,12 @@ private:
int inject_failure_after_op = true,
typename Operation,
typename Result = std::invoke_result_t<Operation>>
Result
access(const char * func_name, const std::string & path, Operation operation, FaultCleanupType<Result> fault_after_op_cleanup = {})
Result access(
const char * func_name,
const std::string & path,
Operation operation,
FaultCleanupType<Result> fault_after_op_cleanup = {},
FaultCleanupType<void> fault_before_op_cleanup = {})
{
try
{
@ -392,8 +430,12 @@ private:
if constexpr (inject_failure_before_op)
{
if (unlikely(fault_policy))
fault_policy->beforeOperation();
faultInjectionBefore(
[&]
{
if (fault_before_op_cleanup)
fault_before_op_cleanup();
});
}
if constexpr (!std::is_same_v<Result, void>)