Merge pull request #59592 from ClickHouse/keeper-more-reduce

Reduce size of node in Keeper even more
This commit is contained in:
Antonio Andelic 2024-02-12 15:59:39 +01:00 committed by GitHub
commit 5872ca520c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 471 additions and 276 deletions

View File

@ -1,5 +1,4 @@
#include <iostream>
#include <optional>
#include <boost/program_options.hpp>
#include <Coordination/KeeperSnapshotManager.h>

View File

@ -16,7 +16,7 @@
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include "Core/Field.h"
#include <Core/Field.h>
#include <Disks/DiskLocal.h>
@ -79,20 +79,20 @@ namespace
writeBinary(false, out);
/// Serialize stat
writeBinary(node.stat.czxid, out);
writeBinary(node.stat.mzxid, out);
writeBinary(node.stat.ctime, out);
writeBinary(node.stat.mtime, out);
writeBinary(node.stat.version, out);
writeBinary(node.stat.cversion, out);
writeBinary(node.stat.aversion, out);
writeBinary(node.stat.ephemeralOwner, out);
writeBinary(node.czxid, out);
writeBinary(node.mzxid, out);
writeBinary(node.ctime(), out);
writeBinary(node.mtime, out);
writeBinary(node.version, out);
writeBinary(node.cversion, out);
writeBinary(node.aversion, out);
writeBinary(node.ephemeralOwner(), out);
if (version < SnapshotVersion::V6)
writeBinary(static_cast<int32_t>(node.getData().size()), out);
writeBinary(node.stat.numChildren, out);
writeBinary(node.stat.pzxid, out);
writeBinary(static_cast<int32_t>(node.data_size), out);
writeBinary(node.numChildren(), out);
writeBinary(node.pzxid, out);
writeBinary(node.seq_num, out);
writeBinary(node.seqNum(), out);
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
writeBinary(node.sizeInBytes(), out);
@ -102,7 +102,7 @@ namespace
{
String new_data;
readBinary(new_data, in);
node.setData(std::move(new_data));
node.setData(new_data);
if (version >= SnapshotVersion::V1)
{
@ -138,22 +138,36 @@ namespace
}
/// Deserialize stat
readBinary(node.stat.czxid, in);
readBinary(node.stat.mzxid, in);
readBinary(node.stat.ctime, in);
readBinary(node.stat.mtime, in);
readBinary(node.stat.version, in);
readBinary(node.stat.cversion, in);
readBinary(node.stat.aversion, in);
readBinary(node.stat.ephemeralOwner, in);
readBinary(node.czxid, in);
readBinary(node.mzxid, in);
int64_t ctime;
readBinary(ctime, in);
node.setCtime(ctime);
readBinary(node.mtime, in);
readBinary(node.version, in);
readBinary(node.cversion, in);
readBinary(node.aversion, in);
int64_t ephemeral_owner = 0;
readBinary(ephemeral_owner, in);
if (ephemeral_owner != 0)
node.setEphemeralOwner(ephemeral_owner);
if (version < SnapshotVersion::V6)
{
int32_t data_length = 0;
readBinary(data_length, in);
}
readBinary(node.stat.numChildren, in);
readBinary(node.stat.pzxid, in);
readBinary(node.seq_num, in);
int32_t num_children = 0;
readBinary(num_children, in);
if (ephemeral_owner == 0)
node.setNumChildren(num_children);
readBinary(node.pzxid, in);
int32_t seq_num = 0;
readBinary(seq_num, in);
if (ephemeral_owner == 0)
node.setSeqNum(seq_num);
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
{
@ -238,7 +252,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
/// Benign race condition possible while taking snapshot: NuRaft decide to create snapshot at some log id
/// and only after some time we lock storage and enable snapshot mode. So snapshot_container_size can be
/// slightly bigger than required.
if (node.stat.mzxid > snapshot.zxid)
if (node.mzxid > snapshot.zxid)
break;
writeBinary(path, out);
@ -363,11 +377,6 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
if (recalculate_digest)
storage.nodes_digest = 0;
const auto is_node_empty = [](const auto & node)
{
return node.getData().empty() && node.stat == KeeperStorage::Node::Stat{};
};
for (size_t nodes_read = 0; nodes_read < snapshot_container_size; ++nodes_read)
{
std::string path;
@ -395,7 +404,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
}
else if (match_result == EXACT)
{
if (!is_node_empty(node))
if (!node.empty())
{
if (keeper_context->ignoreSystemPathOnStartup() || keeper_context->getServerState() != KeeperContext::Phase::INIT)
{
@ -412,8 +421,8 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
}
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
storage.ephemerals[node.stat.ephemeralOwner].insert(path);
if (node.isEphemeral())
storage.ephemerals[node.ephemeralOwner()].insert(path);
if (recalculate_digest)
storage.nodes_digest += node.getDigest(path);
@ -433,16 +442,16 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
{
if (itr.key != "/")
{
if (itr.value.stat.numChildren != static_cast<int32_t>(itr.value.getChildren().size()))
if (itr.value.numChildren() != static_cast<int32_t>(itr.value.getChildren().size()))
{
#ifdef NDEBUG
/// TODO (alesapin) remove this, it should be always CORRUPTED_DATA.
LOG_ERROR(getLogger("KeeperSnapshotManager"), "Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}", itr.value.stat.numChildren, itr.value.getChildren().size(), itr.key);
" is different from actual children size {} for node {}", itr.value.numChildren(), itr.value.getChildren().size(), itr.key);
#else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Children counter in stat.numChildren {}"
" is different from actual children size {} for node {}",
itr.value.stat.numChildren, itr.value.getChildren().size(), itr.key);
itr.value.numChildren(), itr.value.getChildren().size(), itr.key);
#endif
}
}

View File

@ -166,54 +166,132 @@ KeeperStorage::ResponsesForSessions processWatchesImpl(
}
// When this function is updated, update CURRENT_DIGEST_VERSION!!
uint64_t calculateDigest(std::string_view path, std::string_view data, const KeeperStorage::Node::Stat & stat)
uint64_t calculateDigest(std::string_view path, const KeeperStorage::Node & node)
{
SipHash hash;
hash.update(path);
hash.update(data);
auto data = node.getData();
if (!data.empty())
{
chassert(data.data() != nullptr);
hash.update(data);
}
hash.update(stat.czxid);
hash.update(stat.mzxid);
hash.update(stat.ctime);
hash.update(stat.mtime);
hash.update(stat.version);
hash.update(stat.cversion);
hash.update(stat.aversion);
hash.update(stat.ephemeralOwner);
hash.update(stat.numChildren);
hash.update(stat.pzxid);
hash.update(node.czxid);
hash.update(node.mzxid);
hash.update(node.ctime());
hash.update(node.mtime);
hash.update(node.version);
hash.update(node.cversion);
hash.update(node.aversion);
hash.update(node.ephemeralOwner());
hash.update(node.numChildren());
hash.update(node.pzxid);
return hash.get64();
auto digest = hash.get64();
/// 0 means no cached digest
if (digest == 0)
return 1;
return digest;
}
}
KeeperStorage::Node & KeeperStorage::Node::operator=(const Node & other)
{
if (this == &other)
return *this;
czxid = other.czxid;
mzxid = other.mzxid;
pzxid = other.pzxid;
acl_id = other.acl_id;
mtime = other.mtime;
is_ephemeral_and_ctime = other.is_ephemeral_and_ctime;
ephemeral_or_children_data = other.ephemeral_or_children_data;
data_size = other.data_size;
version = other.version;
cversion = other.cversion;
aversion = other.aversion;
if (data_size != 0)
{
data = std::unique_ptr<char[]>(new char[data_size]);
memcpy(data.get(), other.data.get(), data_size);
}
children = other.children;
return *this;
}
KeeperStorage::Node::Node(const Node & other)
{
*this = other;
}
bool KeeperStorage::Node::empty() const
{
return data_size == 0 && mzxid == 0;
}
void KeeperStorage::Node::copyStats(const Coordination::Stat & stat)
{
czxid = stat.czxid;
mzxid = stat.mzxid;
pzxid = stat.pzxid;
mtime = stat.mtime;
setCtime(stat.ctime);
version = stat.version;
cversion = stat.cversion;
aversion = stat.aversion;
if (stat.ephemeralOwner == 0)
{
is_ephemeral_and_ctime.is_ephemeral = false;
setNumChildren(stat.numChildren);
}
else
{
setEphemeralOwner(stat.ephemeralOwner);
}
}
void KeeperStorage::Node::setResponseStat(Coordination::Stat & response_stat) const
{
response_stat.czxid = stat.czxid;
response_stat.mzxid = stat.mzxid;
response_stat.ctime = stat.ctime;
response_stat.mtime = stat.mtime;
response_stat.version = stat.version;
response_stat.cversion = stat.cversion;
response_stat.aversion = stat.aversion;
response_stat.ephemeralOwner = stat.ephemeralOwner;
response_stat.dataLength = static_cast<int32_t>(data.size());
response_stat.numChildren = stat.numChildren;
response_stat.pzxid = stat.pzxid;
response_stat.czxid = czxid;
response_stat.mzxid = mzxid;
response_stat.ctime = ctime();
response_stat.mtime = mtime;
response_stat.version = version;
response_stat.cversion = cversion;
response_stat.aversion = aversion;
response_stat.ephemeralOwner = ephemeralOwner();
response_stat.dataLength = static_cast<int32_t>(data_size);
response_stat.numChildren = numChildren();
response_stat.pzxid = pzxid;
}
uint64_t KeeperStorage::Node::sizeInBytes() const
{
return sizeof(Node) + children.size() * sizeof(StringRef) + data.size();
return sizeof(Node) + children.size() * sizeof(StringRef) + data_size;
}
void KeeperStorage::Node::setData(String new_data)
void KeeperStorage::Node::setData(const String & new_data)
{
data = std::move(new_data);
data_size = static_cast<uint32_t>(new_data.size());
if (data_size != 0)
{
data = std::unique_ptr<char[]>(new char[new_data.size()]);
memcpy(data.get(), new_data.data(), data_size);
}
}
void KeeperStorage::Node::addChild(StringRef child_path)
@ -228,25 +306,41 @@ void KeeperStorage::Node::removeChild(StringRef child_path)
void KeeperStorage::Node::invalidateDigestCache() const
{
has_cached_digest = false;
cached_digest = 0;
}
UInt64 KeeperStorage::Node::getDigest(const std::string_view path) const
{
if (!has_cached_digest)
{
cached_digest = calculateDigest(path, data, stat);
has_cached_digest = true;
}
if (cached_digest == 0)
cached_digest = calculateDigest(path, *this);
return cached_digest;
};
void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
{
stat = other.stat;
seq_num = other.seq_num;
setData(other.getData());
czxid = other.czxid;
mzxid = other.mzxid;
pzxid = other.pzxid;
acl_id = other.acl_id; /// 0 -- no ACL by default
mtime = other.mtime;
is_ephemeral_and_ctime = other.is_ephemeral_and_ctime;
ephemeral_or_children_data = other.ephemeral_or_children_data;
data_size = other.data_size;
if (data_size != 0)
{
data = std::unique_ptr<char[]>(new char[data_size]);
memcpy(data.get(), other.data.get(), data_size);
}
version = other.version;
cversion = other.cversion;
aversion = other.aversion;
cached_digest = other.cached_digest;
}
@ -278,13 +372,13 @@ void KeeperStorage::initializeSystemNodes()
// update root and the digest based on it
auto current_root_it = container.find("/");
assert(current_root_it != container.end());
chassert(current_root_it != container.end());
removeDigest(current_root_it->value, "/");
auto updated_root_it = container.updateValue(
"/",
[](auto & node)
[](KeeperStorage::Node & node)
{
++node.stat.numChildren;
node.increaseNumChildren();
node.addChild(getBaseNodeName(keeper_system_path));
}
);
@ -294,7 +388,7 @@ void KeeperStorage::initializeSystemNodes()
// insert child system nodes
for (const auto & [path, data] : keeper_context->getSystemNodesWithData())
{
assert(path.starts_with(keeper_system_path));
chassert(path.starts_with(keeper_system_path));
Node child_system_node;
child_system_node.setData(data);
auto [map_key, _] = container.insert(std::string{path}, child_system_node);
@ -339,7 +433,7 @@ std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::tryGetNode
void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
{
assert(!delta.path.empty());
chassert(!delta.path.empty());
if (!nodes.contains(delta.path))
{
if (auto storage_node = tryGetNodeFromStorage(delta.path))
@ -355,22 +449,22 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
{
assert(!node);
chassert(!node);
node = std::make_shared<Node>();
node->stat = operation.stat;
node->copyStats(operation.stat);
node->setData(operation.data);
acls = operation.acls;
last_applied_zxid = delta.zxid;
}
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
{
assert(node);
chassert(node);
node = nullptr;
last_applied_zxid = delta.zxid;
}
else if constexpr (std::same_as<DeltaType, UpdateNodeDelta>)
{
assert(node);
chassert(node);
node->invalidateDigestCache();
operation.update_fn(*node);
last_applied_zxid = delta.zxid;
@ -384,6 +478,40 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
delta.operation);
}
bool KeeperStorage::UncommittedState::hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate) const
{
const auto check_auth = [&](const auto & auth_ids)
{
for (const auto & auth : auth_ids)
{
using TAuth = std::remove_reference_t<decltype(auth)>;
const AuthID * auth_ptr = nullptr;
if constexpr (std::is_pointer_v<TAuth>)
auth_ptr = auth;
else
auth_ptr = &auth;
if (predicate(*auth_ptr))
return true;
}
return false;
};
if (is_local)
return check_auth(storage.session_and_auth[session_id]);
if (check_auth(storage.session_and_auth[session_id]))
return true;
// check if there are uncommitted
const auto auth_it = session_and_auth.find(session_id);
if (auth_it == session_and_auth.end())
return false;
return check_auth(auth_it->second);
}
void KeeperStorage::UncommittedState::addDelta(Delta new_delta)
{
const auto & added_delta = deltas.emplace_back(std::move(new_delta));
@ -408,7 +536,7 @@ void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
{
assert(deltas.empty() || deltas.front().zxid >= commit_zxid);
chassert(deltas.empty() || deltas.front().zxid >= commit_zxid);
// collect nodes that have no further modification in the current transaction
std::unordered_set<std::string> modified_nodes;
@ -426,7 +554,7 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
if (!front_delta.path.empty())
{
auto & path_deltas = deltas_for_path.at(front_delta.path);
assert(path_deltas.front() == &front_delta);
chassert(path_deltas.front() == &front_delta);
path_deltas.pop_front();
if (path_deltas.empty())
{
@ -444,7 +572,7 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
else if (auto * add_auth = std::get_if<AddAuthDelta>(&front_delta.operation))
{
auto & uncommitted_auth = session_and_auth[add_auth->session_id];
assert(!uncommitted_auth.empty() && uncommitted_auth.front() == &add_auth->auth_id);
chassert(!uncommitted_auth.empty() && uncommitted_auth.front() == &add_auth->auth_id);
uncommitted_auth.pop_front();
if (uncommitted_auth.empty())
session_and_auth.erase(add_auth->session_id);
@ -484,7 +612,7 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
if (delta_it->zxid < rollback_zxid)
break;
assert(delta_it->zxid == rollback_zxid);
chassert(delta_it->zxid == rollback_zxid);
if (!delta_it->path.empty())
{
std::visit(
@ -671,7 +799,7 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
if (node_it == container.end())
onStorageInconsistency();
if (operation.version != -1 && operation.version != node_it->value.stat.version)
if (operation.version != -1 && operation.version != node_it->value.version)
onStorageInconsistency();
removeDigest(node_it->value, path);
@ -693,7 +821,7 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
if (node_it == container.end())
onStorageInconsistency();
if (operation.version != -1 && operation.version != node_it->value.stat.aversion)
if (operation.version != -1 && operation.version != node_it->value.aversion)
onStorageInconsistency();
acl_map.removeUsage(node_it->value.acl_id);
@ -738,7 +866,7 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
bool KeeperStorage::createNode(
const std::string & path,
String data,
const KeeperStorage::Node::Stat & stat,
const Coordination::Stat & stat,
Coordination::ACLs node_acls)
{
auto parent_path = parentNodePath(path);
@ -747,7 +875,7 @@ bool KeeperStorage::createNode(
if (node_it == container.end())
return false;
if (node_it->value.stat.ephemeralOwner != 0)
if (node_it->value.isEphemeral())
return false;
if (container.contains(path))
@ -759,8 +887,8 @@ bool KeeperStorage::createNode(
acl_map.addUsage(acl_id);
created_node.acl_id = acl_id;
created_node.stat = stat;
created_node.setData(std::move(data));
created_node.copyStats(stat);
created_node.setData(data);
auto [map_key, _] = container.insert(path, created_node);
/// Take child path from key owned by map.
auto child_path = getBaseNodeName(map_key->getKey());
@ -769,7 +897,7 @@ bool KeeperStorage::createNode(
[child_path](KeeperStorage::Node & parent)
{
parent.addChild(child_path);
chassert(parent.stat.numChildren == static_cast<int32_t>(parent.getChildren().size()));
chassert(parent.numChildren() == static_cast<int32_t>(parent.getChildren().size()));
}
);
@ -783,21 +911,22 @@ bool KeeperStorage::removeNode(const std::string & path, int32_t version)
if (node_it == container.end())
return false;
if (version != -1 && version != node_it->value.stat.version)
if (version != -1 && version != node_it->value.version)
return false;
if (node_it->value.stat.numChildren)
if (node_it->value.numChildren())
return false;
auto prev_node = node_it->value;
acl_map.removeUsage(prev_node.acl_id);
KeeperStorage::Node prev_node;
prev_node.shallowCopy(node_it->value);
acl_map.removeUsage(node_it->value.acl_id);
container.updateValue(
parentNodePath(path),
[child_basename = getBaseNodeName(node_it->key)](KeeperStorage::Node & parent)
{
parent.removeChild(child_basename);
chassert(parent.stat.numChildren == static_cast<int32_t>(parent.getChildren().size()));
chassert(parent.numChildren() == static_cast<int32_t>(parent.getChildren().size()));
}
);
@ -957,7 +1086,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
if (parent_node == nullptr)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
else if (parent_node->stat.ephemeralOwner != 0)
else if (parent_node->isEphemeral())
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNOCHILDRENFOREPHEMERALS}};
std::string path_created = request.path;
@ -966,7 +1095,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
if (request.not_exists)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
auto seq_num = parent_node->seq_num;
auto seq_num = parent_node->seqNum();
std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
seq_num_str.exceptions(std::ios::failbit);
@ -1006,20 +1135,20 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
auto parent_update = [parent_cversion, zxid](KeeperStorage::Node & node)
{
/// Increment sequential number even if node is not sequential
++node.seq_num;
node.increaseSeqNum();
if (parent_cversion == -1)
++node.stat.cversion;
else if (parent_cversion > node.stat.cversion)
node.stat.cversion = parent_cversion;
++node.cversion;
else if (parent_cversion > node.cversion)
node.cversion = parent_cversion;
if (zxid > node.stat.pzxid)
node.stat.pzxid = zxid;
++node.stat.numChildren;
if (zxid > node.pzxid)
node.pzxid = zxid;
node.increaseNumChildren();
};
new_deltas.emplace_back(std::string{parent_path}, zxid, KeeperStorage::UpdateNodeDelta{std::move(parent_update)});
KeeperStorage::Node::Stat stat;
Coordination::Stat stat;
stat.czxid = zxid;
stat.mzxid = zxid;
stat.pzxid = zxid;
@ -1133,7 +1262,8 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
else
{
node_it->value.setResponseStat(response.stat);
response.data = node_it->value.getData();
auto data = node_it->value.getData();
response.data = std::string(data);
response.error = Coordination::Error::ZOK;
}
@ -1190,8 +1320,8 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
[zxid](KeeperStorage::Node & parent)
{
if (parent.stat.pzxid < zxid)
parent.stat.pzxid = zxid;
if (parent.pzxid < zxid)
parent.pzxid = zxid;
}
}
);
@ -1205,9 +1335,9 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
update_parent_pzxid();
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
}
else if (request.version != -1 && request.version != node->stat.version)
else if (request.version != -1 && request.version != node->version)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
else if (node->stat.numChildren != 0)
else if (node->numChildren() != 0)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
if (request.restored_from_zookeeper_log)
@ -1218,14 +1348,14 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
zxid,
KeeperStorage::UpdateNodeDelta{[](KeeperStorage::Node & parent)
{
++parent.stat.cversion;
--parent.stat.numChildren;
++parent.cversion;
parent.decreaseNumChildren();
}});
new_deltas.emplace_back(request.path, zxid, KeeperStorage::RemoveNodeDelta{request.version, node->stat.ephemeralOwner});
new_deltas.emplace_back(request.path, zxid, KeeperStorage::RemoveNodeDelta{request.version, node->ephemeralOwner()});
if (node->stat.ephemeralOwner != 0)
storage.unregisterEphemeralPath(node->stat.ephemeralOwner, request.path);
if (node->isEphemeral())
storage.unregisterEphemeralPath(node->ephemeralOwner(), request.path);
digest = storage.calculateNodesDigest(digest, new_deltas);
@ -1339,7 +1469,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
auto node = storage.uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
if (request.version != -1 && request.version != node->version)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
new_deltas.emplace_back(
@ -1348,9 +1478,9 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
KeeperStorage::UpdateNodeDelta{
[zxid, data = request.data, time](KeeperStorage::Node & value)
{
value.stat.version++;
value.stat.mzxid = zxid;
value.stat.mtime = time;
value.version++;
value.mzxid = zxid;
value.mtime = time;
value.setData(data);
},
request.version});
@ -1362,7 +1492,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
{
[](KeeperStorage::Node & parent)
{
parent.stat.cversion++;
parent.cversion++;
}
}
);
@ -1464,9 +1594,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
auto list_request_type = ALL;
if (auto * filtered_list = dynamic_cast<Coordination::ZooKeeperFilteredListRequest *>(&request))
{
list_request_type = filtered_list->list_request_type;
}
if (list_request_type == ALL)
return true;
@ -1476,7 +1604,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
if (child_it == container.end())
onStorageInconsistency();
const auto is_ephemeral = child_it->value.stat.ephemeralOwner != 0;
const auto is_ephemeral = child_it->value.isEphemeral();
return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY);
};
@ -1529,7 +1657,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
auto node = storage.uncommitted_state.getNode(request.path);
if (check_not_exists)
{
if (node && (request.version == -1 || request.version == node->stat.version))
if (node && (request.version == -1 || request.version == node->version))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
}
else
@ -1537,7 +1665,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
if (!node)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
if (request.version != -1 && request.version != node->stat.version)
if (request.version != -1 && request.version != node->version)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
}
@ -1573,7 +1701,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
if (check_not_exists)
{
if (node_it != container.end() && (request.version == -1 || request.version == node_it->value.stat.version))
if (node_it != container.end() && (request.version == -1 || request.version == node_it->value.version))
on_error(Coordination::Error::ZNODEEXISTS);
else
response.error = Coordination::Error::ZOK;
@ -1582,7 +1710,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
{
if (node_it == container.end())
on_error(Coordination::Error::ZNONODE);
else if (request.version != -1 && request.version != node_it->value.stat.version)
else if (request.version != -1 && request.version != node_it->value.version)
on_error(Coordination::Error::ZBADVERSION);
else
response.error = Coordination::Error::ZOK;
@ -1635,7 +1763,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
auto node = uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.aversion)
if (request.version != -1 && request.version != node->aversion)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
@ -1655,7 +1783,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
zxid,
KeeperStorage::UpdateNodeDelta
{
[](KeeperStorage::Node & n) { ++n.stat.aversion; }
[](KeeperStorage::Node & n) { ++n.aversion; }
}
}
};
@ -1824,7 +1952,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
}
}
assert(request.requests.empty() || operation_type.has_value());
chassert(request.requests.empty() || operation_type.has_value());
}
std::vector<KeeperStorage::Delta>
@ -1873,7 +2001,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
auto & deltas = storage.uncommitted_state.deltas;
// the deltas will have at least SubDeltaEnd or FailedMultiDelta
assert(!deltas.empty());
chassert(!deltas.empty());
if (auto * failed_multi = std::get_if<KeeperStorage::FailedMultiDelta>(&deltas.front().operation))
{
for (size_t i = 0; i < concrete_requests.size(); ++i)
@ -2073,7 +2201,7 @@ UInt64 KeeperStorage::calculateNodesDigest(UInt64 current_digest, const std::vec
[&](const CreateNodeDelta & create_delta)
{
auto node = std::make_shared<Node>();
node->stat = create_delta.stat;
node->copyStats(create_delta.stat);
node->setData(create_delta.data);
updated_nodes.emplace(delta.path, node);
},
@ -2196,8 +2324,8 @@ void KeeperStorage::preprocessRequest(
{
[ephemeral_path](Node & parent)
{
++parent.stat.cversion;
--parent.stat.numChildren;
++parent.cversion;
parent.decreaseNumChildren();
}
}
);
@ -2300,7 +2428,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
if (is_local)
{
assert(zk_request->isReadRequest());
chassert(zk_request->isReadRequest());
if (check_acl && !request_processor->checkAuth(*this, session_id, true))
{
response = zk_request->makeResponse();

View File

@ -5,17 +5,15 @@
#include <Coordination/ACLMap.h>
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/SnapshotableHashTable.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Coordination/KeeperContext.h>
#include <absl/container/flat_hash_set.h>
namespace DB
{
class KeeperContext;
using KeeperContextPtr = std::shared_ptr<KeeperContext>;
struct KeeperStorageRequestProcessor;
using KeeperStorageRequestProcessorPtr = std::shared_ptr<KeeperStorageRequestProcessor>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
@ -35,40 +33,113 @@ public:
/// New fields should be added to the struct only if it's really necessary
struct Node
{
/// to reduce size of the Node struct we use a custom Stat without dataLength
struct Stat
{
int64_t czxid{0};
int64_t mzxid{0};
int64_t ctime{0};
int64_t mtime{0};
int32_t version{0};
int32_t cversion{0};
int32_t aversion{0};
int32_t numChildren{0}; /// NOLINT
int64_t ephemeralOwner{0}; /// NOLINT
int64_t pzxid{0};
bool operator==(const Stat &) const = default;
};
int64_t czxid{0};
int64_t mzxid{0};
int64_t pzxid{0};
uint64_t acl_id = 0; /// 0 -- no ACL by default
Stat stat{};
int32_t seq_num = 0;
/// we cannot use `std::optional<uint64_t> because we want to
/// pack the boolean with seq_num above
mutable bool has_cached_digest = false;
int64_t mtime{0};
std::unique_ptr<char[]> data{nullptr};
uint32_t data_size{0};
int32_t version{0};
int32_t cversion{0};
int32_t aversion{0};
mutable uint64_t cached_digest = 0;
Node() = default;
Node & operator=(const Node & other);
Node(const Node & other);
bool empty() const;
bool isEphemeral() const
{
return is_ephemeral_and_ctime.is_ephemeral;
}
int64_t ephemeralOwner() const
{
if (isEphemeral())
return ephemeral_or_children_data.ephemeral_owner;
return 0;
}
void setEphemeralOwner(int64_t ephemeral_owner)
{
is_ephemeral_and_ctime.is_ephemeral = ephemeral_owner != 0;
ephemeral_or_children_data.ephemeral_owner = ephemeral_owner;
}
int32_t numChildren() const
{
if (isEphemeral())
return 0;
return ephemeral_or_children_data.children_info.num_children;
}
void setNumChildren(int32_t num_children)
{
ephemeral_or_children_data.children_info.num_children = num_children;
}
void increaseNumChildren()
{
chassert(!isEphemeral());
++ephemeral_or_children_data.children_info.num_children;
}
void decreaseNumChildren()
{
chassert(!isEphemeral());
--ephemeral_or_children_data.children_info.num_children;
}
int32_t seqNum() const
{
if (isEphemeral())
return 0;
return ephemeral_or_children_data.children_info.seq_num;
}
void setSeqNum(int32_t seq_num)
{
ephemeral_or_children_data.children_info.seq_num = seq_num;
}
void increaseSeqNum()
{
chassert(!isEphemeral());
++ephemeral_or_children_data.children_info.seq_num;
}
int64_t ctime() const
{
return is_ephemeral_and_ctime.ctime;
}
void setCtime(uint64_t ctime)
{
is_ephemeral_and_ctime.ctime = ctime;
}
void copyStats(const Coordination::Stat & stat);
void setResponseStat(Coordination::Stat & response_stat) const;
/// Object memory size
uint64_t sizeInBytes() const;
void setData(String new_data);
void setData(const String & new_data);
const auto & getData() const noexcept { return data; }
std::string_view getData() const noexcept { return {data.get(), data_size}; }
void addChild(StringRef child_path);
@ -87,19 +158,46 @@ public:
// (e.g. we don't need to copy list of children)
void shallowCopy(const Node & other);
private:
String data;
/// as ctime can't be negative because it stores the timestamp when the
/// node was created, we can use the MSB for a bool
struct
{
bool is_ephemeral : 1;
int64_t ctime : 63;
} is_ephemeral_and_ctime{false, 0};
/// ephemeral notes cannot have children so a node can set either
/// ephemeral_owner OR seq_num + num_children
union
{
int64_t ephemeral_owner;
struct
{
int32_t seq_num;
int32_t num_children;
} children_info;
} ephemeral_or_children_data{0};
ChildrenSet children{};
};
#if !defined(ADDRESS_SANITIZER) && !defined(MEMORY_SANITIZER)
static_assert(
sizeof(ListNode<Node>) <= 144,
"std::list node containing ListNode<Node> is > 160 bytes (sizeof(ListNode<Node>) + 16 bytes for pointers) which will increase "
"memory consumption");
#endif
enum DigestVersion : uint8_t
{
NO_DIGEST = 0,
V1 = 1,
V2 = 2, // added system nodes that modify the digest on startup so digest from V0 is invalid
V3 = 3 // fixed bug with casting, removed duplicate czxid usage
V3 = 3, // fixed bug with casting, removed duplicate czxid usage
V4 = 4 // 0 is not a valid digest value
};
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V3;
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V4;
struct ResponseForSession
{
@ -169,7 +267,7 @@ public:
// - quickly commit the changes to the storage
struct CreateNodeDelta
{
KeeperStorage::Node::Stat stat;
Coordination::Stat stat;
Coordination::ACLs acls;
String data;
};
@ -243,39 +341,7 @@ public:
void applyDelta(const Delta & delta);
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
{
const auto check_auth = [&](const auto & auth_ids)
{
for (const auto & auth : auth_ids)
{
using TAuth = std::remove_reference_t<decltype(auth)>;
const AuthID * auth_ptr = nullptr;
if constexpr (std::is_pointer_v<TAuth>)
auth_ptr = auth;
else
auth_ptr = &auth;
if (predicate(*auth_ptr))
return true;
}
return false;
};
if (is_local)
return check_auth(storage.session_and_auth[session_id]);
if (check_auth(storage.session_and_auth[session_id]))
return true;
// check if there are uncommitted
const auto auth_it = session_and_auth.find(session_id);
if (auth_it == session_and_auth.end())
return false;
return check_auth(auth_it->second);
}
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate) const;
void forEachAuthInSession(int64_t session_id, std::function<void(const AuthID &)> func) const;
@ -334,7 +400,7 @@ public:
bool createNode(
const std::string & path,
String data,
const KeeperStorage::Node::Stat & stat,
const Coordination::Stat & stat,
Coordination::ACLs node_acls);
// Remove node in the storage

View File

@ -2,72 +2,58 @@
#include <base/StringRef.h>
#include <Common/HashTable/HashMap.h>
#include <Common/ArenaUtils.h>
#include <list>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template<typename V>
struct ListNode
{
StringRef key;
V value;
/// |* * ****** |
/// ^ ^ ^
/// active_in_map free_key version
/// (1 byte) (1 byte) (6 bytes)
uint64_t node_metadata = 0;
struct
{
bool active_in_map : 1;
bool free_key : 1;
uint64_t version : 62;
} node_metadata{false, false, 0};
void setInactiveInMap()
{
node_metadata &= ~active_in_map_mask;
node_metadata.active_in_map = false;
}
void setActiveInMap()
{
node_metadata |= active_in_map_mask;
node_metadata.active_in_map = true;
}
bool isActiveInMap()
{
return node_metadata & active_in_map_mask;
return node_metadata.active_in_map;
}
void setFreeKey()
{
node_metadata |= free_key_mask;
node_metadata.free_key = true;
}
bool getFreeKey()
{
return node_metadata & free_key_mask;
return node_metadata.free_key;
}
uint64_t getVersion()
{
return node_metadata & version_mask;
return node_metadata.version;
}
void setVersion(uint64_t version)
{
if (version > version_mask)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Snapshot version {} is larger than maximum allowed value {}", version, version_mask);
node_metadata &= ~version_mask;
node_metadata |= version;
node_metadata.version = version;
}
static constexpr uint64_t active_in_map_mask = static_cast<uint64_t>(1) << 63;
static constexpr uint64_t free_key_mask = static_cast<uint64_t>(1) << 62;
static constexpr uint64_t version_mask = ~(static_cast<uint64_t>(3) << 62);
};
template <class V>

View File

@ -101,30 +101,37 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, LoggerP
KeeperStorage::Node node{};
String data;
Coordination::read(data, in);
node.setData(std::move(data));
node.setData(data);
Coordination::read(node.acl_id, in);
/// Deserialize stat
Coordination::read(node.stat.czxid, in);
Coordination::read(node.stat.mzxid, in);
Coordination::read(node.czxid, in);
Coordination::read(node.mzxid, in);
/// For some reason ZXID specified in filename can be smaller
/// then actual zxid from nodes. In this case we will use zxid from nodes.
max_zxid = std::max(max_zxid, node.stat.mzxid);
max_zxid = std::max(max_zxid, node.mzxid);
Coordination::read(node.stat.ctime, in);
Coordination::read(node.stat.mtime, in);
Coordination::read(node.stat.version, in);
Coordination::read(node.stat.cversion, in);
Coordination::read(node.stat.aversion, in);
Coordination::read(node.stat.ephemeralOwner, in);
Coordination::read(node.stat.pzxid, in);
int64_t ctime;
Coordination::read(ctime, in);
node.setCtime(ctime);
Coordination::read(node.mtime, in);
Coordination::read(node.version, in);
Coordination::read(node.cversion, in);
Coordination::read(node.aversion, in);
int64_t ephemeral_owner;
Coordination::read(ephemeral_owner, in);
if (ephemeral_owner != 0)
node.setEphemeralOwner(ephemeral_owner);
Coordination::read(node.pzxid, in);
if (!path.empty())
{
node.seq_num = node.stat.cversion;
if (ephemeral_owner == 0)
node.setSeqNum(node.cversion);
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
storage.ephemerals[node.stat.ephemeralOwner].insert(path);
if (ephemeral_owner != 0)
storage.ephemerals[ephemeral_owner].insert(path);
storage.acl_map.addUsage(node.acl_id);
}
@ -139,7 +146,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, LoggerP
if (itr.key != "/")
{
auto parent_path = parentNodePath(itr.key);
storage.container.updateValue(parent_path, [my_path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseNodeName(my_path)); ++value.stat.numChildren; });
storage.container.updateValue(parent_path, [my_path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseNodeName(my_path)); value.increaseNumChildren(); });
}
}

View File

@ -1508,7 +1508,7 @@ void addNode(DB::KeeperStorage & storage, const std::string & path, const std::s
using Node = DB::KeeperStorage::Node;
Node node{};
node.setData(data);
node.stat.ephemeralOwner = ephemeral_owner;
node.setEphemeralOwner(ephemeral_owner);
storage.container.insertOrReplace(path, node);
auto child_it = storage.container.find(path);
auto child_path = DB::getBaseNodeName(child_it->key);
@ -1517,7 +1517,7 @@ void addNode(DB::KeeperStorage & storage, const std::string & path, const std::s
[&](auto & parent)
{
parent.addChild(child_path);
parent.stat.numChildren++;
parent.increaseNumChildren();
});
}
@ -1530,12 +1530,12 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple)
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context);
addNode(storage, "/hello", "world", 1);
addNode(storage, "/hello/somepath", "somedata", 3);
addNode(storage, "/hello1", "world", 1);
addNode(storage, "/hello2", "somedata", 3);
storage.session_id_counter = 5;
storage.zxid = 2;
storage.ephemerals[3] = {"/hello"};
storage.ephemerals[1] = {"/hello/somepath"};
storage.ephemerals[3] = {"/hello2"};
storage.ephemerals[1] = {"/hello1"};
storage.getSessionID(130);
storage.getSessionID(130);
@ -1556,13 +1556,13 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple)
auto [restored_storage, snapshot_meta, _] = manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage->container.size(), 6);
EXPECT_EQ(restored_storage->container.getValue("/").getChildren().size(), 2);
EXPECT_EQ(restored_storage->container.getValue("/hello").getChildren().size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/").getChildren().size(), 3);
EXPECT_EQ(restored_storage->container.getValue("/hello1").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/hello2").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/").getData(), "");
EXPECT_EQ(restored_storage->container.getValue("/hello").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getData(), "somedata");
EXPECT_EQ(restored_storage->container.getValue("/hello1").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
@ -2251,12 +2251,12 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context);
addNode(storage, "/hello", "world", 1);
addNode(storage, "/hello/somepath", "somedata", 3);
addNode(storage, "/hello1", "world", 1);
addNode(storage, "/hello2", "somedata", 3);
storage.session_id_counter = 5;
storage.zxid = 2;
storage.ephemerals[3] = {"/hello"};
storage.ephemerals[1] = {"/hello/somepath"};
storage.ephemerals[3] = {"/hello2"};
storage.ephemerals[1] = {"/hello1"};
storage.getSessionID(130);
storage.getSessionID(130);
@ -2273,13 +2273,13 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
auto [restored_storage, snapshot_meta, _] = new_manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage->container.size(), 6);
EXPECT_EQ(restored_storage->container.getValue("/").getChildren().size(), 2);
EXPECT_EQ(restored_storage->container.getValue("/hello").getChildren().size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/").getChildren().size(), 3);
EXPECT_EQ(restored_storage->container.getValue("/hello1").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/hello2").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/").getData(), "");
EXPECT_EQ(restored_storage->container.getValue("/hello").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getData(), "somedata");
EXPECT_EQ(restored_storage->container.getValue("/hello1").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello2").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
@ -2948,7 +2948,7 @@ TEST_P(CoordinationTest, TestCheckNotExistsRequest)
create_path("/test_node");
auto node_it = storage.container.find("/test_node");
ASSERT_NE(node_it, storage.container.end());
auto node_version = node_it->value.stat.version;
auto node_version = node_it->value.version;
{
SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS");

View File

@ -25,13 +25,13 @@ void dumpMachine(std::shared_ptr<KeeperStateMachine> machine)
keys.pop();
std::cout << key << "\n";
auto value = storage.container.getValue(key);
std::cout << "\tStat: {version: " << value.stat.version <<
", mtime: " << value.stat.mtime <<
", emphemeralOwner: " << value.stat.ephemeralOwner <<
", czxid: " << value.stat.czxid <<
", mzxid: " << value.stat.mzxid <<
", numChildren: " << value.stat.numChildren <<
", dataLength: " << value.getData().size() <<
std::cout << "\tStat: {version: " << value.version <<
", mtime: " << value.mtime <<
", emphemeralOwner: " << value.ephemeralOwner() <<
", czxid: " << value.czxid <<
", mzxid: " << value.mzxid <<
", numChildren: " << value.numChildren() <<
", dataLength: " << value.data_size <<
"}" << std::endl;
std::cout << "\tData: " << storage.container.getValue(key).getData() << std::endl;