mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Fix
This commit is contained in:
parent
79fc8d67ad
commit
9633563fbd
@ -441,7 +441,7 @@ void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<S
|
||||
node.getChildren().reserve(node.stats.numChildren());
|
||||
|
||||
if (ephemeral_owner != 0)
|
||||
storage.ephemerals[node.stats.ephemeralOwner()].insert(std::string{path});
|
||||
storage.committed_ephemerals[node.stats.ephemeralOwner()].insert(std::string{path});
|
||||
|
||||
if (recalculate_digest)
|
||||
storage.nodes_digest += node.getDigest(path);
|
||||
@ -536,6 +536,8 @@ void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<S
|
||||
buffer->pos(0);
|
||||
deserialization_result.cluster_config = ClusterConfig::deserialize(*buffer);
|
||||
}
|
||||
|
||||
storage.updateStats();
|
||||
}
|
||||
|
||||
template<typename Storage>
|
||||
|
@ -182,11 +182,11 @@ void assertDigest(
|
||||
}
|
||||
|
||||
template <bool shared = false>
|
||||
struct TSA_SCOPED_LOCKABLE LockGuardWithStats final
|
||||
struct LockGuardWithStats final
|
||||
{
|
||||
using LockType = std::conditional_t<shared, std::shared_lock<SharedMutex>, std::unique_lock<SharedMutex>>;
|
||||
LockType lock;
|
||||
explicit LockGuardWithStats(SharedMutex & mutex) TSA_ACQUIRE(mutex)
|
||||
explicit LockGuardWithStats(SharedMutex & mutex)
|
||||
{
|
||||
Stopwatch watch;
|
||||
LockType l(mutex);
|
||||
@ -194,7 +194,7 @@ struct TSA_SCOPED_LOCKABLE LockGuardWithStats final
|
||||
lock = std::move(l);
|
||||
}
|
||||
|
||||
~LockGuardWithStats() TSA_RELEASE() = default;
|
||||
~LockGuardWithStats() = default;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -203,7 +203,7 @@ public:
|
||||
// This should be used only for tests or keeper-data-dumper because it violates
|
||||
// TSA -- we can't acquire the lock outside of this class or return a storage under lock
|
||||
// in a reasonable way.
|
||||
Storage & getStorageUnsafe() TSA_NO_THREAD_SAFETY_ANALYSIS
|
||||
Storage & getStorageUnsafe()
|
||||
{
|
||||
return *storage;
|
||||
}
|
||||
|
@ -112,6 +112,22 @@ bool fixupACL(
|
||||
return valid_found;
|
||||
}
|
||||
|
||||
void unregisterEphemeralPath(KeeperStorageBase::Ephemerals & ephemerals, int64_t session_id, const std::string & path, bool throw_if_missing)
|
||||
{
|
||||
auto ephemerals_it = ephemerals.find(session_id);
|
||||
if (ephemerals_it == ephemerals.end())
|
||||
{
|
||||
if (throw_if_missing)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session {} is missing ephemeral path {}", session_id, path);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
ephemerals_it->second.erase(path);
|
||||
if (ephemerals_it->second.empty())
|
||||
ephemerals.erase(ephemerals_it);
|
||||
}
|
||||
|
||||
KeeperStorageBase::ResponsesForSessions processWatchesImpl(
|
||||
const String & path, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches, Coordination::Event event_type)
|
||||
{
|
||||
@ -489,6 +505,7 @@ void KeeperStorage<Container>::initializeSystemNodes()
|
||||
}
|
||||
}
|
||||
|
||||
updateStats();
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
@ -612,7 +629,7 @@ bool KeeperStorage<Container>::UncommittedState::hasACL(int64_t session_id, bool
|
||||
|
||||
if (is_local)
|
||||
{
|
||||
std::shared_lock lock(storage.storage_mutex);
|
||||
std::shared_lock lock(storage.auth_mutex);
|
||||
return check_auth(storage.session_and_auth[session_id]);
|
||||
}
|
||||
|
||||
@ -620,6 +637,7 @@ bool KeeperStorage<Container>::UncommittedState::hasACL(int64_t session_id, bool
|
||||
if (closed_sessions.contains(session_id))
|
||||
return false;
|
||||
|
||||
std::shared_lock lock(storage.auth_mutex);
|
||||
if (check_auth(storage.session_and_auth[session_id]))
|
||||
return true;
|
||||
|
||||
@ -631,7 +649,6 @@ bool KeeperStorage<Container>::UncommittedState::hasACL(int64_t session_id, bool
|
||||
if (check_auth(auth_it->second))
|
||||
return true;
|
||||
|
||||
std::lock_guard lock(storage.storage_mutex);
|
||||
return check_auth(storage.session_and_auth[session_id]);
|
||||
}
|
||||
|
||||
@ -777,8 +794,9 @@ void KeeperStorage<Container>::UncommittedState::rollback(std::list<Delta> rollb
|
||||
// we need to undo ephemeral mapping modifications
|
||||
// CreateNodeDelta added ephemeral for session id -> we need to remove it
|
||||
// RemoveNodeDelta removed ephemeral for session id -> we need to add it back
|
||||
for (auto & delta : rollback_deltas)
|
||||
for (auto delta_it = rollback_deltas.rbegin(); delta_it != rollback_deltas.rend(); ++delta_it)
|
||||
{
|
||||
const auto & delta = *delta_it;
|
||||
if (!delta.path.empty())
|
||||
{
|
||||
std::visit(
|
||||
@ -787,14 +805,17 @@ void KeeperStorage<Container>::UncommittedState::rollback(std::list<Delta> rollb
|
||||
if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
|
||||
{
|
||||
if (operation.stat.ephemeralOwner != 0)
|
||||
storage.unregisterEphemeralPath(operation.stat.ephemeralOwner, delta.path);
|
||||
{
|
||||
std::lock_guard lock(storage.ephemeral_mutex);
|
||||
unregisterEphemeralPath(storage.uncommitted_state.ephemerals, operation.stat.ephemeralOwner, delta.path, /*throw_if_missing=*/false);
|
||||
}
|
||||
}
|
||||
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
|
||||
{
|
||||
if (operation.stat.ephemeralOwner() != 0)
|
||||
{
|
||||
std::lock_guard lock(storage.ephemerals_mutex);
|
||||
storage.ephemerals[operation.stat.ephemeralOwner()].emplace(delta.path);
|
||||
std::lock_guard lock(storage.ephemeral_mutex);
|
||||
storage.uncommitted_state.ephemerals[operation.stat.ephemeralOwner()].emplace(delta.path);
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -802,7 +823,7 @@ void KeeperStorage<Container>::UncommittedState::rollback(std::list<Delta> rollb
|
||||
|
||||
rollbackDelta(delta);
|
||||
}
|
||||
else if (auto * add_auth = std::get_if<AddAuthDelta>(&delta.operation))
|
||||
else if (const auto * add_auth = std::get_if<AddAuthDelta>(&delta.operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[add_auth->session_id];
|
||||
if (uncommitted_auth.back().second == add_auth->auth_id)
|
||||
@ -812,7 +833,7 @@ void KeeperStorage<Container>::UncommittedState::rollback(std::list<Delta> rollb
|
||||
session_and_auth.erase(add_auth->session_id);
|
||||
}
|
||||
}
|
||||
else if (auto * close_session = std::get_if<CloseSessionDelta>(&delta.operation))
|
||||
else if (const auto * close_session = std::get_if<CloseSessionDelta>(&delta.operation))
|
||||
{
|
||||
closed_sessions.erase(close_session->session_id);
|
||||
}
|
||||
@ -875,16 +896,16 @@ void KeeperStorage<Container>::UncommittedState::forEachAuthInSession(int64_t se
|
||||
else
|
||||
auth_ptr = auth.second.get();
|
||||
|
||||
func(*auth_ptr);
|
||||
if (!auth_ptr->scheme.empty())
|
||||
func(*auth_ptr);
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
std::lock_guard lock(storage.storage_mutex);
|
||||
// for committed
|
||||
if (auto auth_it = storage.session_and_auth.find(session_id); auth_it != storage.session_and_auth.end())
|
||||
call_for_each_auth(auth_it->second);
|
||||
}
|
||||
/// both committed and uncommitted need to be under the lock to avoid fetching the same AuthID from both committed and uncommitted state
|
||||
std::shared_lock lock(storage.auth_mutex);
|
||||
// for committed
|
||||
if (auto auth_it = storage.session_and_auth.find(session_id); auth_it != storage.session_and_auth.end())
|
||||
call_for_each_auth(auth_it->second);
|
||||
|
||||
// for uncommitted
|
||||
if (auto auth_it = session_and_auth.find(session_id); auth_it != session_and_auth.end())
|
||||
@ -1034,6 +1055,7 @@ Coordination::Error KeeperStorage<Container>::commit(std::list<Delta> deltas)
|
||||
}
|
||||
else if constexpr (std::same_as<DeltaType, AddAuthDelta>)
|
||||
{
|
||||
std::lock_guard auth_lock{auth_mutex};
|
||||
session_and_auth[operation.session_id].emplace_back(std::move(*operation.auth_id));
|
||||
return Coordination::Error::ZOK;
|
||||
}
|
||||
@ -1104,6 +1126,15 @@ bool KeeperStorage<Container>::createNode(
|
||||
|
||||
addDigest(map_key->getMapped()->value, map_key->getKey().toView());
|
||||
}
|
||||
|
||||
if (stat.ephemeralOwner != 0)
|
||||
{
|
||||
++committed_ephemeral_nodes;
|
||||
std::lock_guard lock(ephemeral_mutex);
|
||||
committed_ephemerals[stat.ephemeralOwner].emplace(path);
|
||||
unregisterEphemeralPath(uncommitted_state.ephemerals, stat.ephemeralOwner, path, /*throw_if_missing=*/false);
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
@ -1133,7 +1164,6 @@ bool KeeperStorage<Container>::removeNode(const std::string & path, int32_t vers
|
||||
[child_basename = getBaseNodeName(node_it->key)](KeeperMemNode & parent)
|
||||
{
|
||||
parent.removeChild(child_basename);
|
||||
chassert(parent.stats.numChildren() == static_cast<int32_t>(parent.getChildren().size()));
|
||||
}
|
||||
);
|
||||
|
||||
@ -1141,6 +1171,14 @@ bool KeeperStorage<Container>::removeNode(const std::string & path, int32_t vers
|
||||
|
||||
removeDigest(prev_node, path);
|
||||
}
|
||||
|
||||
if (prev_node.stats.ephemeralOwner() != 0)
|
||||
{
|
||||
--committed_ephemeral_nodes;
|
||||
std::lock_guard lock(ephemeral_mutex);
|
||||
unregisterEphemeralPath(committed_ephemerals, prev_node.stats.ephemeralOwner(), path, /*throw_if_missing=*/true);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1248,19 +1286,6 @@ bool KeeperStorage<Container>::checkACL(StringRef path, int32_t permission, int6
|
||||
return false;
|
||||
}
|
||||
|
||||
template<typename Container>
|
||||
void KeeperStorage<Container>::unregisterEphemeralPath(int64_t session_id, const std::string & path)
|
||||
{
|
||||
std::lock_guard ephemerals_lock(ephemerals_mutex);
|
||||
auto ephemerals_it = ephemerals.find(session_id);
|
||||
if (ephemerals_it == ephemerals.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session {} is missing ephemeral path", session_id);
|
||||
|
||||
ephemerals_it->second.erase(path);
|
||||
if (ephemerals_it->second.empty())
|
||||
ephemerals.erase(ephemerals_it);
|
||||
}
|
||||
|
||||
/// Default implementations ///
|
||||
template <std::derived_from<Coordination::ZooKeeperRequest> T, typename Storage>
|
||||
Coordination::ZooKeeperResponsePtr
|
||||
@ -1398,8 +1423,8 @@ std::list<KeeperStorageBase::Delta> preprocess(
|
||||
|
||||
if (zk_request.is_ephemeral)
|
||||
{
|
||||
std::lock_guard lock(storage.ephemerals_mutex);
|
||||
storage.ephemerals[session_id].emplace(path_created);
|
||||
std::lock_guard lock(storage.ephemeral_mutex);
|
||||
storage.uncommitted_state.ephemerals[session_id].emplace(path_created);
|
||||
}
|
||||
|
||||
int32_t parent_cversion = zk_request.parent_cversion;
|
||||
@ -1436,7 +1461,6 @@ std::list<KeeperStorageBase::Delta> preprocess(
|
||||
zxid,
|
||||
typename Storage::CreateNodeDelta{stat, std::move(node_acls), zk_request.data});
|
||||
|
||||
|
||||
digest = storage.calculateNodesDigest(digest, new_deltas);
|
||||
return new_deltas;
|
||||
}
|
||||
@ -1606,24 +1630,29 @@ std::list<KeeperStorageBase::Delta> preprocess(
|
||||
auto parent_path = parentNodePath(zk_request.path);
|
||||
auto parent_node = storage.uncommitted_state.getNode(parent_path);
|
||||
|
||||
KeeperStorageBase::UpdateNodeStatDelta update_parent_delta(*parent_node);
|
||||
std::optional<KeeperStorageBase::UpdateNodeStatDelta> update_parent_delta;
|
||||
if (parent_node)
|
||||
update_parent_delta.emplace(*parent_node);
|
||||
|
||||
const auto add_parent_update_delta = [&]
|
||||
{
|
||||
if (!update_parent_delta)
|
||||
return;
|
||||
|
||||
new_deltas.emplace_back(
|
||||
std::string{parent_path},
|
||||
zxid,
|
||||
std::move(update_parent_delta)
|
||||
std::move(*update_parent_delta)
|
||||
);
|
||||
};
|
||||
|
||||
const auto update_parent_pzxid = [&]()
|
||||
{
|
||||
if (!parent_node)
|
||||
if (!update_parent_delta)
|
||||
return;
|
||||
|
||||
if (update_parent_delta.old_stats.pzxid < zxid)
|
||||
update_parent_delta.new_stats.pzxid = zxid;
|
||||
if (update_parent_delta->old_stats.pzxid < zxid)
|
||||
update_parent_delta->new_stats.pzxid = zxid;
|
||||
};
|
||||
|
||||
auto node = storage.uncommitted_state.getNode(zk_request.path);
|
||||
@ -1645,8 +1674,9 @@ std::list<KeeperStorageBase::Delta> preprocess(
|
||||
if (zk_request.restored_from_zookeeper_log)
|
||||
update_parent_pzxid();
|
||||
|
||||
++update_parent_delta.new_stats.cversion;
|
||||
update_parent_delta.new_stats.decreaseNumChildren();
|
||||
chassert(update_parent_delta);
|
||||
++update_parent_delta->new_stats.cversion;
|
||||
update_parent_delta->new_stats.decreaseNumChildren();
|
||||
add_parent_update_delta();
|
||||
|
||||
new_deltas.emplace_back(
|
||||
@ -1656,7 +1686,11 @@ std::list<KeeperStorageBase::Delta> preprocess(
|
||||
zk_request.version, node->stats, storage.uncommitted_state.getACLs(zk_request.path), std::string{node->getData()}});
|
||||
|
||||
if (node->stats.isEphemeral())
|
||||
storage.unregisterEphemeralPath(node->stats.ephemeralOwner(), zk_request.path);
|
||||
{
|
||||
std::lock_guard ephemeral_lock(storage.ephemeral_mutex);
|
||||
/// try deleting the ephemeral node from the uncommitted state
|
||||
unregisterEphemeralPath(storage.uncommitted_state.ephemerals, node->stats.ephemeralOwner(), zk_request.path, /*throw_if_missing=*/false);
|
||||
}
|
||||
|
||||
digest = storage.calculateNodesDigest(digest, new_deltas);
|
||||
|
||||
@ -1676,12 +1710,6 @@ process(const Coordination::ZooKeeperRemoveRequest & /*zk_request*/, Storage & s
|
||||
/// REMOVE Request ///
|
||||
|
||||
/// EXISTS Request ///
|
||||
template <typename Storage>
|
||||
bool checkAuth(const Coordination::ZooKeeperExistsRequest & zk_request, Storage & storage, int64_t session_id, bool is_local)
|
||||
{
|
||||
return storage.checkACL(zk_request.getPath(), Coordination::ACL::Read, session_id, is_local);
|
||||
}
|
||||
|
||||
template <typename Storage>
|
||||
std::list<KeeperStorageBase::Delta> preprocess(
|
||||
const Coordination::ZooKeeperExistsRequest & zk_request,
|
||||
@ -2438,7 +2466,7 @@ void KeeperStorage<Container>::finalize()
|
||||
|
||||
finalized = true;
|
||||
|
||||
ephemerals.clear();
|
||||
committed_ephemerals.clear();
|
||||
|
||||
watches.clear();
|
||||
list_watches.clear();
|
||||
@ -2612,33 +2640,54 @@ void KeeperStorage<Container>::preprocessRequest(
|
||||
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(ephemerals_mutex);
|
||||
auto session_ephemerals = ephemerals.find(session_id);
|
||||
if (session_ephemerals != ephemerals.end())
|
||||
const auto process_ephemerals_for_session = [&](auto & current_ephemerals)
|
||||
{
|
||||
for (const auto & ephemeral_path : session_ephemerals->second)
|
||||
auto session_ephemerals = current_ephemerals.find(session_id);
|
||||
if (session_ephemerals != current_ephemerals.end())
|
||||
{
|
||||
auto parent_node_path = parentNodePath(ephemeral_path);
|
||||
auto parent_node = uncommitted_state.getNode(parent_node_path);
|
||||
UpdateNodeStatDelta parent_update_delta(*parent_node);
|
||||
++parent_update_delta.new_stats.cversion;
|
||||
parent_update_delta.new_stats.decreaseNumChildren();
|
||||
new_deltas.emplace_back
|
||||
(
|
||||
parent_node_path.toString(),
|
||||
new_last_zxid,
|
||||
std::move(parent_update_delta)
|
||||
);
|
||||
std::unordered_map<std::string_view, UpdateNodeStatDelta> parent_updates;
|
||||
for (const auto & ephemeral_path : session_ephemerals->second)
|
||||
{
|
||||
auto node = uncommitted_state.getNode(ephemeral_path);
|
||||
|
||||
auto node = uncommitted_state.getNode(ephemeral_path);
|
||||
new_deltas.emplace_back(
|
||||
ephemeral_path,
|
||||
transaction->zxid,
|
||||
RemoveNodeDelta{.stat = node->stats, .acls = uncommitted_state.getACLs(ephemeral_path), .data = std::string{node->getData()}});
|
||||
/// maybe the node is deleted or recreated with different session_id in the uncommitted state
|
||||
if (!node || node->stats.ephemeralOwner() != session_id)
|
||||
continue;
|
||||
|
||||
auto parent_node_path = parentNodePath(ephemeral_path).toView();
|
||||
|
||||
auto parent_update_it = parent_updates.find(parent_node_path);
|
||||
if (parent_update_it == parent_updates.end())
|
||||
{
|
||||
auto parent_node = uncommitted_state.getNode(StringRef{parent_node_path});
|
||||
std::tie(parent_update_it, std::ignore) = parent_updates.emplace(parent_node_path, *parent_node);
|
||||
}
|
||||
|
||||
auto & parent_update_delta = parent_update_it->second;
|
||||
++parent_update_delta.new_stats.cversion;
|
||||
parent_update_delta.new_stats.decreaseNumChildren();
|
||||
|
||||
new_deltas.emplace_back(
|
||||
ephemeral_path,
|
||||
transaction->zxid,
|
||||
RemoveNodeDelta{.stat = node->stats, .acls = uncommitted_state.getACLs(ephemeral_path), .data = std::string{node->getData()}});
|
||||
}
|
||||
|
||||
for (auto & [parent_path, parent_update_delta] : parent_updates)
|
||||
{
|
||||
new_deltas.emplace_back
|
||||
(
|
||||
std::string{parent_path},
|
||||
new_last_zxid,
|
||||
std::move(parent_update_delta)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
ephemerals.erase(session_ephemerals);
|
||||
}
|
||||
};
|
||||
std::lock_guard ephemeral_lock(ephemeral_mutex);
|
||||
process_ephemerals_for_session(committed_ephemerals);
|
||||
process_ephemerals_for_session(uncommitted_state.ephemerals);
|
||||
uncommitted_state.ephemerals.erase(session_id);
|
||||
}
|
||||
|
||||
new_deltas.emplace_back(transaction->zxid, CloseSessionDelta{session_id});
|
||||
@ -2739,6 +2788,9 @@ KeeperStorage<Container>::ResponsesForSessions KeeperStorage<Container>::process
|
||||
{
|
||||
std::lock_guard lock(storage_mutex);
|
||||
commit(std::move(deltas));
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(auth_mutex);
|
||||
auto auth_it = session_and_auth.find(session_id);
|
||||
if (auth_it != session_and_auth.end())
|
||||
session_and_auth.erase(auth_it);
|
||||
@ -3087,8 +3139,8 @@ void KeeperStorage<Container>::dumpSessionsAndEphemerals(WriteBufferFromOwnStrin
|
||||
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
|
||||
}
|
||||
|
||||
buf << "Sessions with Ephemerals (" << getSessionWithEphemeralNodesCountLocked() << "):\n";
|
||||
for (const auto & [session_id, ephemeral_paths] : ephemerals)
|
||||
buf << "Sessions with Ephemerals (" << getSessionWithEphemeralNodesCount() << "):\n";
|
||||
for (const auto & [session_id, ephemeral_paths] : committed_ephemerals)
|
||||
{
|
||||
buf << "0x" << getHexUIntLowercase(session_id) << "\n";
|
||||
write_str_set(ephemeral_paths);
|
||||
@ -3127,13 +3179,7 @@ uint64_t KeeperStorage<Container>::getTotalWatchesCount() const
|
||||
template<typename Container>
|
||||
uint64_t KeeperStorage<Container>::getSessionWithEphemeralNodesCount() const
|
||||
{
|
||||
return getSessionWithEphemeralNodesCountLocked();
|
||||
}
|
||||
|
||||
template<typename Container>
|
||||
uint64_t KeeperStorage<Container>::getSessionWithEphemeralNodesCountLocked() const
|
||||
{
|
||||
return ephemerals.size();
|
||||
return committed_ephemerals.size();
|
||||
}
|
||||
|
||||
template<typename Container>
|
||||
@ -3145,11 +3191,7 @@ uint64_t KeeperStorage<Container>::getSessionsWithWatchesCount() const
|
||||
template<typename Container>
|
||||
uint64_t KeeperStorage<Container>::getTotalEphemeralNodesCount() const
|
||||
{
|
||||
uint64_t ret = 0;
|
||||
for (const auto & [session_id, nodes] : ephemerals)
|
||||
ret += nodes.size();
|
||||
|
||||
return ret;
|
||||
return committed_ephemeral_nodes;
|
||||
}
|
||||
|
||||
template<typename Container>
|
||||
|
@ -474,9 +474,10 @@ public:
|
||||
|
||||
int64_t session_id_counter{1};
|
||||
|
||||
mutable SharedMutex auth_mutex;
|
||||
SessionAndAuth session_and_auth;
|
||||
mutable std::shared_mutex storage_mutex;
|
||||
|
||||
mutable SharedMutex storage_mutex;
|
||||
/// Main hashtable with nodes. Contain all information about data.
|
||||
/// All other structures expect session_and_timeout can be restored from
|
||||
/// container.
|
||||
@ -541,6 +542,8 @@ public:
|
||||
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
|
||||
mutable ZxidToNodes zxid_to_nodes;
|
||||
|
||||
Ephemerals ephemerals;
|
||||
|
||||
mutable std::mutex deltas_mutex;
|
||||
std::list<Delta> deltas TSA_GUARDED_BY(deltas_mutex);
|
||||
KeeperStorage<Container> & storage;
|
||||
@ -570,11 +573,10 @@ public:
|
||||
|
||||
bool checkACL(StringRef path, int32_t permissions, int64_t session_id, bool is_local);
|
||||
|
||||
void unregisterEphemeralPath(int64_t session_id, const std::string & path);
|
||||
|
||||
mutable std::mutex ephemerals_mutex;
|
||||
std::mutex ephemeral_mutex;
|
||||
/// Mapping session_id -> set of ephemeral nodes paths
|
||||
Ephemerals ephemerals;
|
||||
Ephemerals committed_ephemerals;
|
||||
size_t committed_ephemeral_nodes{0};
|
||||
|
||||
/// Expiration queue for session, allows to get dead sessions at some point of time
|
||||
SessionExpiryQueue session_expiry_queue;
|
||||
@ -687,7 +689,6 @@ public:
|
||||
/// Get all dead sessions
|
||||
std::vector<int64_t> getDeadSessions() const;
|
||||
|
||||
|
||||
void updateStats();
|
||||
const Stats & getStorageStats() const;
|
||||
|
||||
@ -713,8 +714,6 @@ public:
|
||||
|
||||
void recalculateStats();
|
||||
private:
|
||||
uint64_t getSessionWithEphemeralNodesCountLocked() const;
|
||||
|
||||
void removeDigest(const Node & node, std::string_view path);
|
||||
void addDigest(const Node & node, std::string_view path);
|
||||
};
|
||||
|
@ -134,7 +134,7 @@ int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, LoggerPtr log
|
||||
storage.container.insertOrReplace(path, node);
|
||||
|
||||
if (ephemeral_owner != 0)
|
||||
storage.ephemerals[ephemeral_owner].insert(path);
|
||||
storage.committed_ephemerals[ephemeral_owner].insert(path);
|
||||
|
||||
storage.acl_map.addUsage(node.acl_id);
|
||||
}
|
||||
|
@ -1572,8 +1572,8 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotSimple)
|
||||
addNode(storage, "/hello2", "somedata", 3);
|
||||
storage.session_id_counter = 5;
|
||||
TSA_SUPPRESS_WARNING_FOR_WRITE(storage.zxid) = 2;
|
||||
storage.ephemerals[3] = {"/hello2"};
|
||||
storage.ephemerals[1] = {"/hello1"};
|
||||
storage.committed_ephemerals[3] = {"/hello2"};
|
||||
storage.committed_ephemerals[1] = {"/hello1"};
|
||||
storage.getSessionID(130);
|
||||
storage.getSessionID(130);
|
||||
|
||||
@ -1603,9 +1603,9 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotSimple)
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
|
||||
EXPECT_EQ(restored_storage->session_id_counter, 7);
|
||||
EXPECT_EQ(restored_storage->getZXID(), 2);
|
||||
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
|
||||
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
|
||||
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
|
||||
EXPECT_EQ(restored_storage->committed_ephemerals.size(), 2);
|
||||
EXPECT_EQ(restored_storage->committed_ephemerals[3].size(), 1);
|
||||
EXPECT_EQ(restored_storage->committed_ephemerals[1].size(), 1);
|
||||
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
|
||||
}
|
||||
|
||||
@ -2028,7 +2028,7 @@ TYPED_TEST(CoordinationTest, TestEphemeralNodeRemove)
|
||||
state_machine->commit(1, entry_c->get_buf());
|
||||
const auto & storage = state_machine->getStorageUnsafe();
|
||||
|
||||
EXPECT_EQ(storage.ephemerals.size(), 1);
|
||||
EXPECT_EQ(storage.committed_ephemerals.size(), 1);
|
||||
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
|
||||
request_d->path = "/hello";
|
||||
/// Delete from other session
|
||||
@ -2036,7 +2036,7 @@ TYPED_TEST(CoordinationTest, TestEphemeralNodeRemove)
|
||||
state_machine->pre_commit(2, entry_d->get_buf());
|
||||
state_machine->commit(2, entry_d->get_buf());
|
||||
|
||||
EXPECT_EQ(storage.ephemerals.size(), 0);
|
||||
EXPECT_EQ(storage.committed_ephemerals.size(), 0);
|
||||
}
|
||||
|
||||
|
||||
@ -2536,8 +2536,8 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotDifferentCompressions)
|
||||
addNode(storage, "/hello2", "somedata", 3);
|
||||
storage.session_id_counter = 5;
|
||||
TSA_SUPPRESS_WARNING_FOR_WRITE(storage.zxid) = 2;
|
||||
storage.ephemerals[3] = {"/hello2"};
|
||||
storage.ephemerals[1] = {"/hello1"};
|
||||
storage.committed_ephemerals[3] = {"/hello2"};
|
||||
storage.committed_ephemerals[1] = {"/hello1"};
|
||||
storage.getSessionID(130);
|
||||
storage.getSessionID(130);
|
||||
|
||||
@ -2563,9 +2563,9 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotDifferentCompressions)
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
|
||||
EXPECT_EQ(restored_storage->session_id_counter, 7);
|
||||
EXPECT_EQ(restored_storage->getZXID(), 2);
|
||||
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
|
||||
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
|
||||
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
|
||||
EXPECT_EQ(restored_storage->committed_ephemerals.size(), 2);
|
||||
EXPECT_EQ(restored_storage->committed_ephemerals[3].size(), 1);
|
||||
EXPECT_EQ(restored_storage->committed_ephemerals[1].size(), 1);
|
||||
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
|
||||
}
|
||||
|
||||
@ -2750,8 +2750,8 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotEqual)
|
||||
|
||||
storage.session_id_counter = 5;
|
||||
|
||||
storage.ephemerals[3] = {"/hello"};
|
||||
storage.ephemerals[1] = {"/hello/somepath"};
|
||||
storage.committed_ephemerals[3] = {"/hello"};
|
||||
storage.committed_ephemerals[1] = {"/hello/somepath"};
|
||||
|
||||
for (size_t j = 0; j < 3333; ++j)
|
||||
storage.getSessionID(130 * j);
|
||||
|
@ -28,13 +28,13 @@ void dumpMachine(std::shared_ptr<KeeperStateMachine<DB::KeeperMemoryStorage>> ma
|
||||
keys.pop();
|
||||
std::cout << key << "\n";
|
||||
auto value = storage.container.getValue(key);
|
||||
std::cout << "\tStat: {version: " << value.version <<
|
||||
", mtime: " << value.mtime <<
|
||||
", emphemeralOwner: " << value.ephemeralOwner() <<
|
||||
", czxid: " << value.czxid <<
|
||||
", mzxid: " << value.mzxid <<
|
||||
", numChildren: " << value.numChildren() <<
|
||||
", dataLength: " << value.data_size <<
|
||||
std::cout << "\tStat: {version: " << value.stats.version <<
|
||||
", mtime: " << value.stats.mtime <<
|
||||
", emphemeralOwner: " << value.stats.ephemeralOwner() <<
|
||||
", czxid: " << value.stats.czxid <<
|
||||
", mzxid: " << value.stats.mzxid <<
|
||||
", numChildren: " << value.stats.numChildren() <<
|
||||
", dataLength: " << value.stats.data_size <<
|
||||
"}" << std::endl;
|
||||
std::cout << "\tData: " << storage.container.getValue(key).getData() << std::endl;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user