Merge branch 'keeper-parallel-storage' into keeper-some-improvement2

This commit is contained in:
Antonio Andelic 2024-06-11 10:18:27 +02:00
commit 5ab06caffc
11 changed files with 1086 additions and 763 deletions

View File

@ -45,6 +45,7 @@ uint64_t ACLMap::convertACLs(const Coordination::ACLs & acls)
if (acls.empty())
return 0;
std::lock_guard lock(map_mutex);
if (acl_to_num.contains(acls))
return acl_to_num[acls];
@ -62,6 +63,7 @@ Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const
if (acls_id == 0)
return Coordination::ACLs{};
std::lock_guard lock(map_mutex);
if (!num_to_acl.contains(acls_id))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown ACL id {}. It's a bug", acls_id);
@ -70,6 +72,7 @@ Coordination::ACLs ACLMap::convertNumber(uint64_t acls_id) const
void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls)
{
std::lock_guard lock(map_mutex);
num_to_acl[acls_id] = acls;
acl_to_num[acls] = acls_id;
max_acl_id = std::max(acls_id + 1, max_acl_id); /// max_acl_id pointer next slot
@ -77,11 +80,13 @@ void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls)
void ACLMap::addUsage(uint64_t acl_id)
{
std::lock_guard lock(map_mutex);
usage_counter[acl_id]++;
}
void ACLMap::removeUsage(uint64_t acl_id)
{
std::lock_guard lock(map_mutex);
if (!usage_counter.contains(acl_id))
return;

View File

@ -32,6 +32,8 @@ private:
NumToACLMap num_to_acl;
UsageCounter usage_counter;
uint64_t max_acl_id{1};
mutable std::mutex map_mutex;
public:
/// Convert ACL to number. If it's new ACL than adds it to map

View File

@ -275,7 +275,7 @@ void KeeperDispatcher::requestThread()
if (has_read_request)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(request);
server->putLocalReadRequest({request});
else
addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
}
@ -449,13 +449,18 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid);
request_queue_it != xid_to_request_queue.end())
{
for (const auto & read_request : request_queue_it->second)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
}
Stopwatch watch;
if (server->isLeaderAlive())
server->putLocalReadRequest(request_queue_it->second);
else
addErrorResponses(request_queue_it->second, Coordination::Error::ZCONNECTIONLOSS);
if (request_queue_it->second.size() > 500)
LOG_INFO(
getLogger("Speed"),
"It took {}ms for {} requests",
watch.elapsedMilliseconds(),
request_queue_it->second.size());
xid_to_request_queue.erase(request_queue_it);
}

View File

@ -533,9 +533,10 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestFor
}
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestsForSessions & request_for_session)
{
if (!request_for_session.request->isReadRequest())
if (std::any_of(
request_for_session.begin(), request_for_session.end(), [](const auto & request) { return !request.request->isReadRequest(); }))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally");
state_machine->processReadRequest(request_for_session);

View File

@ -84,7 +84,7 @@ public:
/// Put local read request and execute in state machine directly and response into
/// responses queue
void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
void putLocalReadRequest(const KeeperStorage::RequestsForSessions & request);
bool isRecovering() const { return is_recovering; }
bool reconfigEnabled() const { return enable_reconfiguration; }

View File

@ -71,26 +71,26 @@ namespace
writeBinary(node.getData(), out);
/// Serialize ACL
writeBinary(node.acl_id, out);
writeBinary(node.stats.acl_id, out);
/// Write is_sequential for backwards compatibility
if (version < SnapshotVersion::V6)
writeBinary(false, out);
/// Serialize stat
writeBinary(node.czxid, out);
writeBinary(node.mzxid, out);
writeBinary(node.ctime(), out);
writeBinary(node.mtime, out);
writeBinary(node.version, out);
writeBinary(node.cversion, out);
writeBinary(node.aversion, out);
writeBinary(node.ephemeralOwner(), out);
writeBinary(node.stats.czxid, out);
writeBinary(node.stats.mzxid, out);
writeBinary(node.stats.ctime(), out);
writeBinary(node.stats.mtime, out);
writeBinary(node.stats.version, out);
writeBinary(node.stats.cversion, out);
writeBinary(node.stats.aversion, out);
writeBinary(node.stats.ephemeralOwner(), out);
if (version < SnapshotVersion::V6)
writeBinary(static_cast<int32_t>(node.data_size), out);
writeBinary(node.numChildren(), out);
writeBinary(node.pzxid, out);
writeBinary(static_cast<int32_t>(node.stats.data_size), out);
writeBinary(node.stats.numChildren(), out);
writeBinary(node.stats.pzxid, out);
writeBinary(node.seqNum(), out);
writeBinary(node.stats.seqNum(), out);
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
writeBinary(node.sizeInBytes(), out);
@ -98,16 +98,16 @@ namespace
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{
readVarUInt(node.data_size, in);
if (node.data_size != 0)
readVarUInt(node.stats.data_size, in);
if (node.stats.data_size != 0)
{
node.data = std::unique_ptr<char[]>(new char[node.data_size]);
in.readStrict(node.data.get(), node.data_size);
node.data = std::unique_ptr<char[]>(new char[node.stats.data_size]);
in.readStrict(node.data.get(), node.stats.data_size);
}
if (version >= SnapshotVersion::V1)
{
readBinary(node.acl_id, in);
readBinary(node.stats.acl_id, in);
}
else if (version == SnapshotVersion::V0)
{
@ -123,14 +123,14 @@ namespace
readBinary(acl.id, in);
acls.push_back(acl);
}
node.acl_id = acl_map.convertACLs(acls);
node.stats.acl_id = acl_map.convertACLs(acls);
}
/// Some strange ACLID during deserialization from ZooKeeper
if (node.acl_id == std::numeric_limits<uint64_t>::max())
node.acl_id = 0;
if (node.stats.acl_id == std::numeric_limits<uint64_t>::max())
node.stats.acl_id = 0;
acl_map.addUsage(node.acl_id);
acl_map.addUsage(node.stats.acl_id);
if (version < SnapshotVersion::V6)
{
@ -139,19 +139,19 @@ namespace
}
/// Deserialize stat
readBinary(node.czxid, in);
readBinary(node.mzxid, in);
readBinary(node.stats.czxid, in);
readBinary(node.stats.mzxid, in);
int64_t ctime;
readBinary(ctime, in);
node.setCtime(ctime);
readBinary(node.mtime, in);
readBinary(node.version, in);
readBinary(node.cversion, in);
readBinary(node.aversion, in);
node.stats.setCtime(ctime);
readBinary(node.stats.mtime, in);
readBinary(node.stats.version, in);
readBinary(node.stats.cversion, in);
readBinary(node.stats.aversion, in);
int64_t ephemeral_owner = 0;
readBinary(ephemeral_owner, in);
if (ephemeral_owner != 0)
node.setEphemeralOwner(ephemeral_owner);
node.stats.setEphemeralOwner(ephemeral_owner);
if (version < SnapshotVersion::V6)
{
@ -161,14 +161,14 @@ namespace
int32_t num_children = 0;
readBinary(num_children, in);
if (ephemeral_owner == 0)
node.setNumChildren(num_children);
node.stats.setNumChildren(num_children);
readBinary(node.pzxid, in);
readBinary(node.stats.pzxid, in);
int32_t seq_num = 0;
readBinary(seq_num, in);
if (ephemeral_owner == 0)
node.setSeqNum(seq_num);
node.stats.setSeqNum(seq_num);
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
{
@ -253,7 +253,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
/// Benign race condition possible while taking snapshot: NuRaft decide to create snapshot at some log id
/// and only after some time we lock storage and enable snapshot mode. So snapshot_container_size can be
/// slightly bigger than required.
if (node.mzxid > snapshot.zxid)
if (node.stats.mzxid > snapshot.zxid)
break;
writeBinary(path, out);
@ -303,7 +303,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
}
}
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context)
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context) TSA_NO_THREAD_SAFETY_ANALYSIS
{
uint8_t version;
readBinary(version, in);
@ -432,12 +432,12 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
}
}
auto ephemeral_owner = node.ephemeralOwner();
if (!node.isEphemeral() && node.numChildren() > 0)
node.getChildren().reserve(node.numChildren());
auto ephemeral_owner = node.stats.ephemeralOwner();
if (!node.stats.isEphemeral() && node.stats.numChildren() > 0)
node.getChildren().reserve(node.stats.numChildren());
if (ephemeral_owner != 0)
storage.ephemerals[node.ephemeralOwner()].insert(std::string{path});
storage.ephemerals[node.stats.ephemeralOwner()].insert(std::string{path});
if (recalculate_digest)
storage.nodes_digest += node.getDigest(path);
@ -461,12 +461,12 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
{
if (itr.key != "/")
{
if (itr.value.numChildren() != static_cast<int32_t>(itr.value.getChildren().size()))
if (itr.value.stats.numChildren() != static_cast<int32_t>(itr.value.getChildren().size()))
{
#ifdef NDEBUG
/// TODO (alesapin) remove this, it should be always CORRUPTED_DATA.
LOG_ERROR(getLogger("KeeperSnapshotManager"), "Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}", itr.value.numChildren(), itr.value.getChildren().size(), itr.key);
" is different from actual children size {} for node {}", itr.value.stats.numChildren(), itr.value.getChildren().size(), itr.key);
#else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}",

View File

@ -33,6 +33,11 @@ namespace ProfileEvents
extern const Event KeeperSaveSnapshot;
}
namespace CurrentMetrics
{
extern const Metric KeeperAliveConnections;
}
namespace DB
{
@ -59,6 +64,7 @@ KeeperStateMachine::KeeperStateMachine(
, snapshots_queue(snapshots_queue_)
, min_request_size_to_cache(keeper_context_->getCoordinationSettings()->min_request_size_for_cache)
, log(getLogger("KeeperStateMachine"))
, read_pool(CurrentMetrics::KeeperAliveConnections, CurrentMetrics::KeeperAliveConnections, CurrentMetrics::KeeperAliveConnections, 100, 10000, 10000)
, superdigest(superdigest_)
, keeper_context(keeper_context_)
, snapshot_manager_s3(snapshot_manager_s3_)
@ -272,8 +278,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true;
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
if (storage->isFinalized())
return false;
@ -295,14 +300,19 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
}
if (keeper_context->digestEnabled() && request_for_session.digest)
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, request_for_session.log_idx, false);
assertDigest(
*request_for_session.digest,
storage->getNodesDigest(false, /*lock_transaction_mutex=*/true),
*request_for_session.request,
request_for_session.log_idx,
false);
return true;
}
void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session)
{
std::lock_guard _(storage_and_responses_lock);
std::lock_guard lock(process_and_responses_lock);
KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session);
if (!responses_queue.push(response))
{
@ -404,6 +414,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
try
{
std::shared_lock storage_lock(storage_mutex);
const auto op_num = request_for_session->request->getOpNum();
if (op_num == Coordination::OpNum::SessionID)
{
@ -417,7 +428,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
response_for_session.session_id = -1;
response_for_session.response = response;
std::lock_guard lock(storage_and_responses_lock);
std::lock_guard lock(process_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
@ -431,14 +442,19 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
parsed_request_cache.erase(request_for_session->session_id);
}
std::lock_guard lock(storage_and_responses_lock);
std::lock_guard lock(process_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
try_push(response_for_session);
if (keeper_context->digestEnabled() && request_for_session->digest)
assertDigest(*request_for_session->digest, storage->getNodesDigest(true), *request_for_session->request, request_for_session->log_idx, true);
assertDigest(
*request_for_session->digest,
storage->getNodesDigest(true, /*lock_transaction_mutex=*/true),
*request_for_session->request,
request_for_session->log_idx,
true);
}
ProfileEvents::increment(ProfileEvents::KeeperCommits);
@ -482,8 +498,6 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
}
{ /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock);
SnapshotDeserializationResult snapshot_deserialization_result;
if (latest_snapshot_ptr)
snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
@ -491,6 +505,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
snapshot_deserialization_result
= snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(s.get_last_log_idx()));
std::unique_lock storage_lock(storage_mutex);
/// maybe some logs were preprocessed with log idx larger than the snapshot idx
/// we have to apply them to the new storage
storage->applyUncommittedState(*snapshot_deserialization_result.storage, snapshot_deserialization_result.snapshot_meta->get_last_log_idx());
@ -534,15 +549,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::lock_guard lock(storage_and_responses_lock);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
void KeeperStateMachine::rollbackRequestNoLock(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing)
{
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::shared_lock lock(storage_mutex);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
@ -561,7 +568,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_and_responses_lock);
std::unique_lock lock(storage_mutex);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig());
}
@ -623,7 +630,6 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
{
/// Destroy snapshot with lock
std::lock_guard lock(storage_and_responses_lock);
LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot();
@ -761,44 +767,71 @@ int KeeperStateMachine::read_logical_snp_obj(
return 1;
}
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestsForSessions & request_for_session)
{
std::shared_lock storage_lock(storage_mutex);
/// Pure local request, just process it with storage
std::lock_guard lock(storage_and_responses_lock);
auto responses = storage->processRequest(
request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (const auto & response : responses)
if (!responses_queue.push(response))
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id);
std::lock_guard lock(process_and_responses_lock);
std::vector<KeeperStorage::ResponsesForSessions> all_responses;
if (request_for_session.size() > 100)
{
all_responses.resize(request_for_session.size());
//LOG_INFO(getLogger("Keeper"), "Has read requests {}", request_queue_it->second.size());
for (size_t i = 0; i < request_for_session.size(); ++i)
{
read_pool.scheduleOrThrowOnError([&, i]
{
const auto & read_request = request_for_session[i];
all_responses[i] = storage->processRequest(
read_request.request, read_request.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
});
}
read_pool.wait();
}
else
{
all_responses.reserve(request_for_session.size());
for (const auto & read_request : request_for_session)
{
all_responses.push_back(storage->processRequest(
read_request.request, read_request.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/));
}
}
for (const auto & responses : all_responses)
for (const auto & response : responses)
if (!responses_queue.push(response))
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id);
}
void KeeperStateMachine::shutdownStorage()
{
std::lock_guard lock(storage_and_responses_lock);
std::unique_lock storage_lock(storage_mutex);
storage->finalize();
}
std::vector<int64_t> KeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getDeadSessions();
}
int64_t KeeperStateMachine::getNextZxid() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getNextZXID();
}
KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const
{
std::lock_guard lock(storage_and_responses_lock);
return storage->getNodesDigest(false);
std::shared_lock storage_lock(storage_mutex);
return storage->getNodesDigest(false, /*lock_transaction_mutex=*/true);
}
uint64_t KeeperStateMachine::getLastProcessedZxid() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getZXID();
}
@ -809,61 +842,61 @@ const KeeperStorage::Stats & KeeperStateMachine::getStorageStats() const TSA_NO_
uint64_t KeeperStateMachine::getTotalWatchesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getTotalWatchesCount();
}
uint64_t KeeperStateMachine::getWatchedPathsCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getWatchedPathsCount();
}
uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getSessionsWithWatchesCount();
}
uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getTotalEphemeralNodesCount();
}
uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getSessionWithEphemeralNodesCount();
}
void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
storage->dumpWatches(buf);
}
void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
storage->dumpWatchesByPath(buf);
}
void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
storage->dumpSessionsAndEphemerals(buf);
}
uint64_t KeeperStateMachine::getApproximateDataSize() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getApproximateDataSize();
}
uint64_t KeeperStateMachine::getKeyArenaSize() const
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
return storage->getArenaDataSize();
}
@ -904,7 +937,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
void KeeperStateMachine::recalculateStorageStats()
{
std::lock_guard lock(storage_and_responses_lock);
std::shared_lock storage_lock(storage_mutex);
LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats");

View File

@ -100,7 +100,7 @@ public:
ClusterConfigPtr getClusterConfig() const;
/// Process local read request
void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);
void processReadRequest(const KeeperStorage::RequestsForSessions & request_for_session);
std::vector<int64_t> getDeadSessions();
@ -132,6 +132,8 @@ public:
void reconfigure(const KeeperStorage::RequestForSession& request_for_session);
private:
mutable SharedMutex storage_mutex;
CommitCallback commit_callback;
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.
@ -140,7 +142,7 @@ private:
nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;
/// Main state machine logic
KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock);
KeeperStoragePtr storage;
/// Save/Load and Serialize/Deserialize logic for snapshots.
KeeperSnapshotManager snapshot_manager;
@ -159,7 +161,7 @@ private:
/// we can get strange cases when, for example client send read request with
/// watch and after that receive watch response and only receive response
/// for request.
mutable std::mutex storage_and_responses_lock;
mutable std::mutex process_and_responses_lock;
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, std::shared_ptr<KeeperStorage::RequestForSession>>> parsed_request_cache;
uint64_t min_request_size_to_cache{0};
@ -176,6 +178,7 @@ private:
mutable std::mutex cluster_config_lock;
ClusterConfigPtr cluster_config;
ThreadPool read_pool;
/// Special part of ACL system -- superdigest specified in server config.
const std::string superdigest;
@ -184,7 +187,6 @@ private:
KeeperSnapshotManagerS3 * snapshot_manager_s3;
KeeperStorage::ResponseForSession processReconfiguration(
const KeeperStorage::RequestForSession& request_for_session)
TSA_REQUIRES(storage_and_responses_lock);
const KeeperStorage::RequestForSession& request_for_session);
};
}

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,9 @@
#include <Coordination/ACLMap.h>
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/SnapshotableHashTable.h>
#include <Common/SharedMutex.h>
#include <base/defines.h>
#include <absl/container/flat_hash_set.h>
@ -28,36 +31,21 @@ struct KeeperStorageSnapshot;
class KeeperStorage
{
public:
/// Node should have as minimal size as possible to reduce memory footprint
/// of stored nodes
/// New fields should be added to the struct only if it's really necessary
struct Node
struct NodeStats
{
int64_t czxid{0};
int64_t mzxid{0};
int64_t pzxid{0};
uint64_t acl_id = 0; /// 0 -- no ACL by default
int64_t mtime{0};
std::unique_ptr<char[]> data{nullptr};
uint32_t data_size{0};
int32_t version{0};
int32_t cversion{0};
int32_t aversion{0};
mutable uint64_t cached_digest = 0;
uint32_t data_size{0};
Node() = default;
Node & operator=(const Node & other);
Node(const Node & other);
Node & operator=(Node && other) noexcept;
Node(Node && other) noexcept;
bool empty() const;
uint64_t acl_id = 0; /// 0 -- no ACL by default
bool isEphemeral() const
{
@ -88,6 +76,7 @@ public:
void setNumChildren(int32_t num_children)
{
is_ephemeral_and_ctime.is_ephemeral = false;
ephemeral_or_children_data.children_info.num_children = num_children;
}
@ -132,34 +121,6 @@ public:
is_ephemeral_and_ctime.ctime = ctime;
}
void copyStats(const Coordination::Stat & stat);
void setResponseStat(Coordination::Stat & response_stat) const;
/// Object memory size
uint64_t sizeInBytes() const;
void setData(const String & new_data);
std::string_view getData() const noexcept { return {data.get(), data_size}; }
void addChild(StringRef child_path);
void removeChild(StringRef child_path);
const auto & getChildren() const noexcept { return children; }
auto & getChildren() { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const Node & other);
private:
/// as ctime can't be negative because it stores the timestamp when the
/// node was created, we can use the MSB for a bool
@ -180,7 +141,56 @@ public:
int32_t num_children;
} children_info;
} ephemeral_or_children_data{0};
};
/// Node should have as minimal size as possible to reduce memory footprint
/// of stored nodes
/// New fields should be added to the struct only if it's really necessary
struct Node
{
NodeStats stats;
std::unique_ptr<char[]> data{nullptr};
mutable uint64_t cached_digest = 0;
Node() = default;
Node & operator=(const Node & other);
Node(const Node & other);
Node & operator=(Node && other) noexcept;
Node(Node && other) noexcept;
bool empty() const;
void copyStats(const Coordination::Stat & stat);
void setResponseStat(Coordination::Stat & response_stat) const;
/// Object memory size
uint64_t sizeInBytes() const;
void setData(const String & new_data);
std::string_view getData() const noexcept { return {data.get(), stats.data_size}; }
void addChild(StringRef child_path);
void removeChild(StringRef child_path);
const auto & getChildren() const noexcept { return children; }
auto & getChildren() { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const Node & other);
private:
ChildrenSet children{};
};
@ -249,7 +259,7 @@ public:
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;
int64_t session_id_counter{1};
mutable std::shared_mutex storage_mutex;
SessionAndAuth session_and_auth;
@ -278,18 +288,33 @@ public:
struct RemoveNodeDelta
{
int32_t version{-1};
int64_t ephemeral_owner{0};
NodeStats stat;
Coordination::ACLs acls;
String data;
};
struct UpdateNodeDelta
struct UpdateNodeStatDelta
{
std::function<void(Node &)> update_fn;
explicit UpdateNodeStatDelta(const KeeperStorage::Node & node);
NodeStats old_stats;
NodeStats new_stats;
int32_t old_seq_num;
int32_t new_seq_num;
int32_t version{-1};
};
struct UpdateNodeDataDelta
{
std::string old_data;
std::string new_data;
int32_t version{-1};
};
struct SetACLDelta
{
Coordination::ACLs acls;
Coordination::ACLs old_acls;
Coordination::ACLs new_acls;
int32_t version{-1};
};
@ -311,11 +336,19 @@ public:
struct AddAuthDelta
{
int64_t session_id;
AuthID auth_id;
std::shared_ptr<AuthID> auth_id;
};
using Operation = std::
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
using Operation = std::variant<
CreateNodeDelta,
RemoveNodeDelta,
UpdateNodeStatDelta,
UpdateNodeDataDelta,
SetACLDelta,
AddAuthDelta,
ErrorDelta,
SubDeltaEnd,
FailedMultiDelta>;
struct Delta
{
@ -334,15 +367,16 @@ public:
{
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
void addDelta(Delta new_delta);
void addDeltas(std::vector<Delta> new_deltas);
void commit(int64_t commit_zxid);
void addDeltas(std::list<Delta> new_deltas);
void cleanup(int64_t commit_zxid);
void rollback(int64_t rollback_zxid);
std::shared_ptr<Node> getNode(StringRef path) const;
Coordination::ACLs getACLs(StringRef path) const;
void applyDeltas(const std::list<Delta> & new_deltas);
void applyDelta(const Delta & delta);
void rollbackDelta(const Delta & delta);
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate) const;
@ -350,13 +384,13 @@ public:
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
std::unordered_map<int64_t, std::list<std::pair<int64_t, std::shared_ptr<AuthID>>>> session_and_auth;
struct UncommittedNode
{
std::shared_ptr<Node> node{nullptr};
Coordination::ACLs acls{};
int64_t zxid{0};
std::optional<Coordination::ACLs> acls{};
std::vector<int64_t> applied_zxids{};
};
struct Hash
@ -383,9 +417,9 @@ public:
};
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
std::list<Delta> deltas;
mutable std::mutex deltas_mutex;
std::list<Delta> deltas TSA_GUARDED_BY(deltas_mutex);
KeeperStorage & storage;
};
@ -395,7 +429,7 @@ public:
// with zxid > last_zxid
void applyUncommittedState(KeeperStorage & other, int64_t last_log_idx);
Coordination::Error commit(int64_t zxid);
Coordination::Error commit(std::list<Delta> deltas);
// Create node in the storage
// Returns false if it failed to create the node, true otherwise
@ -415,20 +449,24 @@ public:
void unregisterEphemeralPath(int64_t session_id, const std::string & path);
mutable std::mutex ephemerals_mutex;
/// Mapping session_id -> set of ephemeral nodes paths
Ephemerals ephemerals;
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers;
Ephemerals ephemerals TSA_GUARDED_BY(ephemerals_mutex);
mutable std::mutex session_mutex;
int64_t session_id_counter TSA_GUARDED_BY(session_mutex) = 1;
/// Expiration queue for session, allows to get dead sessions at some point of time
SessionExpiryQueue session_expiry_queue;
SessionExpiryQueue session_expiry_queue TSA_GUARDED_BY(session_mutex);
/// All active sessions with timeout
SessionAndTimeout session_and_timeout;
SessionAndTimeout session_and_timeout TSA_GUARDED_BY(session_mutex);
/// ACLMap for more compact ACLs storage inside nodes.
ACLMap acl_map;
mutable std::mutex transaction_mutex;
/// Global id of all requests applied to storage
int64_t zxid{0};
int64_t zxid TSA_GUARDED_BY(transaction_mutex) = 0;
// older Keeper node (pre V5 snapshots) can create snapshots and receive logs from newer Keeper nodes
// this can lead to some inconsistencies, e.g. from snapshot it will use log_idx as zxid
@ -445,58 +483,48 @@ public:
int64_t log_idx = 0;
};
std::deque<TransactionInfo> uncommitted_transactions;
std::list<TransactionInfo> uncommitted_transactions TSA_GUARDED_BY(transaction_mutex);
uint64_t nodes_digest{0};
uint64_t nodes_digest = 0;
bool finalized{false};
std::atomic<bool> finalized{false};
mutable std::mutex watches_mutex;
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers TSA_GUARDED_BY(watches_mutex);
/// Currently active watches (node_path -> subscribed sessions)
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
Watches watches TSA_GUARDED_BY(watches_mutex);
Watches list_watches TSA_GUARDED_BY(watches_mutex); /// Watches for 'list' request (watches on children).
void clearDeadWatches(int64_t session_id);
/// Get current committed zxid
int64_t getZXID() const { return zxid; }
int64_t getZXID() const;
int64_t getNextZXID() const
{
if (uncommitted_transactions.empty())
return zxid + 1;
int64_t getNextZXID() const;
int64_t getNextZXIDLocked() const TSA_REQUIRES(transaction_mutex);
return uncommitted_transactions.back().zxid + 1;
}
Digest getNodesDigest(bool committed) const;
Digest getNodesDigest(bool committed, bool lock_transaction_mutex) const;
KeeperContextPtr keeper_context;
const String superdigest;
bool initialized{false};
std::atomic<bool> initialized{false};
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, bool initialize_system_nodes = true);
void initializeSystemNodes();
void initializeSystemNodes() TSA_NO_THREAD_SAFETY_ANALYSIS;
/// Allocate new session id with the specified timeouts
int64_t getSessionID(int64_t session_timeout_ms)
{
auto result = session_id_counter++;
session_and_timeout.emplace(result, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(result, session_timeout_ms);
return result;
}
int64_t getSessionID(int64_t session_timeout_ms);
/// Add session id. Used when restoring KeeperStorage from snapshot.
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
void addSessionID(int64_t session_id, int64_t session_timeout_ms) TSA_NO_THREAD_SAFETY_ANALYSIS;
UInt64 calculateNodesDigest(UInt64 current_digest, const std::vector<Delta> & new_deltas) const;
UInt64 calculateNodesDigest(UInt64 current_digest, const std::list<Delta> & new_deltas) const;
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.
@ -523,21 +551,21 @@ public:
/// Set of methods for creating snapshots
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); }
void enableSnapshotMode(size_t up_to_version);
/// Turn off snapshot mode.
void disableSnapshotMode() { container.disableSnapshotMode(); }
void disableSnapshotMode();
Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); }
Container::const_iterator getSnapshotIteratorBegin() const;
/// Clear outdated data from internal container.
void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); }
void clearGarbageAfterSnapshot();
/// Get all active sessions
const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; }
SessionAndTimeout getActiveSessions() const;
/// Get all dead sessions
std::vector<int64_t> getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); }
std::vector<int64_t> getDeadSessions() const;
struct Stats
{
@ -557,19 +585,19 @@ public:
const Stats & getStorageStats() const;
/// Introspection functions mostly used in 4-letter commands
uint64_t getNodesCount() const { return container.size(); }
uint64_t getNodesCount() const;
uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); }
uint64_t getApproximateDataSize() const;
uint64_t getArenaDataSize() const { return container.keyArenaSize(); }
uint64_t getArenaDataSize() const;
uint64_t getTotalWatchesCount() const;
uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); }
uint64_t getWatchedPathsCount() const;
uint64_t getSessionsWithWatchesCount() const;
uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); }
uint64_t getSessionWithEphemeralNodesCount() const;
uint64_t getTotalEphemeralNodesCount() const;
void dumpWatches(WriteBufferFromOwnString & buf) const;
@ -578,6 +606,8 @@ public:
void recalculateStats();
private:
uint64_t getSessionWithEphemeralNodesCountLocked() const TSA_REQUIRES(ephemerals_mutex);
void removeDigest(const Node & node, std::string_view path);
void addDigest(const Node & node, std::string_view path);
};

View File

@ -90,7 +90,7 @@ void deserializeACLMap(KeeperStorage & storage, ReadBuffer & in)
}
}
int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, LoggerPtr log)
int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, LoggerPtr log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
int64_t max_zxid = 0;
std::string path;
@ -102,38 +102,38 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, LoggerP
String data;
Coordination::read(data, in);
node.setData(data);
Coordination::read(node.acl_id, in);
Coordination::read(node.stats.acl_id, in);
/// Deserialize stat
Coordination::read(node.czxid, in);
Coordination::read(node.mzxid, in);
Coordination::read(node.stats.czxid, in);
Coordination::read(node.stats.mzxid, in);
/// For some reason ZXID specified in filename can be smaller
/// then actual zxid from nodes. In this case we will use zxid from nodes.
max_zxid = std::max(max_zxid, node.mzxid);
max_zxid = std::max(max_zxid, node.stats.mzxid);
int64_t ctime;
Coordination::read(ctime, in);
node.setCtime(ctime);
Coordination::read(node.mtime, in);
Coordination::read(node.version, in);
Coordination::read(node.cversion, in);
Coordination::read(node.aversion, in);
node.stats.setCtime(ctime);
Coordination::read(node.stats.mtime, in);
Coordination::read(node.stats.version, in);
Coordination::read(node.stats.cversion, in);
Coordination::read(node.stats.aversion, in);
int64_t ephemeral_owner;
Coordination::read(ephemeral_owner, in);
if (ephemeral_owner != 0)
node.setEphemeralOwner(ephemeral_owner);
Coordination::read(node.pzxid, in);
node.stats.setEphemeralOwner(ephemeral_owner);
Coordination::read(node.stats.pzxid, in);
if (!path.empty())
{
if (ephemeral_owner == 0)
node.setSeqNum(node.cversion);
node.stats.setSeqNum(node.stats.cversion);
storage.container.insertOrReplace(path, node);
if (ephemeral_owner != 0)
storage.ephemerals[ephemeral_owner].insert(path);
storage.acl_map.addUsage(node.acl_id);
storage.acl_map.addUsage(node.stats.acl_id);
}
Coordination::read(path, in);
count++;
@ -146,14 +146,20 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, LoggerP
if (itr.key != "/")
{
auto parent_path = parentNodePath(itr.key);
storage.container.updateValue(parent_path, [my_path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseNodeName(my_path)); value.increaseNumChildren(); });
storage.container.updateValue(
parent_path,
[my_path = itr.key](KeeperStorage::Node & value)
{
value.addChild(getBaseNodeName(my_path));
value.stats.increaseNumChildren();
});
}
}
return max_zxid;
}
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, LoggerPtr log)
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, LoggerPtr log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
LOG_INFO(log, "Deserializing storage snapshot {}", snapshot_path);
int64_t zxid = getZxidFromName(snapshot_path);
@ -480,7 +486,7 @@ bool hasErrorsInMultiRequest(Coordination::ZooKeeperRequestPtr request)
}
bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, LoggerPtr /*log*/)
bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, LoggerPtr /*log*/) TSA_NO_THREAD_SAFETY_ANALYSIS
{
int64_t checksum;
Coordination::read(checksum, in);
@ -559,7 +565,7 @@ void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string
LOG_INFO(log, "Finished {} deserialization, totally read {} records", log_path, counter);
}
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, LoggerPtr log)
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, LoggerPtr log) TSA_NO_THREAD_SAFETY_ANALYSIS
{
namespace fs = std::filesystem;
std::map<int64_t, std::string> existing_logs;