diff --git a/src/Coordination/ACLMap.cpp b/src/Coordination/ACLMap.cpp new file mode 100644 index 00000000000..4d3ec4fb3bc --- /dev/null +++ b/src/Coordination/ACLMap.cpp @@ -0,0 +1,96 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +size_t ACLMap::ACLsHash::operator()(const Coordination::ACLs & acls) const +{ + SipHash hash; + for (const auto & acl : acls) + { + hash.update(acl.permissions); + hash.update(acl.scheme); + hash.update(acl.id); + } + return hash.get64(); +} + +bool ACLMap::ACLsComparator::operator()(const Coordination::ACLs & left, const Coordination::ACLs & right) const +{ + if (left.size() != right.size()) + return false; + + for (size_t i = 0; i < left.size(); ++i) + { + if (left[i].permissions != right[i].permissions) + return false; + + if (left[i].scheme != right[i].scheme) + return false; + + if (left[i].id != right[i].id) + return false; + } + return true; +} + +uint64_t ACLMap::convertACLs(const Coordination::ACLs & acls) +{ + if (acl_to_num.count(acls)) + return acl_to_num[acls]; + + /// Start from one + auto index = acl_to_num.size() + 1; + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "ALLOCATING {} FOR ACL", index); + + acl_to_num[acls] = index; + num_to_acl[index] = acls; + + return index; +} + +Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const +{ + if (acls_id == 0) + return Coordination::ACLs{}; + + if (!num_to_acl.count(acls_id)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown ACL id {}. It's a bug", acls_id); + + return num_to_acl.at(acls_id); +} + +void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls) +{ + num_to_acl[acls_id] = acls; + acl_to_num[acls] = acls_id; +} + +void ACLMap::addUsage(uint64_t acl_id) +{ + usage_counter[acl_id]++; +} + +void ACLMap::removeUsage(uint64_t acl_id) +{ + if (usage_counter.count(acl_id) == 0) + return; + + usage_counter[acl_id]--; + + if (usage_counter[acl_id] == 0) + { + auto acls = num_to_acl[acl_id]; + num_to_acl.erase(acl_id); + acl_to_num.erase(acls); + usage_counter.erase(acl_id); + } +} + +} diff --git a/src/Coordination/ACLMap.h b/src/Coordination/ACLMap.h new file mode 100644 index 00000000000..2313b3e7cd3 --- /dev/null +++ b/src/Coordination/ACLMap.h @@ -0,0 +1,54 @@ +#pragma once +#include +#include +#include + + +namespace DB +{ + +/// Simple mapping of different ACLs to sequentially growing numbers +/// Allows to store single number instead of vector of ACLs on disk and in memory. +class ACLMap +{ +private: + struct ACLsHash + { + size_t operator()(const Coordination::ACLs & acls) const; + }; + + struct ACLsComparator + { + bool operator()(const Coordination::ACLs & left, const Coordination::ACLs & right) const; + }; + + using ACLToNumMap = std::unordered_map; + + using NumToACLMap = std::unordered_map; + + using UsageCounter = std::unordered_map; + + ACLToNumMap acl_to_num; + NumToACLMap num_to_acl; + UsageCounter usage_counter; +public: + + /// Convert ACL to number. If it's new ACL than adds it to map + /// with new id. + uint64_t convertACLs(const Coordination::ACLs & acls); + + /// Convert number to ACL vector. If number is unknown for map + /// than throws LOGICAL ERROR + Coordination::ACLs convertNumber(uint64_t acls_id) const; + /// Mapping from numbers to ACLs vectors. Used during serialization. + const NumToACLMap & getMapping() const { return num_to_acl; } + + /// Add mapping to ACLMap. Used during deserialization. + void addMapping(uint64_t acls_id, const Coordination::ACLs & acls); + + /// Add/remove usage of some id. Used to remove unused ACLs. + void addUsage(uint64_t acl_id); + void removeUsage(uint64_t acl_id); +}; + +} diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index ba1664b23da..6ec9b17d0a7 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -296,7 +296,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin break; } else if (changelog_description.from_log_index > start_to_read_from) - LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smalled available log index on disk {}.", start_to_read_from, changelog_description.from_log_index); + LOG_WARNING(log, "Don't have required amount of reserved log records. Need to read from {}, smallest available log index on disk {}.", start_to_read_from, changelog_description.from_log_index); } started = true; diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 34beef36ff9..b42d50d198e 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -56,14 +56,7 @@ namespace writeBinary(node.data, out); /// Serialize ACL - writeBinary(node.acls.size(), out); - for (const auto & acl : node.acls) - { - writeBinary(acl.permissions, out); - writeBinary(acl.scheme, out); - writeBinary(acl.id, out); - } - + writeBinary(node.acl_id, out); writeBinary(node.is_sequental, out); /// Serialize stat writeBinary(node.stat.czxid, out); @@ -81,21 +74,33 @@ namespace writeBinary(node.seq_num, out); } - void readNode(KeeperStorage::Node & node, ReadBuffer & in) + void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map) { readBinary(node.data, in); - /// Deserialize ACL - size_t acls_size; - readBinary(acls_size, in); - for (size_t i = 0; i < acls_size; ++i) + if (version >= SnapshotVersion::V1) { - Coordination::ACL acl; - readBinary(acl.permissions, in); - readBinary(acl.scheme, in); - readBinary(acl.id, in); - node.acls.push_back(acl); + readBinary(node.acl_id, in); } + else if (version == SnapshotVersion::V0) + { + /// Deserialize ACL + size_t acls_size; + readBinary(acls_size, in); + Coordination::ACLs acls; + for (size_t i = 0; i < acls_size; ++i) + { + Coordination::ACL acl; + readBinary(acl.permissions, in); + readBinary(acl.scheme, in); + readBinary(acl.id, in); + acls.push_back(acl); + } + node.acl_id = acl_map.convertACLs(acls); + } + + acl_map.addUsage(node.acl_id); + readBinary(node.is_sequental, in); /// Deserialize stat @@ -137,6 +142,22 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr writeBinary(static_cast(snapshot.version), out); serializeSnapshotMetadata(snapshot.snapshot_meta, out); writeBinary(snapshot.session_id, out); + + /// Serialize ACLs MAP + writeBinary(snapshot.acl_map.size(), out); + for (const auto & [acl_id, acls] : snapshot.acl_map) + { + writeBinary(acl_id, out); + writeBinary(acls.size(), out); + for (const auto & acl : acls) + { + writeBinary(acl.permissions, out); + writeBinary(acl.scheme, out); + writeBinary(acl.id, out); + } + } + + /// Serialize data tree writeBinary(snapshot.snapshot_container_size, out); size_t counter = 0; for (auto it = snapshot.begin; counter < snapshot.snapshot_container_size; ++counter) @@ -157,12 +178,24 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr ++it; } + /// Serialize sessions size_t size = snapshot.session_and_timeout.size(); writeBinary(size, out); for (const auto & [session_id, timeout] : snapshot.session_and_timeout) { writeBinary(session_id, out); writeBinary(timeout, out); + + KeeperStorage::AuthIDs ids; + if (snapshot.session_and_auth.count(session_id)) + ids = snapshot.session_and_auth.at(session_id); + + writeBinary(ids.size(), out); + for (const auto & [scheme, id] : ids) + { + writeBinary(scheme, out); + writeBinary(id, out); + } } } @@ -170,15 +203,42 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, { uint8_t version; readBinary(version, in); - if (static_cast(version) > SnapshotVersion::V0) + SnapshotVersion current_version = static_cast(version); + if (current_version > SnapshotVersion::V1) throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version); SnapshotMetadataPtr result = deserializeSnapshotMetadata(in); int64_t session_id; readBinary(session_id, in); + storage.zxid = result->get_last_log_idx(); storage.session_id_counter = session_id; + if (current_version >= SnapshotVersion::V1) + { + size_t acls_map_size; + readBinary(acls_map_size, in); + size_t current_map_size = 0; + while (current_map_size < acls_map_size) + { + uint64_t acl_id; + readBinary(acl_id, in); + size_t acls_size; + readBinary(acls_size, in); + Coordination::ACLs acls; + for (size_t i = 0; i < acls_size; ++i) + { + Coordination::ACL acl; + readBinary(acl.permissions, in); + readBinary(acl.scheme, in); + readBinary(acl.id, in); + acls.push_back(acl); + } + storage.acl_map.addMapping(acl_id, acls); + current_map_size++; + } + } + size_t snapshot_container_size; readBinary(snapshot_container_size, in); @@ -187,8 +247,8 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, { std::string path; readBinary(path, in); - KeeperStorage::Node node; - readNode(node, in); + KeeperStorage::Node node{}; + readNode(node, in, current_version, storage.acl_map); storage.container.insertOrReplace(path, node); if (node.stat.ephemeralOwner != 0) storage.ephemerals[node.stat.ephemeralOwner].insert(path); @@ -215,6 +275,24 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, readBinary(active_session_id, in); readBinary(timeout, in); storage.addSessionID(active_session_id, timeout); + + if (current_version >= SnapshotVersion::V1) + { + size_t session_auths_size; + readBinary(session_auths_size, in); + + KeeperStorage::AuthIDs ids; + size_t session_auth_counter = 0; + while (session_auth_counter < session_auths_size) + { + String scheme, id; + readBinary(scheme, in); + readBinary(id, in); + ids.emplace_back(KeeperStorage::AuthID{scheme, id}); + } + if (ids.size() > 0) + storage.session_and_auth[active_session_id] = ids; + } current_session_size++; } @@ -230,6 +308,8 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t snapshot_container_size = storage->container.snapshotSize(); begin = storage->getSnapshotIteratorBegin(); session_and_timeout = storage->getActiveSessions(); + acl_map = storage->acl_map.getMapping(); + session_and_auth = storage->session_and_auth; } KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_) @@ -241,6 +321,8 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const Sna snapshot_container_size = storage->container.snapshotSize(); begin = storage->getSnapshotIteratorBegin(); session_and_timeout = storage->getActiveSessions(); + acl_map = storage->acl_map.getMapping(); + session_and_auth = storage->session_and_auth; } KeeperStorageSnapshot::~KeeperStorageSnapshot() diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h index adc369fd498..3dbd7c9328e 100644 --- a/src/Coordination/KeeperSnapshotManager.h +++ b/src/Coordination/KeeperSnapshotManager.h @@ -13,6 +13,7 @@ using SnapshotMetadataPtr = std::shared_ptr; enum SnapshotVersion : uint8_t { V0 = 0, + V1 = 1, /// with ACL map }; struct KeeperStorageSnapshot @@ -29,12 +30,14 @@ public: KeeperStorage * storage; - SnapshotVersion version = SnapshotVersion::V0; + SnapshotVersion version = SnapshotVersion::V1; SnapshotMetadataPtr snapshot_meta; int64_t session_id; size_t snapshot_container_size; KeeperStorage::Container::const_iterator begin; SessionAndTimeout session_and_timeout; + KeeperStorage::SessionAndAuth session_and_auth; + std::unordered_map acl_map; }; using KeeperStorageSnapshotPtr = std::shared_ptr; diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index a569072c953..3ae29edb77a 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -226,7 +226,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest if (it == container.end()) return true; - const auto & node_acls = it->value.acls; + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); if (node_acls.empty()) return true; @@ -267,12 +267,17 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest KeeperStorage::Node created_node; - if (!fixupACL(request.acls, session_auth_ids, created_node.acls)) + Coordination::ACLs node_acls; + if (!fixupACL(request.acls, session_auth_ids, node_acls)) { response.error = Coordination::Error::ZINVALIDACL; return {response_ptr, {}}; } + uint64_t acl_id = storage.acl_map.convertACLs(node_acls); + storage.acl_map.addUsage(acl_id); + + created_node.acl_id = acl_id; created_node.stat.czxid = zxid; created_node.stat.mzxid = zxid; created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); @@ -312,13 +317,15 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest if (request.is_ephemeral) ephemerals[session_id].emplace(path_created); - undo = [&container, &ephemerals, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path] + undo = [&storage, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id] { - container.erase(path_created); - if (is_ephemeral) - ephemerals[session_id].erase(path_created); + storage.container.erase(path_created); + storage.acl_map.removeUsage(acl_id); - container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent) + if (is_ephemeral) + storage.ephemerals[session_id].erase(path_created); + + storage.container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent) { --undo_parent.stat.cversion; --undo_parent.stat.numChildren; @@ -345,7 +352,7 @@ struct KeeperStorageGetRequest final : public KeeperStorageRequest if (it == container.end()) return true; - const auto & node_acls = it->value.acls; + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); if (node_acls.empty()) return true; @@ -386,7 +393,7 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest if (it == container.end()) return true; - const auto & node_acls = it->value.acls; + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); if (node_acls.empty()) return true; @@ -429,6 +436,8 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest ephemerals.erase(ephemerals_it); } + storage.acl_map.removeUsage(prev_node.acl_id); + auto child_basename = getBaseName(it->key); container.updateValue(parentPath(request.path), [&child_basename] (KeeperStorage::Node & parent) { @@ -441,13 +450,15 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest container.erase(request.path); - undo = [prev_node, &container, &ephemerals, path = request.path, child_basename] + undo = [prev_node, &storage, path = request.path, child_basename] { if (prev_node.stat.ephemeralOwner != 0) - ephemerals[prev_node.stat.ephemeralOwner].emplace(path); + storage.ephemerals[prev_node.stat.ephemeralOwner].emplace(path); - container.insert(path, prev_node); - container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent) + storage.acl_map.addUsage(prev_node.acl_id); + + storage.container.insert(path, prev_node); + storage.container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent) { ++parent.stat.numChildren; --parent.stat.cversion; @@ -500,7 +511,7 @@ struct KeeperStorageSetRequest final : public KeeperStorageRequest if (it == container.end()) return true; - const auto & node_acls = it->value.acls; + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); if (node_acls.empty()) return true; @@ -577,7 +588,7 @@ struct KeeperStorageListRequest final : public KeeperStorageRequest if (it == container.end()) return true; - const auto & node_acls = it->value.acls; + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); if (node_acls.empty()) return true; @@ -622,7 +633,7 @@ struct KeeperStorageCheckRequest final : public KeeperStorageRequest if (it == container.end()) return true; - const auto & node_acls = it->value.acls; + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); if (node_acls.empty()) return true; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 7990574c8f1..7c90a9bd661 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -30,7 +31,7 @@ public: struct Node { String data; - Coordination::ACLs acls{}; + uint64_t acl_id = 0; /// 0 -- no ACL by default bool is_sequental = false; Coordination::Stat stat{}; int32_t seq_num = 0; @@ -81,6 +82,7 @@ public: SessionAndWatcher sessions_and_watchers; SessionExpiryQueue session_expiry_queue; SessionAndTimeout session_and_timeout; + ACLMap acl_map; int64_t zxid{0}; bool finalized{false}; diff --git a/tests/integration/test_keeper_auth/configs/keeper_config.xml b/tests/integration/test_keeper_auth/configs/keeper_config.xml index 92b9b870360..bb3c9a5d94a 100644 --- a/tests/integration/test_keeper_auth/configs/keeper_config.xml +++ b/tests/integration/test_keeper_auth/configs/keeper_config.xml @@ -10,7 +10,7 @@ 5000 10000 trace - false + 75 diff --git a/tests/integration/test_keeper_auth/test.py b/tests/integration/test_keeper_auth/test.py index 0b35650256e..5f60d5b8bdb 100644 --- a/tests/integration/test_keeper_auth/test.py +++ b/tests/integration/test_keeper_auth/test.py @@ -6,7 +6,7 @@ from kazoo.security import ACL, make_digest_acl, make_acl from kazoo.exceptions import AuthFailedError, InvalidACLError, NoAuthError, KazooException cluster = ClickHouseCluster(__file__) -node = cluster.add_instance('node', main_configs=['configs/keeper_config.xml', 'configs/logs_conf.xml'], with_zookeeper=True, use_keeper=False) +node = cluster.add_instance('node', main_configs=['configs/keeper_config.xml', 'configs/logs_conf.xml'], with_zookeeper=True, use_keeper=False, stay_alive=True) SUPERAUTH = "super:admin" @@ -246,3 +246,57 @@ def test_bad_auth(started_cluster): with pytest.raises(InvalidACLError): print("Sending 12") auth_connection.create("/test_bad_acl", b"data", acl=[make_acl("digest", "dsad:DSAa:d", read=True, write=False, create=True, delete=True, admin=True)]) + +def test_auth_snapshot(started_cluster): + connection = get_fake_zk() + connection.add_auth('digest', 'user1:password1') + + connection.create("/test_snapshot_acl", b"data", acl=[make_acl("auth", "", all=True)]) + + connection1 = get_fake_zk() + connection1.add_auth('digest', 'user2:password2') + + connection1.create("/test_snapshot_acl1", b"data", acl=[make_acl("auth", "", all=True)]) + + connection2 = get_fake_zk() + + connection2.create("/test_snapshot_acl2", b"data") + + for i in range(100): + connection.create(f"/test_snapshot_acl/path{i}", b"data", acl=[make_acl("auth", "", all=True)]) + + node.restart_clickhouse() + + connection = get_fake_zk() + + with pytest.raises(NoAuthError): + connection.get("/test_snapshot_acl") + + connection.add_auth('digest', 'user1:password1') + + assert connection.get("/test_snapshot_acl")[0] == b"data" + + with pytest.raises(NoAuthError): + connection.get("/test_snapshot_acl1") + + assert connection.get("/test_snapshot_acl2")[0] == b"data" + + for i in range(100): + assert connection.get(f"/test_snapshot_acl/path{i}")[0] == b"data" + + connection1 = get_fake_zk() + connection1.add_auth('digest', 'user2:password2') + + assert connection1.get("/test_snapshot_acl1")[0] == b"data" + + with pytest.raises(NoAuthError): + connection1.get("/test_snapshot_acl") + + + connection2 = get_fake_zk() + assert connection2.get("/test_snapshot_acl2")[0] == b"data" + with pytest.raises(NoAuthError): + connection2.get("/test_snapshot_acl") + + with pytest.raises(NoAuthError): + connection2.get("/test_snapshot_acl1") diff --git a/tests/integration/test_keeper_internal_secure/test.py b/tests/integration/test_keeper_internal_secure/test.py index d9fbca624e1..b4bf62f9a37 100644 --- a/tests/integration/test_keeper_internal_secure/test.py +++ b/tests/integration/test_keeper_internal_secure/test.py @@ -26,13 +26,6 @@ def started_cluster(): def get_fake_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - print("Fake zk callback called for state", state) - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance diff --git a/tests/integration/test_keeper_multinode_blocade_leader/test.py b/tests/integration/test_keeper_multinode_blocade_leader/test.py index e74ccc80fe8..9ae81ccdcc6 100644 --- a/tests/integration/test_keeper_multinode_blocade_leader/test.py +++ b/tests/integration/test_keeper_multinode_blocade_leader/test.py @@ -66,12 +66,6 @@ def wait_nodes(): def get_fake_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance diff --git a/tests/integration/test_keeper_multinode_simple/test.py b/tests/integration/test_keeper_multinode_simple/test.py index 18420c9910d..9e57567d8b1 100644 --- a/tests/integration/test_keeper_multinode_simple/test.py +++ b/tests/integration/test_keeper_multinode_simple/test.py @@ -54,13 +54,6 @@ def wait_nodes(): def get_fake_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - print("Fake zk callback called for state", state) - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance diff --git a/tests/integration/test_keeper_persistent_log/test.py b/tests/integration/test_keeper_persistent_log/test.py index 36ee20ae9e7..b0cd9155afb 100644 --- a/tests/integration/test_keeper_persistent_log/test.py +++ b/tests/integration/test_keeper_persistent_log/test.py @@ -33,13 +33,6 @@ def started_cluster(): def get_connection_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - print("Fake zk callback called for state", state) - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance diff --git a/tests/integration/test_keeper_persistent_log_multinode/test.py b/tests/integration/test_keeper_persistent_log_multinode/test.py index 052f38b0bff..306139369fb 100644 --- a/tests/integration/test_keeper_persistent_log_multinode/test.py +++ b/tests/integration/test_keeper_persistent_log_multinode/test.py @@ -25,13 +25,6 @@ def started_cluster(): def get_fake_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - print("Fake zk callback called for state", state) - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance diff --git a/tests/integration/test_keeper_restore_from_snapshot/test.py b/tests/integration/test_keeper_restore_from_snapshot/test.py index eee49f8a319..5f6156800bb 100644 --- a/tests/integration/test_keeper_restore_from_snapshot/test.py +++ b/tests/integration/test_keeper_restore_from_snapshot/test.py @@ -25,13 +25,6 @@ def started_cluster(): def get_fake_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - print("Fake zk callback called for state", state) - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance @@ -77,6 +70,7 @@ def test_recover_from_snapshot(started_cluster): # stale node should recover from leader's snapshot # with some sanitizers can start longer than 5 seconds node3.start_clickhouse(20) + print("Restarted") try: node1_zk = node2_zk = node3_zk = None diff --git a/tests/integration/test_keeper_snapshots/test.py b/tests/integration/test_keeper_snapshots/test.py index 9879c0003be..7d5b69bf5d1 100644 --- a/tests/integration/test_keeper_snapshots/test.py +++ b/tests/integration/test_keeper_snapshots/test.py @@ -35,13 +35,6 @@ def started_cluster(): def get_connection_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - print("Fake zk callback called for state", state) - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance diff --git a/tests/integration/test_keeper_snapshots_multinode/test.py b/tests/integration/test_keeper_snapshots_multinode/test.py index b4e82d62f09..96d19592d29 100644 --- a/tests/integration/test_keeper_snapshots_multinode/test.py +++ b/tests/integration/test_keeper_snapshots_multinode/test.py @@ -25,13 +25,6 @@ def started_cluster(): def get_fake_zk(nodename, timeout=30.0): _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) - def reset_listener(state): - nonlocal _fake_zk_instance - print("Fake zk callback called for state", state) - if state != KazooState.CONNECTED: - _fake_zk_instance._reset() - - _fake_zk_instance.add_listener(reset_listener) _fake_zk_instance.start() return _fake_zk_instance