Fix data race and leak

This commit is contained in:
Antonio Andelic 2024-02-09 10:40:43 +01:00
parent dfb3a8f7be
commit 42f2fefeab
4 changed files with 39 additions and 63 deletions

View File

@ -82,7 +82,7 @@ namespace
writeBinary(node.czxid, out); writeBinary(node.czxid, out);
writeBinary(node.mzxid, out); writeBinary(node.mzxid, out);
writeBinary(node.ctime(), out); writeBinary(node.ctime(), out);
writeBinary(node.mtime(), out); writeBinary(node.mtime, out);
writeBinary(node.version, out); writeBinary(node.version, out);
writeBinary(node.cversion, out); writeBinary(node.cversion, out);
writeBinary(node.aversion, out); writeBinary(node.aversion, out);
@ -143,9 +143,7 @@ namespace
int64_t ctime; int64_t ctime;
readBinary(ctime, in); readBinary(ctime, in);
node.setCtime(ctime); node.setCtime(ctime);
int64_t mtime; readBinary(node.mtime, in);
readBinary(mtime, in);
node.setMtime(mtime);
readBinary(node.version, in); readBinary(node.version, in);
readBinary(node.cversion, in); readBinary(node.cversion, in);
readBinary(node.aversion, in); readBinary(node.aversion, in);

View File

@ -175,13 +175,13 @@ uint64_t calculateDigest(std::string_view path, const KeeperStorage::Node & node
if (node.data_size != 0) if (node.data_size != 0)
{ {
chassert(node.data != nullptr); chassert(node.data != nullptr);
hash.update(node.data, node.data_size); hash.update(node.getData());
} }
hash.update(node.czxid); hash.update(node.czxid);
hash.update(node.mzxid); hash.update(node.mzxid);
hash.update(node.ctime()); hash.update(node.ctime());
hash.update(node.mtime()); hash.update(node.mtime);
hash.update(node.version); hash.update(node.version);
hash.update(node.cversion); hash.update(node.cversion);
hash.update(node.aversion); hash.update(node.aversion);
@ -189,17 +189,17 @@ uint64_t calculateDigest(std::string_view path, const KeeperStorage::Node & node
hash.update(node.numChildren()); hash.update(node.numChildren());
hash.update(node.pzxid); hash.update(node.pzxid);
auto digest = hash.get64();
/// 0 means no cached digest
if (digest == 0)
return 1;
return hash.get64(); return hash.get64();
} }
} }
KeeperStorage::Node::~Node()
{
if (data_size)
delete[] data;
}
KeeperStorage::Node & KeeperStorage::Node::operator=(const Node & other) KeeperStorage::Node & KeeperStorage::Node::operator=(const Node & other)
{ {
if (this == &other) if (this == &other)
@ -209,8 +209,8 @@ KeeperStorage::Node & KeeperStorage::Node::operator=(const Node & other)
mzxid = other.mzxid; mzxid = other.mzxid;
pzxid = other.pzxid; pzxid = other.pzxid;
acl_id = other.acl_id; acl_id = other.acl_id;
has_cached_digest_and_ctime = other.has_cached_digest_and_ctime; mtime = other.mtime;
is_ephemeral_and_mtime = other.is_ephemeral_and_mtime; is_ephemeral_and_ctime = other.is_ephemeral_and_ctime;
ephemeral_or_children_data = other.ephemeral_or_children_data; ephemeral_or_children_data = other.ephemeral_or_children_data;
data_size = other.data_size; data_size = other.data_size;
version = other.version; version = other.version;
@ -219,8 +219,8 @@ KeeperStorage::Node & KeeperStorage::Node::operator=(const Node & other)
if (data_size != 0) if (data_size != 0)
{ {
data = new char[data_size]; data = std::unique_ptr<char[]>(new char[data_size]);
memcpy(data, other.data, data_size); memcpy(data.get(), other.data.get(), data_size);
} }
children = other.children; children = other.children;
@ -244,8 +244,8 @@ void KeeperStorage::Node::copyStats(const Coordination::Stat & stat)
mzxid = stat.mzxid; mzxid = stat.mzxid;
pzxid = stat.pzxid; pzxid = stat.pzxid;
mtime = stat.mtime;
setCtime(stat.ctime); setCtime(stat.ctime);
setMtime(stat.mtime);
version = stat.version; version = stat.version;
cversion = stat.cversion; cversion = stat.cversion;
@ -253,7 +253,7 @@ void KeeperStorage::Node::copyStats(const Coordination::Stat & stat)
if (stat.ephemeralOwner == 0) if (stat.ephemeralOwner == 0)
{ {
is_ephemeral_and_mtime.is_ephemeral = false; is_ephemeral_and_ctime.is_ephemeral = false;
setNumChildren(stat.numChildren); setNumChildren(stat.numChildren);
} }
else else
@ -267,7 +267,7 @@ void KeeperStorage::Node::setResponseStat(Coordination::Stat & response_stat) co
response_stat.czxid = czxid; response_stat.czxid = czxid;
response_stat.mzxid = mzxid; response_stat.mzxid = mzxid;
response_stat.ctime = ctime(); response_stat.ctime = ctime();
response_stat.mtime = mtime(); response_stat.mtime = mtime;
response_stat.version = version; response_stat.version = version;
response_stat.cversion = cversion; response_stat.cversion = cversion;
response_stat.aversion = aversion; response_stat.aversion = aversion;
@ -288,8 +288,8 @@ void KeeperStorage::Node::setData(const String & new_data)
data_size = static_cast<uint32_t>(new_data.size()); data_size = static_cast<uint32_t>(new_data.size());
if (data_size != 0) if (data_size != 0)
{ {
data = new char[new_data.size()]; data = std::unique_ptr<char[]>(new char[new_data.size()]);
memcpy(data, new_data.data(), data_size); memcpy(data.get(), new_data.data(), data_size);
} }
} }
@ -305,16 +305,13 @@ void KeeperStorage::Node::removeChild(StringRef child_path)
void KeeperStorage::Node::invalidateDigestCache() const void KeeperStorage::Node::invalidateDigestCache() const
{ {
has_cached_digest_and_ctime.has_cached_digest = false; cached_digest = 0;
} }
UInt64 KeeperStorage::Node::getDigest(const std::string_view path) const UInt64 KeeperStorage::Node::getDigest(const std::string_view path) const
{ {
if (!has_cached_digest_and_ctime.has_cached_digest) if (cached_digest == 0)
{
cached_digest = calculateDigest(path, *this); cached_digest = calculateDigest(path, *this);
has_cached_digest_and_ctime.has_cached_digest = true;
}
return cached_digest; return cached_digest;
}; };
@ -326,17 +323,17 @@ void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
pzxid = other.pzxid; pzxid = other.pzxid;
acl_id = other.acl_id; /// 0 -- no ACL by default acl_id = other.acl_id; /// 0 -- no ACL by default
has_cached_digest_and_ctime = other.has_cached_digest_and_ctime; mtime = other.mtime;
is_ephemeral_and_mtime = other.is_ephemeral_and_mtime; is_ephemeral_and_ctime = other.is_ephemeral_and_ctime;
ephemeral_or_children_data = other.ephemeral_or_children_data; ephemeral_or_children_data = other.ephemeral_or_children_data;
data_size = other.data_size; data_size = other.data_size;
if (data_size != 0) if (data_size != 0)
{ {
data = new char[data_size]; data = std::unique_ptr<char[]>(new char[data_size]);
memcpy(data, other.data, data_size); memcpy(data.get(), other.data.get(), data_size);
} }
version = other.version; version = other.version;
@ -1448,7 +1445,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
{ {
value.version++; value.version++;
value.mzxid = zxid; value.mzxid = zxid;
value.setMtime(time); value.mtime = time;
value.setData(data); value.setData(data);
}, },
request.version}); request.version});

View File

@ -38,17 +38,13 @@ public:
int64_t pzxid{0}; int64_t pzxid{0};
uint64_t acl_id = 0; /// 0 -- no ACL by default uint64_t acl_id = 0; /// 0 -- no ACL by default
mutable struct int64_t mtime;
{
bool has_cached_digest : 1;
int64_t ctime : 63;
} has_cached_digest_and_ctime{false, 0};
struct struct
{ {
bool is_ephemeral : 1; bool is_ephemeral : 1;
int64_t mtime : 63; int64_t ctime : 63;
} is_ephemeral_and_mtime{false, 0}; } is_ephemeral_and_ctime{false, 0};
union union
{ {
@ -60,19 +56,15 @@ public:
} children_info; } children_info;
} ephemeral_or_children_data{0}; } ephemeral_or_children_data{0};
char * data{nullptr}; std::unique_ptr<char[]> data{nullptr};
uint32_t data_size{0}; uint32_t data_size{0};
int32_t version{0}; int32_t version{0};
int32_t cversion{0}; int32_t cversion{0};
int32_t aversion{0}; int32_t aversion{0};
/// we cannot use `std::optional<uint64_t> because we want to
/// pack the boolean with seq_num above
mutable uint64_t cached_digest = 0; mutable uint64_t cached_digest = 0;
~Node();
Node() = default; Node() = default;
Node & operator=(const Node & other); Node & operator=(const Node & other);
@ -83,7 +75,7 @@ public:
bool isEphemeral() const bool isEphemeral() const
{ {
return is_ephemeral_and_mtime.is_ephemeral; return is_ephemeral_and_ctime.is_ephemeral;
} }
int64_t ephemeralOwner() const int64_t ephemeralOwner() const
@ -96,7 +88,7 @@ public:
void setEphemeralOwner(int64_t ephemeral_owner) void setEphemeralOwner(int64_t ephemeral_owner)
{ {
is_ephemeral_and_mtime.is_ephemeral = ephemeral_owner != 0; is_ephemeral_and_ctime.is_ephemeral = ephemeral_owner != 0;
ephemeral_or_children_data.ephemeral_owner = ephemeral_owner; ephemeral_or_children_data.ephemeral_owner = ephemeral_owner;
} }
@ -146,22 +138,12 @@ public:
int64_t ctime() const int64_t ctime() const
{ {
return has_cached_digest_and_ctime.ctime; return is_ephemeral_and_ctime.ctime;
} }
void setCtime(uint64_t ctime) void setCtime(uint64_t ctime)
{ {
has_cached_digest_and_ctime.ctime = ctime; is_ephemeral_and_ctime.ctime = ctime;
}
int64_t mtime() const
{
return is_ephemeral_and_mtime.mtime;
}
void setMtime(uint64_t mtime)
{
is_ephemeral_and_mtime.mtime = mtime;
} }
void copyStats(const Coordination::Stat & stat); void copyStats(const Coordination::Stat & stat);
@ -173,7 +155,7 @@ public:
void setData(const String & new_data); void setData(const String & new_data);
StringRef getData() const noexcept { return {data, data_size}; } StringRef getData() const noexcept { return {data.get(), data_size}; }
void addChild(StringRef child_path); void addChild(StringRef child_path);
@ -205,10 +187,11 @@ public:
NO_DIGEST = 0, NO_DIGEST = 0,
V1 = 1, V1 = 1,
V2 = 2, // added system nodes that modify the digest on startup so digest from V0 is invalid V2 = 2, // added system nodes that modify the digest on startup so digest from V0 is invalid
V3 = 3 // fixed bug with casting, removed duplicate czxid usage V3 = 3, // fixed bug with casting, removed duplicate czxid usage
V4 = 4 // 0 is not a valid digest value
}; };
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V3; static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V4;
struct ResponseForSession struct ResponseForSession
{ {

View File

@ -114,9 +114,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, LoggerP
int64_t ctime; int64_t ctime;
Coordination::read(ctime, in); Coordination::read(ctime, in);
node.setCtime(ctime); node.setCtime(ctime);
int64_t mtime; Coordination::read(node.mtime, in);
Coordination::read(mtime, in);
node.setMtime(mtime);
Coordination::read(node.version, in); Coordination::read(node.version, in);
Coordination::read(node.cversion, in); Coordination::read(node.cversion, in);
Coordination::read(node.aversion, in); Coordination::read(node.aversion, in);