mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #59002 from ClickHouse/keeper-reduce-memory
Reduce Keeper memory usage
This commit is contained in:
commit
34463fd5e5
@ -74,7 +74,10 @@ namespace
|
||||
|
||||
/// Serialize ACL
|
||||
writeBinary(node.acl_id, out);
|
||||
writeBinary(node.is_sequental, out);
|
||||
/// Write is_sequential for backwards compatibility
|
||||
if (version < SnapshotVersion::V6)
|
||||
writeBinary(false, out);
|
||||
|
||||
/// Serialize stat
|
||||
writeBinary(node.stat.czxid, out);
|
||||
writeBinary(node.stat.mzxid, out);
|
||||
@ -84,16 +87,15 @@ namespace
|
||||
writeBinary(node.stat.cversion, out);
|
||||
writeBinary(node.stat.aversion, out);
|
||||
writeBinary(node.stat.ephemeralOwner, out);
|
||||
writeBinary(node.stat.dataLength, out);
|
||||
if (version < SnapshotVersion::V6)
|
||||
writeBinary(static_cast<int32_t>(node.getData().size()), out);
|
||||
writeBinary(node.stat.numChildren, out);
|
||||
writeBinary(node.stat.pzxid, out);
|
||||
|
||||
writeBinary(node.seq_num, out);
|
||||
|
||||
if (version >= SnapshotVersion::V4)
|
||||
{
|
||||
writeBinary(node.size_bytes, out);
|
||||
}
|
||||
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
|
||||
writeBinary(node.sizeInBytes(), out);
|
||||
}
|
||||
|
||||
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
|
||||
@ -129,7 +131,11 @@ namespace
|
||||
|
||||
acl_map.addUsage(node.acl_id);
|
||||
|
||||
readBinary(node.is_sequental, in);
|
||||
if (version < SnapshotVersion::V6)
|
||||
{
|
||||
bool is_sequential = false;
|
||||
readBinary(is_sequential, in);
|
||||
}
|
||||
|
||||
/// Deserialize stat
|
||||
readBinary(node.stat.czxid, in);
|
||||
@ -140,14 +146,19 @@ namespace
|
||||
readBinary(node.stat.cversion, in);
|
||||
readBinary(node.stat.aversion, in);
|
||||
readBinary(node.stat.ephemeralOwner, in);
|
||||
readBinary(node.stat.dataLength, in);
|
||||
if (version < SnapshotVersion::V6)
|
||||
{
|
||||
int32_t data_length = 0;
|
||||
readBinary(data_length, in);
|
||||
}
|
||||
readBinary(node.stat.numChildren, in);
|
||||
readBinary(node.stat.pzxid, in);
|
||||
readBinary(node.seq_num, in);
|
||||
|
||||
if (version >= SnapshotVersion::V4)
|
||||
if (version >= SnapshotVersion::V4 && version <= SnapshotVersion::V5)
|
||||
{
|
||||
readBinary(node.size_bytes, in);
|
||||
uint64_t size_bytes = 0;
|
||||
readBinary(size_bytes, in);
|
||||
}
|
||||
}
|
||||
|
||||
@ -354,7 +365,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
|
||||
|
||||
const auto is_node_empty = [](const auto & node)
|
||||
{
|
||||
return node.getData().empty() && node.stat == Coordination::Stat{};
|
||||
return node.getData().empty() && node.stat == KeeperStorage::Node::Stat{};
|
||||
};
|
||||
|
||||
for (size_t nodes_read = 0; nodes_read < snapshot_container_size; ++nodes_read)
|
||||
@ -398,9 +409,6 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
|
||||
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
|
||||
error_msg);
|
||||
}
|
||||
|
||||
// we always ignore the written size for this node
|
||||
node.recalculateSize();
|
||||
}
|
||||
|
||||
storage.container.insertOrReplace(path, node);
|
||||
@ -417,7 +425,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
|
||||
{
|
||||
auto parent_path = parentNodePath(itr.key);
|
||||
storage.container.updateValue(
|
||||
parent_path, [version, path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path), /*update_size*/ version < SnapshotVersion::V4); });
|
||||
parent_path, [path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path)); });
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,9 +24,10 @@ enum SnapshotVersion : uint8_t
|
||||
V3 = 3, /// compress snapshots with ZSTD codec
|
||||
V4 = 4, /// add Node size to snapshots
|
||||
V5 = 5, /// add ZXID and digest to snapshots
|
||||
V6 = 6, /// remove is_sequential, per node size, data length
|
||||
};
|
||||
|
||||
static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V5;
|
||||
static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V6;
|
||||
|
||||
/// What is stored in binary snapshot
|
||||
struct SnapshotDeserializationResult
|
||||
|
@ -24,7 +24,6 @@
|
||||
#include <Coordination/KeeperStorage.h>
|
||||
#include <Coordination/KeeperDispatcher.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <functional>
|
||||
#include <base/defines.h>
|
||||
#include <filesystem>
|
||||
@ -167,7 +166,7 @@ KeeperStorage::ResponsesForSessions processWatchesImpl(
|
||||
}
|
||||
|
||||
// When this function is updated, update CURRENT_DIGEST_VERSION!!
|
||||
uint64_t calculateDigest(std::string_view path, std::string_view data, const Coordination::Stat & stat)
|
||||
uint64_t calculateDigest(std::string_view path, std::string_view data, const KeeperStorage::Node::Stat & stat)
|
||||
{
|
||||
SipHash hash;
|
||||
|
||||
@ -184,7 +183,7 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Coo
|
||||
hash.update(stat.cversion);
|
||||
hash.update(stat.aversion);
|
||||
hash.update(stat.ephemeralOwner);
|
||||
hash.update(stat.dataLength);
|
||||
hash.update(data.length());
|
||||
hash.update(stat.numChildren);
|
||||
hash.update(stat.pzxid);
|
||||
|
||||
@ -193,36 +192,56 @@ uint64_t calculateDigest(std::string_view path, std::string_view data, const Coo
|
||||
|
||||
}
|
||||
|
||||
void KeeperStorage::Node::setResponseStat(Coordination::Stat & response_stat) const
|
||||
{
|
||||
response_stat.czxid = stat.czxid;
|
||||
response_stat.mzxid = stat.mzxid;
|
||||
response_stat.ctime = stat.ctime;
|
||||
response_stat.mtime = stat.mtime;
|
||||
response_stat.version = stat.version;
|
||||
response_stat.cversion = stat.cversion;
|
||||
response_stat.aversion = stat.aversion;
|
||||
response_stat.ephemeralOwner = stat.ephemeralOwner;
|
||||
response_stat.dataLength = static_cast<int32_t>(data.size());
|
||||
response_stat.numChildren = stat.numChildren;
|
||||
response_stat.pzxid = stat.pzxid;
|
||||
|
||||
}
|
||||
|
||||
uint64_t KeeperStorage::Node::sizeInBytes() const
|
||||
{
|
||||
return sizeof(Node) + children.size() * sizeof(StringRef) + data.size();
|
||||
}
|
||||
|
||||
void KeeperStorage::Node::setData(String new_data)
|
||||
{
|
||||
size_bytes = size_bytes - data.size() + new_data.size();
|
||||
data = std::move(new_data);
|
||||
}
|
||||
|
||||
void KeeperStorage::Node::addChild(StringRef child_path, bool update_size)
|
||||
void KeeperStorage::Node::addChild(StringRef child_path)
|
||||
{
|
||||
if (update_size) [[likely]]
|
||||
size_bytes += sizeof child_path;
|
||||
children.insert(child_path);
|
||||
}
|
||||
|
||||
void KeeperStorage::Node::removeChild(StringRef child_path)
|
||||
{
|
||||
size_bytes -= sizeof child_path;
|
||||
children.erase(child_path);
|
||||
}
|
||||
|
||||
void KeeperStorage::Node::invalidateDigestCache() const
|
||||
{
|
||||
cached_digest.reset();
|
||||
has_cached_digest = false;
|
||||
}
|
||||
|
||||
UInt64 KeeperStorage::Node::getDigest(const std::string_view path) const
|
||||
{
|
||||
if (!cached_digest)
|
||||
if (!has_cached_digest)
|
||||
{
|
||||
cached_digest = calculateDigest(path, data, stat);
|
||||
has_cached_digest = true;
|
||||
}
|
||||
|
||||
return *cached_digest;
|
||||
return cached_digest;
|
||||
};
|
||||
|
||||
void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
|
||||
@ -233,13 +252,6 @@ void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
|
||||
cached_digest = other.cached_digest;
|
||||
}
|
||||
|
||||
void KeeperStorage::Node::recalculateSize()
|
||||
{
|
||||
size_bytes = sizeof(Node);
|
||||
size_bytes += children.size() * sizeof(decltype(children)::value_type);
|
||||
size_bytes += data.size();
|
||||
}
|
||||
|
||||
KeeperStorage::KeeperStorage(
|
||||
int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, const bool initialize_system_nodes)
|
||||
: session_expiry_queue(tick_time_ms), keeper_context(keeper_context_), superdigest(superdigest_)
|
||||
@ -650,7 +662,6 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
|
||||
path,
|
||||
std::move(operation.data),
|
||||
operation.stat,
|
||||
operation.is_sequental,
|
||||
std::move(operation.acls)))
|
||||
onStorageInconsistency();
|
||||
|
||||
@ -729,8 +740,7 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
|
||||
bool KeeperStorage::createNode(
|
||||
const std::string & path,
|
||||
String data,
|
||||
const Coordination::Stat & stat,
|
||||
bool is_sequental,
|
||||
const KeeperStorage::Node::Stat & stat,
|
||||
Coordination::ACLs node_acls)
|
||||
{
|
||||
auto parent_path = parentNodePath(path);
|
||||
@ -753,7 +763,6 @@ bool KeeperStorage::createNode(
|
||||
created_node.acl_id = acl_id;
|
||||
created_node.stat = stat;
|
||||
created_node.setData(std::move(data));
|
||||
created_node.is_sequental = is_sequental;
|
||||
auto [map_key, _] = container.insert(path, created_node);
|
||||
/// Take child path from key owned by map.
|
||||
auto child_path = getBaseNodeName(map_key->getKey());
|
||||
@ -1012,7 +1021,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
|
||||
|
||||
new_deltas.emplace_back(std::string{parent_path}, zxid, KeeperStorage::UpdateNodeDelta{std::move(parent_update)});
|
||||
|
||||
Coordination::Stat stat;
|
||||
KeeperStorage::Node::Stat stat;
|
||||
stat.czxid = zxid;
|
||||
stat.mzxid = zxid;
|
||||
stat.pzxid = zxid;
|
||||
@ -1022,13 +1031,12 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
|
||||
stat.version = 0;
|
||||
stat.aversion = 0;
|
||||
stat.cversion = 0;
|
||||
stat.dataLength = static_cast<Int32>(request.data.length());
|
||||
stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
|
||||
|
||||
new_deltas.emplace_back(
|
||||
std::move(path_created),
|
||||
zxid,
|
||||
KeeperStorage::CreateNodeDelta{stat, request.is_sequential, std::move(node_acls), request.data});
|
||||
KeeperStorage::CreateNodeDelta{stat, std::move(node_acls), request.data});
|
||||
|
||||
digest = storage.calculateNodesDigest(digest, new_deltas);
|
||||
return new_deltas;
|
||||
@ -1126,7 +1134,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
|
||||
}
|
||||
else
|
||||
{
|
||||
response.stat = node_it->value.stat;
|
||||
node_it->value.setResponseStat(response.stat);
|
||||
response.data = node_it->value.getData();
|
||||
response.error = Coordination::Error::ZOK;
|
||||
}
|
||||
@ -1285,7 +1293,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
|
||||
}
|
||||
else
|
||||
{
|
||||
response.stat = node_it->value.stat;
|
||||
node_it->value.setResponseStat(response.stat);
|
||||
response.error = Coordination::Error::ZOK;
|
||||
}
|
||||
|
||||
@ -1345,7 +1353,6 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
|
||||
value.stat.version++;
|
||||
value.stat.mzxid = zxid;
|
||||
value.stat.mtime = time;
|
||||
value.stat.dataLength = static_cast<Int32>(data.length());
|
||||
value.setData(data);
|
||||
},
|
||||
request.version});
|
||||
@ -1384,7 +1391,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
|
||||
if (node_it == container.end())
|
||||
onStorageInconsistency();
|
||||
|
||||
response.stat = node_it->value.stat;
|
||||
node_it->value.setResponseStat(response.stat);
|
||||
response.error = Coordination::Error::ZOK;
|
||||
|
||||
return response_ptr;
|
||||
@ -1481,7 +1488,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
|
||||
response.names.push_back(child.toString());
|
||||
}
|
||||
|
||||
response.stat = node_it->value.stat;
|
||||
node_it->value.setResponseStat(response.stat);
|
||||
response.error = Coordination::Error::ZOK;
|
||||
}
|
||||
|
||||
@ -1675,7 +1682,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
|
||||
auto node_it = storage.container.find(request.path);
|
||||
if (node_it == storage.container.end())
|
||||
onStorageInconsistency();
|
||||
response.stat = node_it->value.stat;
|
||||
node_it->value.setResponseStat(response.stat);
|
||||
response.error = Coordination::Error::ZOK;
|
||||
|
||||
return response_ptr;
|
||||
@ -1729,7 +1736,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
|
||||
}
|
||||
else
|
||||
{
|
||||
response.stat = node_it->value.stat;
|
||||
node_it->value.setResponseStat(response.stat);
|
||||
response.acl = storage.acl_map.convertNumber(node_it->value.acl_id);
|
||||
}
|
||||
|
||||
|
@ -30,24 +30,47 @@ struct KeeperStorageSnapshot;
|
||||
class KeeperStorage
|
||||
{
|
||||
public:
|
||||
/// Node should have as minimal size as possible to reduce memory footprint
|
||||
/// of stored nodes
|
||||
/// New fields should be added to the struct only if it's really necessary
|
||||
struct Node
|
||||
{
|
||||
uint64_t acl_id = 0; /// 0 -- no ACL by default
|
||||
bool is_sequental = false;
|
||||
Coordination::Stat stat{};
|
||||
int32_t seq_num = 0;
|
||||
uint64_t size_bytes; // save size to avoid calculate every time
|
||||
/// to reduce size of the Node struct we use a custom Stat without dataLength
|
||||
struct Stat
|
||||
{
|
||||
int64_t czxid{0};
|
||||
int64_t mzxid{0};
|
||||
int64_t ctime{0};
|
||||
int64_t mtime{0};
|
||||
int32_t version{0};
|
||||
int32_t cversion{0};
|
||||
int32_t aversion{0};
|
||||
int32_t numChildren{0}; /// NOLINT
|
||||
int64_t ephemeralOwner{0}; /// NOLINT
|
||||
int64_t pzxid{0};
|
||||
|
||||
Node() : size_bytes(sizeof(Node)) { }
|
||||
bool operator==(const Stat &) const = default;
|
||||
};
|
||||
|
||||
uint64_t acl_id = 0; /// 0 -- no ACL by default
|
||||
Stat stat{};
|
||||
int32_t seq_num = 0;
|
||||
|
||||
/// we cannot use `std::optional<uint64_t> because we want to
|
||||
/// pack the boolean with seq_num above
|
||||
mutable bool has_cached_digest = false;
|
||||
mutable uint64_t cached_digest = 0;
|
||||
|
||||
void setResponseStat(Coordination::Stat & response_stat) const;
|
||||
|
||||
/// Object memory size
|
||||
uint64_t sizeInBytes() const { return size_bytes; }
|
||||
uint64_t sizeInBytes() const;
|
||||
|
||||
void setData(String new_data);
|
||||
|
||||
const auto & getData() const noexcept { return data; }
|
||||
|
||||
void addChild(StringRef child_path, bool update_size = true);
|
||||
void addChild(StringRef child_path);
|
||||
|
||||
void removeChild(StringRef child_path);
|
||||
|
||||
@ -63,13 +86,9 @@ public:
|
||||
// copy only necessary information for preprocessing and digest calculation
|
||||
// (e.g. we don't need to copy list of children)
|
||||
void shallowCopy(const Node & other);
|
||||
|
||||
void recalculateSize();
|
||||
|
||||
private:
|
||||
String data;
|
||||
ChildrenSet children{};
|
||||
mutable std::optional<UInt64> cached_digest;
|
||||
};
|
||||
|
||||
enum DigestVersion : uint8_t
|
||||
@ -158,8 +177,7 @@ public:
|
||||
// - quickly commit the changes to the storage
|
||||
struct CreateNodeDelta
|
||||
{
|
||||
Coordination::Stat stat;
|
||||
bool is_sequental;
|
||||
KeeperStorage::Node::Stat stat;
|
||||
Coordination::ACLs acls;
|
||||
String data;
|
||||
};
|
||||
@ -324,8 +342,7 @@ public:
|
||||
bool createNode(
|
||||
const std::string & path,
|
||||
String data,
|
||||
const Coordination::Stat & stat,
|
||||
bool is_sequental,
|
||||
const KeeperStorage::Node::Stat & stat,
|
||||
Coordination::ACLs node_acls);
|
||||
|
||||
// Remove node in the storage
|
||||
|
@ -1,32 +1,91 @@
|
||||
#pragma once
|
||||
#include <base/StringRef.h>
|
||||
#include <Common/HashTable/HashMap.h>
|
||||
#include <Common/ArenaWithFreeLists.h>
|
||||
#include <Common/ArenaUtils.h>
|
||||
#include <unordered_map>
|
||||
#include <list>
|
||||
#include <atomic>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
template<typename V>
|
||||
struct ListNode
|
||||
{
|
||||
StringRef key;
|
||||
V value;
|
||||
|
||||
/// Monotonically increasing version info for snapshot
|
||||
size_t version{0};
|
||||
bool active_in_map{true};
|
||||
bool free_key{false};
|
||||
/// |* * ****** |
|
||||
/// ^ ^ ^
|
||||
/// active_in_map free_key version
|
||||
/// (1 byte) (1 byte) (6 bytes)
|
||||
uint64_t node_metadata = 0;
|
||||
|
||||
void setInactiveInMap()
|
||||
{
|
||||
node_metadata &= ~active_in_map_mask;
|
||||
}
|
||||
|
||||
void setActiveInMap()
|
||||
{
|
||||
node_metadata |= active_in_map_mask;
|
||||
}
|
||||
|
||||
bool isActiveInMap()
|
||||
{
|
||||
return node_metadata & active_in_map_mask;
|
||||
}
|
||||
|
||||
void setFreeKey()
|
||||
{
|
||||
node_metadata |= free_key_mask;
|
||||
}
|
||||
|
||||
bool getFreeKey()
|
||||
{
|
||||
return node_metadata & free_key_mask;
|
||||
}
|
||||
|
||||
uint64_t getVersion()
|
||||
{
|
||||
return node_metadata & version_mask;
|
||||
}
|
||||
|
||||
void setVersion(uint64_t version)
|
||||
{
|
||||
if (version > version_mask)
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR, "Snapshot version {} is larger than maximum allowed value {}", version, version_mask);
|
||||
|
||||
node_metadata &= ~version_mask;
|
||||
node_metadata |= version;
|
||||
}
|
||||
|
||||
static constexpr uint64_t active_in_map_mask = static_cast<uint64_t>(1) << 63;
|
||||
static constexpr uint64_t free_key_mask = static_cast<uint64_t>(1) << 62;
|
||||
static constexpr uint64_t version_mask = ~(static_cast<uint64_t>(3) << 62);
|
||||
};
|
||||
|
||||
template <class V>
|
||||
class SnapshotableHashTable
|
||||
{
|
||||
private:
|
||||
struct GlobalArena
|
||||
{
|
||||
char * alloc(const size_t size)
|
||||
{
|
||||
return new char[size];
|
||||
}
|
||||
|
||||
void free(const char * ptr, size_t /*size*/)
|
||||
{
|
||||
delete [] ptr;
|
||||
}
|
||||
};
|
||||
|
||||
using ListElem = ListNode<V>;
|
||||
using List = std::list<ListElem>;
|
||||
@ -39,7 +98,12 @@ private:
|
||||
/// Allows to avoid additional copies in updateValue function
|
||||
size_t current_version{0};
|
||||
size_t snapshot_up_to_version{0};
|
||||
ArenaWithFreeLists arena;
|
||||
|
||||
/// Arena used for keys
|
||||
/// we don't use std::string because it uses 24 bytes (because of SSO)
|
||||
/// we want to always allocate the key on heap and use StringRef to it
|
||||
GlobalArena arena;
|
||||
|
||||
/// Collect invalid iterators to avoid traversing the whole list
|
||||
std::vector<Mapped> snapshot_invalid_iters;
|
||||
|
||||
@ -125,6 +189,11 @@ public:
|
||||
using const_iterator = typename List::const_iterator;
|
||||
using ValueUpdater = std::function<void(V & value)>;
|
||||
|
||||
~SnapshotableHashTable()
|
||||
{
|
||||
clear();
|
||||
}
|
||||
|
||||
std::pair<typename IndexMap::LookupResult, bool> insert(const std::string & key, const V & value)
|
||||
{
|
||||
size_t hash_value = map.hash(key);
|
||||
@ -132,11 +201,13 @@ public:
|
||||
|
||||
if (!it)
|
||||
{
|
||||
ListElem elem{copyStringInArena(arena, key), value, current_version};
|
||||
ListElem elem{copyStringInArena(arena, key), value};
|
||||
elem.setVersion(current_version);
|
||||
auto itr = list.insert(list.end(), std::move(elem));
|
||||
bool inserted;
|
||||
map.emplace(itr->key, it, inserted, hash_value);
|
||||
assert(inserted);
|
||||
itr->setActiveInMap();
|
||||
chassert(inserted);
|
||||
|
||||
it->getMapped() = itr;
|
||||
updateDataSize(INSERT, key.size(), value.sizeInBytes(), 0);
|
||||
@ -154,11 +225,13 @@ public:
|
||||
|
||||
if (it == map.end())
|
||||
{
|
||||
ListElem elem{copyStringInArena(arena, key), value, current_version};
|
||||
ListElem elem{copyStringInArena(arena, key), value};
|
||||
elem.setVersion(current_version);
|
||||
auto itr = list.insert(list.end(), std::move(elem));
|
||||
bool inserted;
|
||||
map.emplace(itr->key, it, inserted, hash_value);
|
||||
assert(inserted);
|
||||
itr->setActiveInMap();
|
||||
chassert(inserted);
|
||||
it->getMapped() = itr;
|
||||
}
|
||||
else
|
||||
@ -166,8 +239,9 @@ public:
|
||||
auto list_itr = it->getMapped();
|
||||
if (snapshot_mode)
|
||||
{
|
||||
ListElem elem{list_itr->key, value, current_version};
|
||||
list_itr->active_in_map = false;
|
||||
ListElem elem{list_itr->key, value};
|
||||
elem.setVersion(current_version);
|
||||
list_itr->setInactiveInMap();
|
||||
auto new_list_itr = list.insert(list.end(), std::move(elem));
|
||||
it->getMapped() = new_list_itr;
|
||||
snapshot_invalid_iters.push_back(list_itr);
|
||||
@ -190,9 +264,9 @@ public:
|
||||
uint64_t old_data_size = list_itr->value.sizeInBytes();
|
||||
if (snapshot_mode)
|
||||
{
|
||||
list_itr->active_in_map = false;
|
||||
list_itr->setInactiveInMap();
|
||||
snapshot_invalid_iters.push_back(list_itr);
|
||||
list_itr->free_key = true;
|
||||
list_itr->setFreeKey();
|
||||
map.erase(it->getKey());
|
||||
}
|
||||
else
|
||||
@ -215,7 +289,7 @@ public:
|
||||
{
|
||||
size_t hash_value = map.hash(key);
|
||||
auto it = map.find(key, hash_value);
|
||||
assert(it != map.end());
|
||||
chassert(it != map.end());
|
||||
|
||||
auto list_itr = it->getMapped();
|
||||
uint64_t old_value_size = list_itr->value.sizeInBytes();
|
||||
@ -228,13 +302,14 @@ public:
|
||||
/// We in snapshot mode but updating some node which is already more
|
||||
/// fresh than snapshot distance. So it will not participate in
|
||||
/// snapshot and we don't need to copy it.
|
||||
if (list_itr->version <= snapshot_up_to_version)
|
||||
if (list_itr->getVersion() <= snapshot_up_to_version)
|
||||
{
|
||||
auto elem_copy = *(list_itr);
|
||||
list_itr->active_in_map = false;
|
||||
list_itr->setInactiveInMap();
|
||||
snapshot_invalid_iters.push_back(list_itr);
|
||||
updater(elem_copy.value);
|
||||
elem_copy.version = current_version;
|
||||
|
||||
elem_copy.setVersion(current_version);
|
||||
auto itr = list.insert(list.end(), std::move(elem_copy));
|
||||
it->getMapped() = itr;
|
||||
ret = itr;
|
||||
@ -269,17 +344,17 @@ public:
|
||||
const V & getValue(StringRef key) const
|
||||
{
|
||||
auto it = map.find(key);
|
||||
assert(it);
|
||||
chassert(it);
|
||||
return it->getMapped()->value;
|
||||
}
|
||||
|
||||
void clearOutdatedNodes()
|
||||
{
|
||||
for (auto & itr: snapshot_invalid_iters)
|
||||
for (auto & itr : snapshot_invalid_iters)
|
||||
{
|
||||
assert(!itr->active_in_map);
|
||||
chassert(!itr->isActiveInMap());
|
||||
updateDataSize(CLEAR_OUTDATED_NODES, itr->key.size, itr->value.sizeInBytes(), 0);
|
||||
if (itr->free_key)
|
||||
if (itr->getFreeKey())
|
||||
arena.free(const_cast<char *>(itr->key.data), itr->key.size);
|
||||
list.erase(itr);
|
||||
}
|
||||
@ -288,6 +363,7 @@ public:
|
||||
|
||||
void clear()
|
||||
{
|
||||
clearOutdatedNodes();
|
||||
map.clear();
|
||||
for (auto itr = list.begin(); itr != list.end(); ++itr)
|
||||
arena.free(const_cast<char *>(itr->key.data), itr->key.size);
|
||||
@ -327,13 +403,12 @@ public:
|
||||
approximate_data_size = 0;
|
||||
for (auto & node : list)
|
||||
{
|
||||
node.value.recalculateSize();
|
||||
approximate_data_size += node.key.size;
|
||||
approximate_data_size += node.value.sizeInBytes();
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t keyArenaSize() const { return arena.allocatedBytes(); }
|
||||
uint64_t keyArenaSize() const { return 0; }
|
||||
|
||||
iterator begin() { return list.begin(); }
|
||||
const_iterator begin() const { return list.cbegin(); }
|
||||
|
@ -120,7 +120,6 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
|
||||
Coordination::read(node.stat.pzxid, in);
|
||||
if (!path.empty())
|
||||
{
|
||||
node.stat.dataLength = static_cast<Int32>(node.getData().length());
|
||||
node.seq_num = node.stat.cversion;
|
||||
storage.container.insertOrReplace(path, node);
|
||||
|
||||
|
@ -45,10 +45,7 @@ struct ChangelogDirTest
|
||||
bool drop;
|
||||
explicit ChangelogDirTest(std::string path_, bool drop_ = true) : path(path_), drop(drop_)
|
||||
{
|
||||
if (fs::exists(path))
|
||||
{
|
||||
EXPECT_TRUE(false) << "Path " << path << " already exists, remove it to run test";
|
||||
}
|
||||
EXPECT_FALSE(fs::exists(path)) << "Path " << path << " already exists, remove it to run test";
|
||||
fs::create_directory(path);
|
||||
}
|
||||
|
||||
@ -1367,11 +1364,11 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
auto itr = map_snp.begin();
|
||||
EXPECT_EQ(itr->key, "/hello");
|
||||
EXPECT_EQ(itr->value, 7);
|
||||
EXPECT_EQ(itr->active_in_map, false);
|
||||
EXPECT_EQ(itr->isActiveInMap(), false);
|
||||
itr = std::next(itr);
|
||||
EXPECT_EQ(itr->key, "/hello");
|
||||
EXPECT_EQ(itr->value, 554);
|
||||
EXPECT_EQ(itr->active_in_map, true);
|
||||
EXPECT_EQ(itr->isActiveInMap(), true);
|
||||
itr = std::next(itr);
|
||||
EXPECT_EQ(itr, map_snp.end());
|
||||
for (int i = 0; i < 5; ++i)
|
||||
@ -1387,7 +1384,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
{
|
||||
EXPECT_EQ(itr->key, "/hello" + std::to_string(i));
|
||||
EXPECT_EQ(itr->value, i);
|
||||
EXPECT_EQ(itr->active_in_map, true);
|
||||
EXPECT_EQ(itr->isActiveInMap(), true);
|
||||
itr = std::next(itr);
|
||||
}
|
||||
|
||||
@ -1401,7 +1398,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
{
|
||||
EXPECT_EQ(itr->key, "/hello" + std::to_string(i));
|
||||
EXPECT_EQ(itr->value, i);
|
||||
EXPECT_EQ(itr->active_in_map, i != 3 && i != 2);
|
||||
EXPECT_EQ(itr->isActiveInMap(), i != 3 && i != 2);
|
||||
itr = std::next(itr);
|
||||
}
|
||||
map_snp.clearOutdatedNodes();
|
||||
@ -1411,19 +1408,19 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
itr = map_snp.begin();
|
||||
EXPECT_EQ(itr->key, "/hello");
|
||||
EXPECT_EQ(itr->value, 554);
|
||||
EXPECT_EQ(itr->active_in_map, true);
|
||||
EXPECT_EQ(itr->isActiveInMap(), true);
|
||||
itr = std::next(itr);
|
||||
EXPECT_EQ(itr->key, "/hello0");
|
||||
EXPECT_EQ(itr->value, 0);
|
||||
EXPECT_EQ(itr->active_in_map, true);
|
||||
EXPECT_EQ(itr->isActiveInMap(), true);
|
||||
itr = std::next(itr);
|
||||
EXPECT_EQ(itr->key, "/hello1");
|
||||
EXPECT_EQ(itr->value, 1);
|
||||
EXPECT_EQ(itr->active_in_map, true);
|
||||
EXPECT_EQ(itr->isActiveInMap(), true);
|
||||
itr = std::next(itr);
|
||||
EXPECT_EQ(itr->key, "/hello4");
|
||||
EXPECT_EQ(itr->value, 4);
|
||||
EXPECT_EQ(itr->active_in_map, true);
|
||||
EXPECT_EQ(itr->isActiveInMap(), true);
|
||||
itr = std::next(itr);
|
||||
EXPECT_EQ(itr, map_snp.end());
|
||||
map_snp.disableSnapshotMode();
|
||||
|
@ -31,7 +31,7 @@ void dumpMachine(std::shared_ptr<KeeperStateMachine> machine)
|
||||
", czxid: " << value.stat.czxid <<
|
||||
", mzxid: " << value.stat.mzxid <<
|
||||
", numChildren: " << value.stat.numChildren <<
|
||||
", dataLength: " << value.stat.dataLength <<
|
||||
", dataLength: " << value.getData().size() <<
|
||||
"}" << std::endl;
|
||||
std::cout << "\tData: " << storage.container.getValue(key).getData() << std::endl;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user