Merge pull request #35967 from ClickHouse/keeper-polishing

Small code changes in ZooKeeper client/Keeper code
This commit is contained in:
alesapin 2022-04-07 12:11:10 +02:00 committed by GitHub
commit d21ed546a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 147 additions and 136 deletions

View File

@ -846,7 +846,7 @@ void ZooKeeper::receiveEvent()
void ZooKeeper::finalize(bool error_send, bool error_receive, const String & reason) void ZooKeeper::finalize(bool error_send, bool error_receive, const String & reason)
{ {
/// If some thread (send/receive) already finalizing session don't try to do it /// If some thread (send/receive) already finalizing session don't try to do it
bool already_started = finalization_started.exchange(true); bool already_started = finalization_started.test_and_set();
LOG_TEST(log, "Finalizing session {}: finalization_started={}, queue_finished={}, reason={}", LOG_TEST(log, "Finalizing session {}: finalization_started={}, queue_finished={}, reason={}",
session_id, already_started, requests_queue.isFinished(), reason); session_id, already_started, requests_queue.isFinished(), reason);

View File

@ -209,7 +209,7 @@ private:
std::atomic<XID> next_xid {1}; std::atomic<XID> next_xid {1};
/// Mark session finalization start. Used to avoid simultaneous /// Mark session finalization start. Used to avoid simultaneous
/// finalization from different threads. One-shot flag. /// finalization from different threads. One-shot flag.
std::atomic<bool> finalization_started {false}; std::atomic_flag finalization_started;
using clock = std::chrono::steady_clock; using clock = std::chrono::steady_clock;

View File

@ -222,8 +222,8 @@ public:
} }
/// Check for duplicated changelog ids /// Check for duplicated changelog ids
if (logs.count(record.header.index) != 0) if (logs.contains(record.header.index))
std::erase_if(logs, [record] (const auto & item) { return item.first >= record.header.index; }); std::erase_if(logs, [&record] (const auto & item) { return item.first >= record.header.index; });
result.total_entries_read_from_log += 1; result.total_entries_read_from_log += 1;
@ -659,6 +659,7 @@ LogEntryPtr Changelog::getLatestConfigChange() const
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count) nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count)
{ {
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs; std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;
returned_logs.reserve(count);
uint64_t size_total = 0; uint64_t size_total = 0;
for (uint64_t i = index; i < index + count; ++i) for (uint64_t i = index; i < index + count; ++i)
@ -669,7 +670,7 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index,
nuraft::ptr<nuraft::buffer> buf = entry->second->serialize(); nuraft::ptr<nuraft::buffer> buf = entry->second->serialize();
size_total += buf->size(); size_total += buf->size();
returned_logs.push_back(buf); returned_logs.push_back(std::move(buf));
} }
nuraft::ptr<nuraft::buffer> buf_out = nuraft::buffer::alloc(sizeof(int32_t) + count * sizeof(int32_t) + size_total); nuraft::ptr<nuraft::buffer> buf_out = nuraft::buffer::alloc(sizeof(int32_t) + count * sizeof(int32_t) + size_total);
@ -678,9 +679,8 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index,
for (auto & entry : returned_logs) for (auto & entry : returned_logs)
{ {
nuraft::ptr<nuraft::buffer> & bb = entry; buf_out->put(static_cast<int32_t>(entry->size()));
buf_out->put(static_cast<int32_t>(bb->size())); buf_out->put(*entry);
buf_out->put(*bb);
} }
return buf_out; return buf_out;
} }
@ -699,7 +699,7 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
buffer.get(buf_local); buffer.get(buf_local);
LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local); LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local);
if (i == 0 && logs.count(cur_index)) if (i == 0 && logs.contains(cur_index))
writeAt(cur_index, log_entry); writeAt(cur_index, log_entry);
else else
appendEntry(cur_index, log_entry); appendEntry(cur_index, log_entry);

View File

@ -121,7 +121,7 @@ void KeeperDispatcher::requestThread()
current_batch.clear(); current_batch.clear();
} }
prev_batch = current_batch; prev_batch = std::move(current_batch);
prev_result = result; prev_result = result;
} }

View File

@ -43,7 +43,7 @@ namespace
void writeNode(const KeeperStorage::Node & node, SnapshotVersion version, WriteBuffer & out) void writeNode(const KeeperStorage::Node & node, SnapshotVersion version, WriteBuffer & out)
{ {
writeBinary(node.data, out); writeBinary(node.getData(), out);
/// Serialize ACL /// Serialize ACL
writeBinary(node.acl_id, out); writeBinary(node.acl_id, out);
@ -71,7 +71,9 @@ namespace
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map) void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{ {
readBinary(node.data, in); String new_data;
readBinary(new_data, in);
node.setData(std::move(new_data));
if (version >= SnapshotVersion::V1) if (version >= SnapshotVersion::V1)
{ {
@ -281,7 +283,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
if (itr.key != "/") if (itr.key != "/")
{ {
auto parent_path = parentPath(itr.key); auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); }); storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); });
} }
} }

View File

@ -259,22 +259,9 @@ void KeeperStateMachine::save_logical_snp_obj(
{ {
LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id); LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
nuraft::ptr<nuraft::buffer> cloned_buffer;
nuraft::ptr<nuraft::snapshot> cloned_meta;
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{
std::lock_guard lock(storage_and_responses_lock);
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx(), getClusterConfig());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
}
else
{
/// copy snapshot into memory
}
/// copy snapshot meta into memory /// copy snapshot meta into memory
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize(); nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
cloned_meta = nuraft::snapshot::deserialize(*snp_buf); nuraft::ptr<nuraft::snapshot> cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
try try
{ {
@ -332,15 +319,7 @@ int KeeperStateMachine::read_logical_snp_obj(
{ {
LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id); LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{
data_out = nuraft::buffer::alloc(sizeof(int32_t));
nuraft::buffer_serializer bs(data_out);
bs.put_i32(0);
is_last_obj = false;
}
else
{
std::lock_guard lock(snapshots_lock); std::lock_guard lock(snapshots_lock);
/// Our snapshot is not equal to required. Maybe we still creating it in the background. /// Our snapshot is not equal to required. Maybe we still creating it in the background.
/// Let's wait and NuRaft will retry this call. /// Let's wait and NuRaft will retry this call.
@ -356,7 +335,6 @@ int KeeperStateMachine::read_logical_snp_obj(
return -1; return -1;
} }
is_last_obj = true; is_last_obj = true;
}
return 1; return 1;
} }

View File

@ -84,7 +84,7 @@ public:
bool shouldStartAsFollower() const bool shouldStartAsFollower() const
{ {
std::lock_guard lock(configuration_wrapper_mutex); std::lock_guard lock(configuration_wrapper_mutex);
return configuration_wrapper.servers_start_as_followers.count(my_server_id); return configuration_wrapper.servers_start_as_followers.contains(my_server_id);
} }
bool isSecure() const bool isSecure() const

View File

@ -24,7 +24,10 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
} }
static String base64Encode(const String & decoded) namespace
{
String base64Encode(const String & decoded)
{ {
std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
ostr.exceptions(std::ios::failbit); ostr.exceptions(std::ios::failbit);
@ -35,7 +38,7 @@ static String base64Encode(const String & decoded)
return ostr.str(); return ostr.str();
} }
static String getSHA1(const String & userdata) String getSHA1(const String & userdata)
{ {
Poco::SHA1Engine engine; Poco::SHA1Engine engine;
engine.update(userdata); engine.update(userdata);
@ -43,14 +46,14 @@ static String getSHA1(const String & userdata)
return String{digest_id.begin(), digest_id.end()}; return String{digest_id.begin(), digest_id.end()};
} }
static String generateDigest(const String & userdata) String generateDigest(const String & userdata)
{ {
std::vector<String> user_password; std::vector<String> user_password;
boost::split(user_password, userdata, [](char c) { return c == ':'; }); boost::split(user_password, userdata, [](char c) { return c == ':'; });
return user_password[0] + ":" + base64Encode(getSHA1(userdata)); return user_password[0] + ":" + base64Encode(getSHA1(userdata));
} }
static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector<KeeperStorage::AuthID> & session_auths) bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector<KeeperStorage::AuthID> & session_auths)
{ {
if (node_acls.empty()) if (node_acls.empty())
return true; return true;
@ -77,7 +80,7 @@ static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, c
return false; return false;
} }
static bool fixupACL( bool fixupACL(
const std::vector<Coordination::ACL> & request_acls, const std::vector<Coordination::ACL> & request_acls,
const std::vector<KeeperStorage::AuthID> & current_ids, const std::vector<KeeperStorage::AuthID> & current_ids,
std::vector<Coordination::ACL> & result_acls) std::vector<Coordination::ACL> & result_acls)
@ -119,7 +122,7 @@ static bool fixupACL(
return valid_found; return valid_found;
} }
static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type) KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type)
{ {
KeeperStorage::ResponsesForSessions result; KeeperStorage::ResponsesForSessions result;
auto it = watches.find(path); auto it = watches.find(path);
@ -174,6 +177,25 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat
} }
return result; return result;
} }
}
void KeeperStorage::Node::setData(String new_data)
{
size_bytes = size_bytes - data.size() + new_data.size();
data = std::move(new_data);
}
void KeeperStorage::Node::addChild(StringRef child_path)
{
size_bytes += sizeof child_path;
children.insert(child_path);
}
void KeeperStorage::Node::removeChild(StringRef child_path)
{
size_bytes -= sizeof child_path;
children.erase(child_path);
}
KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_) KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_)
: session_expiry_queue(tick_time_ms) : session_expiry_queue(tick_time_ms)
@ -314,8 +336,8 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
created_node.stat.numChildren = 0; created_node.stat.numChildren = 0;
created_node.stat.dataLength = request.data.length(); created_node.stat.dataLength = request.data.length();
created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0; created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
created_node.data = request.data;
created_node.is_sequental = request.is_sequential; created_node.is_sequental = request.is_sequential;
created_node.setData(std::move(request.data));
auto [map_key, _] = container.insert(path_created, created_node); auto [map_key, _] = container.insert(path_created, created_node);
/// Take child path from key owned by map. /// Take child path from key owned by map.
@ -327,8 +349,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid, container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid,
parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent) parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent)
{ {
parent.children.insert(child_path); parent.addChild(child_path);
parent.size_bytes += child_path.size;
prev_parent_cversion = parent.stat.cversion; prev_parent_cversion = parent.stat.cversion;
prev_parent_zxid = parent.stat.pzxid; prev_parent_zxid = parent.stat.pzxid;
@ -363,8 +384,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
--undo_parent.seq_num; --undo_parent.seq_num;
undo_parent.stat.cversion = prev_parent_cversion; undo_parent.stat.cversion = prev_parent_cversion;
undo_parent.stat.pzxid = prev_parent_zxid; undo_parent.stat.pzxid = prev_parent_zxid;
undo_parent.children.erase(child_path); undo_parent.removeChild(child_path);
undo_parent.size_bytes -= child_path.size;
}); });
storage.container.erase(path_created); storage.container.erase(path_created);
@ -409,7 +429,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
else else
{ {
response.stat = it->value.stat; response.stat = it->value.stat;
response.data = it->value.data; response.data = it->value.getData();
response.error = Coordination::Error::ZOK; response.error = Coordination::Error::ZOK;
} }
@ -498,8 +518,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{ {
--parent.stat.numChildren; --parent.stat.numChildren;
++parent.stat.cversion; ++parent.stat.cversion;
parent.children.erase(child_basename); parent.removeChild(child_basename);
parent.size_bytes -= child_basename.size;
}); });
response.error = Coordination::Error::ZOK; response.error = Coordination::Error::ZOK;
@ -520,8 +539,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{ {
++parent.stat.numChildren; ++parent.stat.numChildren;
--parent.stat.cversion; --parent.stat.cversion;
parent.children.insert(child_name); parent.addChild(child_name);
parent.size_bytes += child_name.size;
}); });
}; };
} }
@ -598,14 +616,13 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
auto prev_node = it->value; auto prev_node = it->value;
auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) mutable
{ {
value.stat.version++; value.stat.version++;
value.stat.mzxid = zxid; value.stat.mzxid = zxid;
value.stat.mtime = time; value.stat.mtime = time;
value.stat.dataLength = request.data.length(); value.stat.dataLength = request.data.length();
value.size_bytes = value.size_bytes + request.data.size() - value.data.size(); value.setData(std::move(request.data));
value.data = request.data;
}); });
container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent) container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent)
@ -675,9 +692,10 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
if (path_prefix.empty()) if (path_prefix.empty())
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
response.names.reserve(it->value.children.size()); const auto & children = it->value.getChildren();
response.names.reserve(children.size());
for (const auto child : it->value.children) for (const auto child : children)
response.names.push_back(child.toString()); response.names.push_back(child.toString());
response.stat = it->value.stat; response.stat = it->value.stat;
@ -856,26 +874,25 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
for (const auto & sub_request : request.requests) for (const auto & sub_request : request.requests)
{ {
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request); auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
if (sub_zk_request->getOpNum() == Coordination::OpNum::Create) switch (sub_zk_request->getOpNum())
{ {
case Coordination::OpNum::Create:
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request)); concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
} break;
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove) case Coordination::OpNum::Remove:
{
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request)); concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request));
} break;
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set) case Coordination::OpNum::Set:
{
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request)); concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
} break;
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check) case Coordination::OpNum::Check:
{
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request)); concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
} break;
else default:
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum()); throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
} }
} }
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{ {
@ -1092,8 +1109,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
--parent.stat.numChildren; --parent.stat.numChildren;
++parent.stat.cversion; ++parent.stat.cversion;
auto base_name = getBaseName(ephemeral_path); auto base_name = getBaseName(ephemeral_path);
parent.children.erase(base_name); parent.removeChild(base_name);
parent.size_bytes -= base_name.size;
}); });
container.erase(ephemeral_path); container.erase(ephemeral_path);

View File

@ -32,28 +32,38 @@ public:
struct Node struct Node
{ {
String data;
uint64_t acl_id = 0; /// 0 -- no ACL by default uint64_t acl_id = 0; /// 0 -- no ACL by default
bool is_sequental = false; bool is_sequental = false;
Coordination::Stat stat{}; Coordination::Stat stat{};
int32_t seq_num = 0; int32_t seq_num = 0;
ChildrenSet children{};
uint64_t size_bytes; // save size to avoid calculate every time uint64_t size_bytes; // save size to avoid calculate every time
Node() Node() : size_bytes(sizeof(Node)) { }
{
size_bytes = sizeof(size_bytes);
size_bytes += data.size();
size_bytes += sizeof(acl_id);
size_bytes += sizeof(is_sequental);
size_bytes += sizeof(stat);
size_bytes += sizeof(seq_num);
}
/// Object memory size /// Object memory size
uint64_t sizeInBytes() const uint64_t sizeInBytes() const
{ {
return size_bytes; return size_bytes;
} }
void setData(String new_data);
const auto & getData() const noexcept
{
return data;
}
void addChild(StringRef child_path);
void removeChild(StringRef child_path);
const auto & getChildren() const noexcept
{
return children;
}
private:
String data;
ChildrenSet children{};
}; };
struct ResponseForSession struct ResponseForSession
@ -104,7 +114,7 @@ public:
/// Mapping session_id -> set of ephemeral nodes paths /// Mapping session_id -> set of ephemeral nodes paths
Ephemerals ephemerals; Ephemerals ephemerals;
/// Mapping sessuib_id -> set of watched nodes paths /// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers; SessionAndWatcher sessions_and_watchers;
/// Expiration queue for session, allows to get dead sessions at some point of time /// Expiration queue for session, allows to get dead sessions at some point of time
SessionExpiryQueue session_expiry_queue; SessionExpiryQueue session_expiry_queue;

View File

@ -80,7 +80,7 @@ private:
approximate_data_size += value_size; approximate_data_size += value_size;
if (!snapshot_mode) if (!snapshot_mode)
{ {
approximate_data_size += key_size; approximate_data_size -= key_size;
approximate_data_size -= old_value_size; approximate_data_size -= old_value_size;
} }
} }
@ -132,7 +132,6 @@ public:
if (!it) if (!it)
{ {
ListElem elem{copyStringInArena(arena, key), value, current_version}; ListElem elem{copyStringInArena(arena, key), value, current_version};
auto itr = list.insert(list.end(), std::move(elem)); auto itr = list.insert(list.end(), std::move(elem));
bool inserted; bool inserted;
@ -228,7 +227,7 @@ public:
/// We in snapshot mode but updating some node which is already more /// We in snapshot mode but updating some node which is already more
/// fresh than snapshot distance. So it will not participate in /// fresh than snapshot distance. So it will not participate in
/// snapshot and we don't need to copy it. /// snapshot and we don't need to copy it.
if (snapshot_mode && list_itr->version <= snapshot_up_to_version) if (list_itr->version <= snapshot_up_to_version)
{ {
auto elem_copy = *(list_itr); auto elem_copy = *(list_itr);
list_itr->active_in_map = false; list_itr->active_in_map = false;

View File

@ -98,7 +98,9 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
while (path != "/") while (path != "/")
{ {
KeeperStorage::Node node{}; KeeperStorage::Node node{};
Coordination::read(node.data, in); String data;
Coordination::read(data, in);
node.setData(std::move(data));
Coordination::read(node.acl_id, in); Coordination::read(node.acl_id, in);
/// Deserialize stat /// Deserialize stat
@ -117,7 +119,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
Coordination::read(node.stat.pzxid, in); Coordination::read(node.stat.pzxid, in);
if (!path.empty()) if (!path.empty())
{ {
node.stat.dataLength = node.data.length(); node.stat.dataLength = node.getData().length();
node.seq_num = node.stat.cversion; node.seq_num = node.stat.cversion;
storage.container.insertOrReplace(path, node); storage.container.insertOrReplace(path, node);
@ -137,7 +139,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
if (itr.key != "/") if (itr.key != "/")
{ {
auto parent_path = parentPath(itr.key); auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++; }); storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); value.stat.numChildren++; });
} }
} }

View File

@ -946,6 +946,8 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
EXPECT_EQ(hello.getApproximateDataSize(), 9); EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.updateValue("hello", [](IntNode & value) { value = 2; }); hello.updateValue("hello", [](IntNode & value) { value = 2; });
EXPECT_EQ(hello.getApproximateDataSize(), 9); EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.insertOrReplace("hello", 3);
EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.erase("hello"); hello.erase("hello");
EXPECT_EQ(hello.getApproximateDataSize(), 0); EXPECT_EQ(hello.getApproximateDataSize(), 0);
@ -958,6 +960,8 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
EXPECT_EQ(hello.getApproximateDataSize(), 9); EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.updateValue("hello", [](IntNode & value) { value = 2; }); hello.updateValue("hello", [](IntNode & value) { value = 2; });
EXPECT_EQ(hello.getApproximateDataSize(), 18); EXPECT_EQ(hello.getApproximateDataSize(), 18);
hello.insertOrReplace("hello", 1);
EXPECT_EQ(hello.getApproximateDataSize(), 27);
hello.clearOutdatedNodes(); hello.clearOutdatedNodes();
EXPECT_EQ(hello.getApproximateDataSize(), 9); EXPECT_EQ(hello.getApproximateDataSize(), 9);
@ -972,31 +976,31 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
using Node = DB::KeeperStorage::Node; using Node = DB::KeeperStorage::Node;
DB::SnapshotableHashTable<Node> world; DB::SnapshotableHashTable<Node> world;
Node n1; Node n1;
n1.data = "1234"; n1.setData("1234");
Node n2; Node n2;
n2.data = "123456"; n2.setData("123456");
n2.children.insert(""); n2.addChild("");
world.disableSnapshotMode(); world.disableSnapshotMode();
world.insert("world", n1); world.insert("world", n1);
EXPECT_EQ(world.getApproximateDataSize(), 98); EXPECT_EQ(world.getApproximateDataSize(), 177);
world.updateValue("world", [&](Node & value) { value = n2; }); world.updateValue("world", [&](Node & value) { value = n2; });
EXPECT_EQ(world.getApproximateDataSize(), 98); EXPECT_EQ(world.getApproximateDataSize(), 195);
world.erase("world"); world.erase("world");
EXPECT_EQ(world.getApproximateDataSize(), 0); EXPECT_EQ(world.getApproximateDataSize(), 0);
world.enableSnapshotMode(100000); world.enableSnapshotMode(100000);
world.insert("world", n1); world.insert("world", n1);
EXPECT_EQ(world.getApproximateDataSize(), 98); EXPECT_EQ(world.getApproximateDataSize(), 177);
world.updateValue("world", [&](Node & value) { value = n2; }); world.updateValue("world", [&](Node & value) { value = n2; });
EXPECT_EQ(world.getApproximateDataSize(), 196); EXPECT_EQ(world.getApproximateDataSize(), 372);
world.clearOutdatedNodes(); world.clearOutdatedNodes();
EXPECT_EQ(world.getApproximateDataSize(), 98); EXPECT_EQ(world.getApproximateDataSize(), 195);
world.erase("world"); world.erase("world");
EXPECT_EQ(world.getApproximateDataSize(), 98); EXPECT_EQ(world.getApproximateDataSize(), 195);
world.clear(); world.clear();
EXPECT_EQ(world.getApproximateDataSize(), 0); EXPECT_EQ(world.getApproximateDataSize(), 0);
@ -1006,7 +1010,7 @@ void addNode(DB::KeeperStorage & storage, const std::string & path, const std::s
{ {
using Node = DB::KeeperStorage::Node; using Node = DB::KeeperStorage::Node;
Node node{}; Node node{};
node.data = data; node.setData(data);
node.stat.ephemeralOwner = ephemeral_owner; node.stat.ephemeralOwner = ephemeral_owner;
storage.container.insertOrReplace(path, node); storage.container.insertOrReplace(path, node);
} }
@ -1044,13 +1048,13 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple)
auto [restored_storage, snapshot_meta, _] = manager.deserializeSnapshotFromBuffer(debuf); auto [restored_storage, snapshot_meta, _] = manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage->container.size(), 3); EXPECT_EQ(restored_storage->container.size(), 3);
EXPECT_EQ(restored_storage->container.getValue("/").children.size(), 1); EXPECT_EQ(restored_storage->container.getValue("/").getChildren().size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello").children.size(), 1); EXPECT_EQ(restored_storage->container.getValue("/hello").getChildren().size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").children.size(), 0); EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/").data, ""); EXPECT_EQ(restored_storage->container.getValue("/").getData(), "");
EXPECT_EQ(restored_storage->container.getValue("/hello").data, "world"); EXPECT_EQ(restored_storage->container.getValue("/hello").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").data, "somedata"); EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7); EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2); EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2); EXPECT_EQ(restored_storage->ephemerals.size(), 2);
@ -1095,7 +1099,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites)
EXPECT_EQ(restored_storage->container.size(), 51); EXPECT_EQ(restored_storage->container.size(), 51);
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i)); EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).getData(), "world_" + std::to_string(i));
} }
} }
@ -1135,7 +1139,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots)
for (size_t i = 0; i < 250; ++i) for (size_t i = 0; i < 250; ++i)
{ {
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i)); EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).getData(), "world_" + std::to_string(i));
} }
} }
@ -1158,7 +1162,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
} }
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
EXPECT_EQ(storage.container.getValue("/hello_" + std::to_string(i)).data, "wlrd_" + std::to_string(i)); EXPECT_EQ(storage.container.getValue("/hello_" + std::to_string(i)).getData(), "wlrd_" + std::to_string(i));
} }
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
@ -1178,7 +1182,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
if (i % 2 != 0) if (i % 2 != 0)
EXPECT_EQ(storage.container.getValue("/hello_" + std::to_string(i)).data, "wlrd_" + std::to_string(i)); EXPECT_EQ(storage.container.getValue("/hello_" + std::to_string(i)).getData(), "wlrd_" + std::to_string(i));
else else
EXPECT_FALSE(storage.container.contains("/hello_" + std::to_string(i))); EXPECT_FALSE(storage.container.contains("/hello_" + std::to_string(i)));
} }
@ -1187,7 +1191,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i)); EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).getData(), "world_" + std::to_string(i));
} }
} }
@ -1310,7 +1314,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
for (size_t i = 1; i < total_logs + 1; ++i) for (size_t i = 1; i < total_logs + 1; ++i)
{ {
auto path = "/hello_" + std::to_string(i); auto path = "/hello_" + std::to_string(i);
EXPECT_EQ(source_storage.container.getValue(path).data, restored_storage.container.getValue(path).data); EXPECT_EQ(source_storage.container.getValue(path).getData(), restored_storage.container.getValue(path).getData());
} }
} }
@ -1585,13 +1589,13 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
auto [restored_storage, snapshot_meta, _] = new_manager.deserializeSnapshotFromBuffer(debuf); auto [restored_storage, snapshot_meta, _] = new_manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage->container.size(), 3); EXPECT_EQ(restored_storage->container.size(), 3);
EXPECT_EQ(restored_storage->container.getValue("/").children.size(), 1); EXPECT_EQ(restored_storage->container.getValue("/").getChildren().size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello").children.size(), 1); EXPECT_EQ(restored_storage->container.getValue("/hello").getChildren().size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").children.size(), 0); EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getChildren().size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/").data, ""); EXPECT_EQ(restored_storage->container.getValue("/").getData(), "");
EXPECT_EQ(restored_storage->container.getValue("/hello").data, "world"); EXPECT_EQ(restored_storage->container.getValue("/hello").getData(), "world");
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").data, "somedata"); EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").getData(), "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7); EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2); EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2); EXPECT_EQ(restored_storage->ephemerals.size(), 2);

View File

@ -32,9 +32,9 @@ void dumpMachine(std::shared_ptr<KeeperStateMachine> machine)
", numChildren: " << value.stat.numChildren << ", numChildren: " << value.stat.numChildren <<
", dataLength: " << value.stat.dataLength << ", dataLength: " << value.stat.dataLength <<
"}" << std::endl; "}" << std::endl;
std::cout << "\tData: " << storage.container.getValue(key).data << std::endl; std::cout << "\tData: " << storage.container.getValue(key).getData() << std::endl;
for (const auto & child : value.children) for (const auto & child : value.getChildren())
{ {
if (key == "/") if (key == "/")
keys.push(key + child.toString()); keys.push(key + child.toString());