This commit is contained in:
Antonio Andelic 2023-09-05 10:49:19 +00:00
parent 57943798b7
commit dd1bb579df
3 changed files with 116 additions and 131 deletions

View File

@ -509,7 +509,6 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::lock_guard lock(storage_and_responses_lock);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
@ -756,7 +755,6 @@ std::vector<int64_t> KeeperStateMachine::getDeadSessions()
int64_t KeeperStateMachine::getNextZxid() const
{
std::lock_guard lock(storage_and_responses_lock);
return storage->getNextZXID();
}

View File

@ -317,7 +317,7 @@ Overloaded(Ts...) -> Overloaded<Ts...>;
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::tryGetNodeFromStorage(StringRef path) const
{
std::lock_guard lock(storage.container_mutex);
std::lock_guard lock(storage.storage_mutex);
if (auto node_it = storage.container.find(path); node_it != storage.container.end())
{
const auto & committed_node = node_it->value;
@ -455,7 +455,7 @@ void KeeperStorage::UncommittedState::applyDeltas(const std::list<Delta> & new_d
else if (const auto * auth_delta = std::get_if<AddAuthDelta>(&delta.operation))
{
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
uncommitted_auth.emplace_back(auth_delta->auth_id);
uncommitted_auth.push_back(std::pair{delta.zxid, auth_delta->auth_id});
}
}
}
@ -466,62 +466,30 @@ void KeeperStorage::UncommittedState::addDeltas(std::list<Delta> new_deltas)
deltas.splice(deltas.end(), std::move(new_deltas));
}
//void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
//{
// assert(deltas.empty() || deltas.front().zxid >= commit_zxid);
//
// // collect nodes that have no further modification in the current transaction
// std::unordered_set<std::string> modified_nodes;
//
// while (!deltas.empty() && deltas.front().zxid == commit_zxid)
// {
// if (std::holds_alternative<SubDeltaEnd>(deltas.front().operation))
// {
// deltas.pop_front();
// break;
// }
//
// auto & front_delta = deltas.front();
//
// if (!front_delta.path.empty())
// {
// auto & path_deltas = deltas_for_path.at(front_delta.path);
// assert(path_deltas.front() == &front_delta);
// path_deltas.pop_front();
// if (path_deltas.empty())
// {
// deltas_for_path.erase(front_delta.path);
//
// // no more deltas for path -> no modification
// modified_nodes.insert(std::move(front_delta.path));
// }
// else if (path_deltas.front()->zxid > commit_zxid)
// {
// // next delta has a zxid from a different transaction -> no modification in this transaction
// modified_nodes.insert(std::move(front_delta.path));
// }
// }
// else if (auto * add_auth = std::get_if<AddAuthDelta>(&front_delta.operation))
// {
// auto & uncommitted_auth = session_and_auth[add_auth->session_id];
// assert(!uncommitted_auth.empty() && uncommitted_auth.front() == &add_auth->auth_id);
// uncommitted_auth.pop_front();
// if (uncommitted_auth.empty())
// session_and_auth.erase(add_auth->session_id);
//
// }
//
// deltas.pop_front();
// }
//
// // delete all cached nodes that were not modified after the commit_zxid
// // we only need to check the nodes that were modified in this transaction
// for (const auto & node : modified_nodes)
// {
// if (nodes[node].zxid == commit_zxid)
// nodes.erase(node);
// }
//}
void KeeperStorage::UncommittedState::cleanup(int64_t commit_zxid)
{
for (auto it = nodes.begin(); it != nodes.end();)
{
auto & applied_zxids = it->second.applied_zxids;
std::erase_if(applied_zxids, [commit_zxid](auto current_zxid) { return current_zxid <= commit_zxid; });
if (applied_zxids.empty())
it = nodes.erase(it);
else
++it;
}
for (auto it = session_and_auth.begin(); it != session_and_auth.end();)
{
auto & auths = it->second;
std::erase_if(auths, [commit_zxid](auto auth_pair) { return auth_pair.first <= commit_zxid; });
if (auths.empty())
it = session_and_auth.erase(it);
else
++it;
}
}
void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
{
@ -575,7 +543,7 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
else if (auto * add_auth = std::get_if<AddAuthDelta>(&delta.operation))
{
auto & uncommitted_auth = session_and_auth[add_auth->session_id];
if (uncommitted_auth.back() == add_auth->auth_id)
if (uncommitted_auth.back().second == add_auth->auth_id)
{
uncommitted_auth.pop_back();
if (uncommitted_auth.empty())
@ -634,22 +602,22 @@ void KeeperStorage::UncommittedState::forEachAuthInSession(int64_t session_id, s
if constexpr (std::same_as<TAuth, AuthID>)
auth_ptr = &auth;
else
auth_ptr = auth.get();
auth_ptr = auth.second.get();
func(*auth_ptr);
}
};
{
std::lock_guard lock(storage.auth_mutex);
std::lock_guard lock(storage.storage_mutex);
// for committed
if (storage.session_and_auth.contains(session_id))
call_for_each_auth(storage.session_and_auth.at(session_id));
if (auto auth_it = storage.session_and_auth.find(session_id); auth_it != storage.session_and_auth.end())
call_for_each_auth(auth_it->second);
}
// for uncommitted
if (session_and_auth.contains(session_id))
call_for_each_auth(session_and_auth.at(session_id));
if (auto auth_it = session_and_auth.find(session_id); auth_it != session_and_auth.end())
call_for_each_auth(auth_it->second);
}
namespace
@ -686,29 +654,29 @@ int64_t KeeperStorage::getNextZXID() const
return getNextZXIDLocked(lock);
}
void KeeperStorage::applyUncommittedState(KeeperStorage & /*other*/, int64_t /*last_zxid*/)
void KeeperStorage::applyUncommittedState(KeeperStorage & other, int64_t last_zxid) TSA_NO_THREAD_SAFETY_ANALYSIS
{
//for (const auto & transaction : uncommitted_transactions)
//{
// if (transaction.zxid <= last_zxid)
// continue;
// other.uncommitted_transactions.push_back(transaction);
//}
for (const auto & transaction : uncommitted_transactions)
{
if (transaction.zxid <= last_zxid)
continue;
other.uncommitted_transactions.push_back(transaction);
}
//auto it = uncommitted_state.deltas.begin();
auto it = uncommitted_state.deltas.begin();
//for (; it != uncommitted_state.deltas.end(); ++it)
//{
// if (it->zxid <= last_zxid)
// continue;
for (; it != uncommitted_state.deltas.end(); ++it)
{
if (it->zxid <= last_zxid)
continue;
// other.uncommitted_state.addDelta(*it);
//}
other.uncommitted_state.applyDelta(*it);
other.uncommitted_state.deltas.push_back(*it);
}
}
Coordination::Error KeeperStorage::commit(std::list<Delta> deltas)
{
std::lock_guard lock(container_mutex);
// Deltas are added with increasing ZXIDs
// If there are no deltas for the commit_zxid (e.g. read requests), we instantly return
// on first delta
@ -787,7 +755,6 @@ Coordination::Error KeeperStorage::commit(std::list<Delta> deltas)
}
else if constexpr (std::same_as<DeltaType, KeeperStorage::AddAuthDelta>)
{
std::lock_guard auth_lock(auth_mutex);
session_and_auth[operation.session_id].emplace_back(std::move(*operation.auth_id));
return Coordination::Error::ZOK;
}
@ -939,7 +906,6 @@ Coordination::ACLs getNodeACLs(KeeperStorage & storage, StringRef path, bool is_
{
if (is_local)
{
std::lock_guard lock(storage.container_mutex);
auto node_it = storage.container.find(path);
if (node_it == storage.container.end())
return {};
@ -2286,30 +2252,33 @@ void KeeperStorage::preprocessRequest(
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
//auto session_ephemerals = ephemerals.find(session_id);
//if (session_ephemerals != ephemerals.end())
//{
// for (const auto & ephemeral_path : session_ephemerals->second)
// {
// new_deltas.emplace_back
// (
// parentNodePath(ephemeral_path).toString(),
// new_last_zxid,
// UpdateNodeDelta
// {
// [ephemeral_path](Node & parent)
// {
// ++parent.stat.cversion;
// --parent.stat.numChildren;
// }
// }
// );
auto session_ephemerals = ephemerals.find(session_id);
if (session_ephemerals != ephemerals.end())
{
for (const auto & ephemeral_path : session_ephemerals->second)
{
auto parent_node_path = parentNodePath(ephemeral_path);
auto parent_node = uncommitted_state.getNode(parent_node_path);
UpdateNodeStatDelta parent_update_delta(*parent_node);
++parent_update_delta.new_stats.cversion;
--parent_update_delta.new_stats.numChildren;
// new_deltas.emplace_back(ephemeral_path, transaction->zxid, RemoveNodeDelta{.ephemeral_owner = session_id});
// }
new_deltas.emplace_back
(
parent_node_path.toString(),
new_last_zxid,
std::move(parent_update_delta)
);
// ephemerals.erase(session_ephemerals);
//}
auto node = uncommitted_state.getNode(ephemeral_path);
new_deltas.emplace_back(
ephemeral_path,
transaction->zxid,
RemoveNodeDelta{.stat = node->stat, .acls = uncommitted_state.getACLs(ephemeral_path), .data = node->getData()});
}
ephemerals.erase(session_ephemerals);
}
new_digest = calculateNodesDigest(new_digest, new_deltas);
return;
@ -2322,6 +2291,8 @@ void KeeperStorage::preprocessRequest(
}
new_deltas = request_processor->preprocess(*this, transaction->zxid, session_id, time, new_digest, *keeper_context);
uncommitted_state.cleanup(getZXID());
}
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
@ -2349,18 +2320,14 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
*new_last_zxid,
uncommitted_transactions.front().zxid);
zxid = *new_last_zxid;
uncommitted_transactions.pop_front();
commit_zxid = *new_last_zxid;
}
else
{
commit_zxid = zxid;
}
commit_zxid = zxid;
}
KeeperStorage::ResponsesForSessions results;
/// ZooKeeper update sessions expirity for each request, not only for heartbeats
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
std::list<Delta> deltas;
{
std::lock_guard lock(uncommitted_state.deltas_mutex);
@ -2371,6 +2338,11 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
deltas.splice(deltas.end(), uncommitted_state.deltas, uncommitted_state.deltas.begin(), it);
}
KeeperStorage::ResponsesForSessions results;
/// ZooKeeper update sessions expirity for each request, not only for heartbeats
session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
for (const auto & delta : deltas)
@ -2382,16 +2354,16 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
}
}
commit(std::move(deltas));
clearDeadWatches(session_id);
{
std::lock_guard lock(auth_mutex);
std::lock_guard lock(storage_mutex);
commit(std::move(deltas));
auto auth_it = session_and_auth.find(session_id);
if (auth_it != session_and_auth.end())
session_and_auth.erase(auth_it);
}
clearDeadWatches(session_id);
/// Finish connection
auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
response->xid = zk_request->xid;
@ -2403,7 +2375,11 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special
{
KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
auto response = storage_request->process(*this, std::move(deltas));
Coordination::ZooKeeperResponsePtr response = nullptr;
{
std::lock_guard lock(storage_mutex);
response = storage_request->process(*this, std::move(deltas));
}
response->xid = zk_request->xid;
response->zxid = commit_zxid;
@ -2416,6 +2392,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
if (is_local)
{
std::lock_guard lock(storage_mutex);
assert(zk_request->isReadRequest());
if (check_acl && !request_processor->checkAuth(*this, session_id, true))
{
@ -2430,6 +2407,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
}
else
{
std::lock_guard lock(storage_mutex);
response = request_processor->process(*this, std::move(deltas));
}
@ -2468,11 +2446,19 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
results.push_back(ResponseForSession{session_id, response});
}
{
std::lock_guard lock(transaction_mutex);
if (new_last_zxid)
uncommitted_transactions.pop_front();
zxid = commit_zxid;
}
return results;
}
void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing)
{
std::unique_lock transaction_lock(transaction_mutex);
if (allow_missing && (uncommitted_transactions.empty() || uncommitted_transactions.back().zxid < rollback_zxid))
return;
@ -2488,6 +2474,7 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing)
try
{
uncommitted_transactions.pop_back();
transaction_lock.unlock();
uncommitted_state.rollback(rollback_zxid);
}
catch (...)

View File

@ -9,6 +9,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/SharedMutex.h>
#include <Coordination/KeeperContext.h>
#include <base/defines.h>
@ -138,15 +139,17 @@ public:
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
using Watches = std::map<String /* path, relative of root_path */, SessionIDs>;
mutable SharedMutex main_mutex;
mutable std::mutex storage_mutex;
int64_t session_id_counter{1};
mutable std::mutex auth_mutex;
SessionAndAuth session_and_auth TSA_GUARDED_BY(auth_mutex);
SessionAndAuth session_and_auth;
/// Main hashtable with nodes. Contain all information about data.
/// All other structures expect session_and_timeout can be restored from
/// container.
mutable std::mutex container_mutex;
Container container;
// Applying ZooKeeper request to storage consists of two steps:
@ -250,7 +253,7 @@ public:
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
void addDeltas(std::list<Delta> new_deltas);
void commit(int64_t commit_zxid);
void cleanup(int64_t commit_zxid);
void rollback(int64_t rollback_zxid);
std::shared_ptr<Node> getNode(StringRef path) const;
@ -272,7 +275,7 @@ public:
if constexpr (std::same_as<TAuth, AuthID>)
auth_ptr = &auth;
else
auth_ptr = auth.get();
auth_ptr = auth.second.get();
if (predicate(*auth_ptr))
return true;
@ -281,10 +284,7 @@ public:
};
if (is_local)
{
std::lock_guard lock(storage.auth_mutex);
return check_auth(storage.session_and_auth[session_id]);
}
// check if there are uncommitted
const auto auth_it = session_and_auth.find(session_id);
@ -294,7 +294,7 @@ public:
if (check_auth(auth_it->second))
return true;
std::lock_guard lock(storage.auth_mutex);
std::lock_guard lock(storage.storage_mutex);
return check_auth(storage.session_and_auth[session_id]);
}
@ -302,7 +302,7 @@ public:
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
std::unordered_map<int64_t, std::list<std::shared_ptr<AuthID>>> session_and_auth;
std::unordered_map<int64_t, std::list<std::pair<int64_t, std::shared_ptr<AuthID>>>> session_and_auth;
struct UncommittedNode
{
@ -402,7 +402,7 @@ public:
uint64_t nodes_digest{0};
bool finalized{false};
std::atomic<bool> finalized{false};
/// Currently active watches (node_path -> subscribed sessions)
Watches watches;