Better ACLs storage on disk (update snapshot version)

This commit is contained in:
alesapin 2021-05-28 14:52:19 +03:00
parent 4d59590b7f
commit a1efea20dc
17 changed files with 345 additions and 97 deletions

View File

@ -0,0 +1,96 @@
#include <Coordination/ACLMap.h>
#include <Common/SipHash.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
size_t ACLMap::ACLsHash::operator()(const Coordination::ACLs & acls) const
{
SipHash hash;
for (const auto & acl : acls)
{
hash.update(acl.permissions);
hash.update(acl.scheme);
hash.update(acl.id);
}
return hash.get64();
}
bool ACLMap::ACLsComparator::operator()(const Coordination::ACLs & left, const Coordination::ACLs & right) const
{
if (left.size() != right.size())
return false;
for (size_t i = 0; i < left.size(); ++i)
{
if (left[i].permissions != right[i].permissions)
return false;
if (left[i].scheme != right[i].scheme)
return false;
if (left[i].id != right[i].id)
return false;
}
return true;
}
uint64_t ACLMap::convertACLs(const Coordination::ACLs & acls)
{
if (acl_to_num.count(acls))
return acl_to_num[acls];
/// Start from one
auto index = acl_to_num.size() + 1;
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "ALLOCATING {} FOR ACL", index);
acl_to_num[acls] = index;
num_to_acl[index] = acls;
return index;
}
Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const
{
if (acls_id == 0)
return Coordination::ACLs{};
if (!num_to_acl.count(acls_id))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown ACL id {}. It's a bug", acls_id);
return num_to_acl.at(acls_id);
}
void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls)
{
num_to_acl[acls_id] = acls;
acl_to_num[acls] = acls_id;
}
void ACLMap::addUsage(uint64_t acl_id)
{
usage_counter[acl_id]++;
}
void ACLMap::removeUsage(uint64_t acl_id)
{
if (usage_counter.count(acl_id) == 0)
return;
usage_counter[acl_id]--;
if (usage_counter[acl_id] == 0)
{
auto acls = num_to_acl[acl_id];
num_to_acl.erase(acl_id);
acl_to_num.erase(acls);
usage_counter.erase(acl_id);
}
}
}

54
src/Coordination/ACLMap.h Normal file
View File

@ -0,0 +1,54 @@
#pragma once
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <unordered_map>
namespace DB
{
/// Simple mapping of different ACLs to sequentially growing numbers
/// Allows to store single number instead of vector of ACLs on disk and in memory.
class ACLMap
{
private:
struct ACLsHash
{
size_t operator()(const Coordination::ACLs & acls) const;
};
struct ACLsComparator
{
bool operator()(const Coordination::ACLs & left, const Coordination::ACLs & right) const;
};
using ACLToNumMap = std::unordered_map<Coordination::ACLs, uint64_t, ACLsHash, ACLsComparator>;
using NumToACLMap = std::unordered_map<uint64_t, Coordination::ACLs>;
using UsageCounter = std::unordered_map<uint64_t, uint64_t>;
ACLToNumMap acl_to_num;
NumToACLMap num_to_acl;
UsageCounter usage_counter;
public:
/// Convert ACL to number. If it's new ACL than adds it to map
/// with new id.
uint64_t convertACLs(const Coordination::ACLs & acls);
/// Convert number to ACL vector. If number is unknown for map
/// than throws LOGICAL ERROR
Coordination::ACLs convertNumber(uint64_t acls_id) const;
/// Mapping from numbers to ACLs vectors. Used during serialization.
const NumToACLMap & getMapping() const { return num_to_acl; }
/// Add mapping to ACLMap. Used during deserialization.
void addMapping(uint64_t acls_id, const Coordination::ACLs & acls);
/// Add/remove usage of some id. Used to remove unused ACLs.
void addUsage(uint64_t acl_id);
void removeUsage(uint64_t acl_id);
};
}

View File

@ -296,7 +296,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
break;
}
else if (changelog_description.from_log_index > start_to_read_from)
LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smalled available log index on disk {}.", start_to_read_from, changelog_description.from_log_index);
LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smallest available log index on disk {}.", start_to_read_from, changelog_description.from_log_index);
}
started = true;

View File

@ -56,14 +56,7 @@ namespace
writeBinary(node.data, out);
/// Serialize ACL
writeBinary(node.acls.size(), out);
for (const auto & acl : node.acls)
{
writeBinary(acl.permissions, out);
writeBinary(acl.scheme, out);
writeBinary(acl.id, out);
}
writeBinary(node.acl_id, out);
writeBinary(node.is_sequental, out);
/// Serialize stat
writeBinary(node.stat.czxid, out);
@ -81,21 +74,33 @@ namespace
writeBinary(node.seq_num, out);
}
void readNode(KeeperStorage::Node & node, ReadBuffer & in)
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{
readBinary(node.data, in);
/// Deserialize ACL
size_t acls_size;
readBinary(acls_size, in);
for (size_t i = 0; i < acls_size; ++i)
if (version >= SnapshotVersion::V1)
{
Coordination::ACL acl;
readBinary(acl.permissions, in);
readBinary(acl.scheme, in);
readBinary(acl.id, in);
node.acls.push_back(acl);
readBinary(node.acl_id, in);
}
else if (version == SnapshotVersion::V0)
{
/// Deserialize ACL
size_t acls_size;
readBinary(acls_size, in);
Coordination::ACLs acls;
for (size_t i = 0; i < acls_size; ++i)
{
Coordination::ACL acl;
readBinary(acl.permissions, in);
readBinary(acl.scheme, in);
readBinary(acl.id, in);
acls.push_back(acl);
}
node.acl_id = acl_map.convertACLs(acls);
}
acl_map.addUsage(node.acl_id);
readBinary(node.is_sequental, in);
/// Deserialize stat
@ -137,6 +142,22 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
writeBinary(static_cast<uint8_t>(snapshot.version), out);
serializeSnapshotMetadata(snapshot.snapshot_meta, out);
writeBinary(snapshot.session_id, out);
/// Serialize ACLs MAP
writeBinary(snapshot.acl_map.size(), out);
for (const auto & [acl_id, acls] : snapshot.acl_map)
{
writeBinary(acl_id, out);
writeBinary(acls.size(), out);
for (const auto & acl : acls)
{
writeBinary(acl.permissions, out);
writeBinary(acl.scheme, out);
writeBinary(acl.id, out);
}
}
/// Serialize data tree
writeBinary(snapshot.snapshot_container_size, out);
size_t counter = 0;
for (auto it = snapshot.begin; counter < snapshot.snapshot_container_size; ++counter)
@ -157,12 +178,24 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
++it;
}
/// Serialize sessions
size_t size = snapshot.session_and_timeout.size();
writeBinary(size, out);
for (const auto & [session_id, timeout] : snapshot.session_and_timeout)
{
writeBinary(session_id, out);
writeBinary(timeout, out);
KeeperStorage::AuthIDs ids;
if (snapshot.session_and_auth.count(session_id))
ids = snapshot.session_and_auth.at(session_id);
writeBinary(ids.size(), out);
for (const auto & [scheme, id] : ids)
{
writeBinary(scheme, out);
writeBinary(id, out);
}
}
}
@ -170,15 +203,42 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
{
uint8_t version;
readBinary(version, in);
if (static_cast<SnapshotVersion>(version) > SnapshotVersion::V0)
SnapshotVersion current_version = static_cast<SnapshotVersion>(version);
if (current_version > SnapshotVersion::V1)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);
SnapshotMetadataPtr result = deserializeSnapshotMetadata(in);
int64_t session_id;
readBinary(session_id, in);
storage.zxid = result->get_last_log_idx();
storage.session_id_counter = session_id;
if (current_version >= SnapshotVersion::V1)
{
size_t acls_map_size;
readBinary(acls_map_size, in);
size_t current_map_size = 0;
while (current_map_size < acls_map_size)
{
uint64_t acl_id;
readBinary(acl_id, in);
size_t acls_size;
readBinary(acls_size, in);
Coordination::ACLs acls;
for (size_t i = 0; i < acls_size; ++i)
{
Coordination::ACL acl;
readBinary(acl.permissions, in);
readBinary(acl.scheme, in);
readBinary(acl.id, in);
acls.push_back(acl);
}
storage.acl_map.addMapping(acl_id, acls);
current_map_size++;
}
}
size_t snapshot_container_size;
readBinary(snapshot_container_size, in);
@ -187,8 +247,8 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
{
std::string path;
readBinary(path, in);
KeeperStorage::Node node;
readNode(node, in);
KeeperStorage::Node node{};
readNode(node, in, current_version, storage.acl_map);
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
storage.ephemerals[node.stat.ephemeralOwner].insert(path);
@ -215,6 +275,24 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
readBinary(active_session_id, in);
readBinary(timeout, in);
storage.addSessionID(active_session_id, timeout);
if (current_version >= SnapshotVersion::V1)
{
size_t session_auths_size;
readBinary(session_auths_size, in);
KeeperStorage::AuthIDs ids;
size_t session_auth_counter = 0;
while (session_auth_counter < session_auths_size)
{
String scheme, id;
readBinary(scheme, in);
readBinary(id, in);
ids.emplace_back(KeeperStorage::AuthID{scheme, id});
}
if (ids.size() > 0)
storage.session_and_auth[active_session_id] = ids;
}
current_session_size++;
}
@ -230,6 +308,8 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t
snapshot_container_size = storage->container.snapshotSize();
begin = storage->getSnapshotIteratorBegin();
session_and_timeout = storage->getActiveSessions();
acl_map = storage->acl_map.getMapping();
session_and_auth = storage->session_and_auth;
}
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
@ -241,6 +321,8 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const Sna
snapshot_container_size = storage->container.snapshotSize();
begin = storage->getSnapshotIteratorBegin();
session_and_timeout = storage->getActiveSessions();
acl_map = storage->acl_map.getMapping();
session_and_auth = storage->session_and_auth;
}
KeeperStorageSnapshot::~KeeperStorageSnapshot()

View File

@ -13,6 +13,7 @@ using SnapshotMetadataPtr = std::shared_ptr<SnapshotMetadata>;
enum SnapshotVersion : uint8_t
{
V0 = 0,
V1 = 1, /// with ACL map
};
struct KeeperStorageSnapshot
@ -29,12 +30,14 @@ public:
KeeperStorage * storage;
SnapshotVersion version = SnapshotVersion::V0;
SnapshotVersion version = SnapshotVersion::V1;
SnapshotMetadataPtr snapshot_meta;
int64_t session_id;
size_t snapshot_container_size;
KeeperStorage::Container::const_iterator begin;
SessionAndTimeout session_and_timeout;
KeeperStorage::SessionAndAuth session_and_auth;
std::unordered_map<uint64_t, Coordination::ACLs> acl_map;
};
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;

View File

@ -226,7 +226,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
if (it == container.end())
return true;
const auto & node_acls = it->value.acls;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;
@ -267,12 +267,17 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
KeeperStorage::Node created_node;
if (!fixupACL(request.acls, session_auth_ids, created_node.acls))
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls))
{
response.error = Coordination::Error::ZINVALIDACL;
return {response_ptr, {}};
}
uint64_t acl_id = storage.acl_map.convertACLs(node_acls);
storage.acl_map.addUsage(acl_id);
created_node.acl_id = acl_id;
created_node.stat.czxid = zxid;
created_node.stat.mzxid = zxid;
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
@ -312,13 +317,15 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
if (request.is_ephemeral)
ephemerals[session_id].emplace(path_created);
undo = [&container, &ephemerals, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path]
undo = [&storage, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id]
{
container.erase(path_created);
if (is_ephemeral)
ephemerals[session_id].erase(path_created);
storage.container.erase(path_created);
storage.acl_map.removeUsage(acl_id);
container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent)
if (is_ephemeral)
storage.ephemerals[session_id].erase(path_created);
storage.container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent)
{
--undo_parent.stat.cversion;
--undo_parent.stat.numChildren;
@ -345,7 +352,7 @@ struct KeeperStorageGetRequest final : public KeeperStorageRequest
if (it == container.end())
return true;
const auto & node_acls = it->value.acls;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;
@ -386,7 +393,7 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
if (it == container.end())
return true;
const auto & node_acls = it->value.acls;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;
@ -429,6 +436,8 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
ephemerals.erase(ephemerals_it);
}
storage.acl_map.removeUsage(prev_node.acl_id);
auto child_basename = getBaseName(it->key);
container.updateValue(parentPath(request.path), [&child_basename] (KeeperStorage::Node & parent)
{
@ -441,13 +450,15 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
container.erase(request.path);
undo = [prev_node, &container, &ephemerals, path = request.path, child_basename]
undo = [prev_node, &storage, path = request.path, child_basename]
{
if (prev_node.stat.ephemeralOwner != 0)
ephemerals[prev_node.stat.ephemeralOwner].emplace(path);
storage.ephemerals[prev_node.stat.ephemeralOwner].emplace(path);
container.insert(path, prev_node);
container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent)
storage.acl_map.addUsage(prev_node.acl_id);
storage.container.insert(path, prev_node);
storage.container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent)
{
++parent.stat.numChildren;
--parent.stat.cversion;
@ -500,7 +511,7 @@ struct KeeperStorageSetRequest final : public KeeperStorageRequest
if (it == container.end())
return true;
const auto & node_acls = it->value.acls;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;
@ -577,7 +588,7 @@ struct KeeperStorageListRequest final : public KeeperStorageRequest
if (it == container.end())
return true;
const auto & node_acls = it->value.acls;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;
@ -622,7 +633,7 @@ struct KeeperStorageCheckRequest final : public KeeperStorageRequest
if (it == container.end())
return true;
const auto & node_acls = it->value.acls;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;

View File

@ -5,6 +5,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/ACLMap.h>
#include <Coordination/SnapshotableHashTable.h>
#include <unordered_map>
#include <unordered_set>
@ -30,7 +31,7 @@ public:
struct Node
{
String data;
Coordination::ACLs acls{};
uint64_t acl_id = 0; /// 0 -- no ACL by default
bool is_sequental = false;
Coordination::Stat stat{};
int32_t seq_num = 0;
@ -81,6 +82,7 @@ public:
SessionAndWatcher sessions_and_watchers;
SessionExpiryQueue session_expiry_queue;
SessionAndTimeout session_and_timeout;
ACLMap acl_map;
int64_t zxid{0};
bool finalized{false};

View File

@ -10,7 +10,7 @@
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<force_sync>false</force_sync>
<snapshot_distance>75</snapshot_distance>
</coordination_settings>
<raft_configuration>

View File

@ -6,7 +6,7 @@ from kazoo.security import ACL, make_digest_acl, make_acl
from kazoo.exceptions import AuthFailedError, InvalidACLError, NoAuthError, KazooException
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/keeper_config.xml', 'configs/logs_conf.xml'], with_zookeeper=True, use_keeper=False)
node = cluster.add_instance('node', main_configs=['configs/keeper_config.xml', 'configs/logs_conf.xml'], with_zookeeper=True, use_keeper=False, stay_alive=True)
SUPERAUTH = "super:admin"
@ -246,3 +246,57 @@ def test_bad_auth(started_cluster):
with pytest.raises(InvalidACLError):
print("Sending 12")
auth_connection.create("/test_bad_acl", b"data", acl=[make_acl("digest", "dsad:DSAa:d", read=True, write=False, create=True, delete=True, admin=True)])
def test_auth_snapshot(started_cluster):
connection = get_fake_zk()
connection.add_auth('digest', 'user1:password1')
connection.create("/test_snapshot_acl", b"data", acl=[make_acl("auth", "", all=True)])
connection1 = get_fake_zk()
connection1.add_auth('digest', 'user2:password2')
connection1.create("/test_snapshot_acl1", b"data", acl=[make_acl("auth", "", all=True)])
connection2 = get_fake_zk()
connection2.create("/test_snapshot_acl2", b"data")
for i in range(100):
connection.create(f"/test_snapshot_acl/path{i}", b"data", acl=[make_acl("auth", "", all=True)])
node.restart_clickhouse()
connection = get_fake_zk()
with pytest.raises(NoAuthError):
connection.get("/test_snapshot_acl")
connection.add_auth('digest', 'user1:password1')
assert connection.get("/test_snapshot_acl")[0] == b"data"
with pytest.raises(NoAuthError):
connection.get("/test_snapshot_acl1")
assert connection.get("/test_snapshot_acl2")[0] == b"data"
for i in range(100):
assert connection.get(f"/test_snapshot_acl/path{i}")[0] == b"data"
connection1 = get_fake_zk()
connection1.add_auth('digest', 'user2:password2')
assert connection1.get("/test_snapshot_acl1")[0] == b"data"
with pytest.raises(NoAuthError):
connection1.get("/test_snapshot_acl")
connection2 = get_fake_zk()
assert connection2.get("/test_snapshot_acl2")[0] == b"data"
with pytest.raises(NoAuthError):
connection2.get("/test_snapshot_acl")
with pytest.raises(NoAuthError):
connection2.get("/test_snapshot_acl1")

View File

@ -26,13 +26,6 @@ def started_cluster():
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance

View File

@ -66,12 +66,6 @@ def wait_nodes():
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance

View File

@ -54,13 +54,6 @@ def wait_nodes():
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance

View File

@ -33,13 +33,6 @@ def started_cluster():
def get_connection_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance

View File

@ -25,13 +25,6 @@ def started_cluster():
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance

View File

@ -25,13 +25,6 @@ def started_cluster():
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance
@ -77,6 +70,7 @@ def test_recover_from_snapshot(started_cluster):
# stale node should recover from leader's snapshot
# with some sanitizers can start longer than 5 seconds
node3.start_clickhouse(20)
print("Restarted")
try:
node1_zk = node2_zk = node3_zk = None

View File

@ -35,13 +35,6 @@ def started_cluster():
def get_connection_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance

View File

@ -25,13 +25,6 @@ def started_cluster():
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance