From 77dbe3fee8d4b784317da39ec2aed295f1bb6f16 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 4 Mar 2021 16:02:30 +0300 Subject: [PATCH] Get rid of network order --- src/Coordination/NuKeeperSnapshotManager.cpp | 106 +++++++++++++------ 1 file changed, 74 insertions(+), 32 deletions(-) diff --git a/src/Coordination/NuKeeperSnapshotManager.cpp b/src/Coordination/NuKeeperSnapshotManager.cpp index 211e5a14540..f668bb3342d 100644 --- a/src/Coordination/NuKeeperSnapshotManager.cpp +++ b/src/Coordination/NuKeeperSnapshotManager.cpp @@ -53,37 +53,79 @@ namespace void writeNode(const NuKeeperStorage::Node & node, WriteBuffer & out) { - /// FIXME why we store them in network order? - Coordination::write(node.data, out); - Coordination::write(node.acls, out); - Coordination::write(node.is_sequental, out); - Coordination::write(node.stat, out); - Coordination::write(node.seq_num, out); + writeBinary(node.data, out); + + /// Serialize ACL + writeBinary(node.acls.size(), out); + for (const auto & acl : node.acls) + { + writeBinary(acl.permissions, out); + writeBinary(acl.scheme, out); + writeBinary(acl.id, out); + } + + writeBinary(node.is_sequental, out); + /// Serialize stat + writeBinary(node.stat.czxid, out); + writeBinary(node.stat.mzxid, out); + writeBinary(node.stat.ctime, out); + writeBinary(node.stat.mtime, out); + writeBinary(node.stat.version, out); + writeBinary(node.stat.cversion, out); + writeBinary(node.stat.aversion, out); + writeBinary(node.stat.ephemeralOwner, out); + writeBinary(node.stat.dataLength, out); + writeBinary(node.stat.numChildren, out); + writeBinary(node.stat.pzxid, out); + + writeBinary(node.seq_num, out); } void readNode(NuKeeperStorage::Node & node, ReadBuffer & in) { - /// FIXME why we store them in network order? - Coordination::read(node.data, in); - Coordination::read(node.acls, in); - Coordination::read(node.is_sequental, in); - Coordination::read(node.stat, in); - Coordination::read(node.seq_num, in); + readBinary(node.data, in); + + /// Deserialize ACL + size_t acls_size; + readBinary(acls_size, in); + for (size_t i = 0; i < acls_size; ++i) + { + Coordination::ACL acl; + readBinary(acl.permissions, in); + readBinary(acl.scheme, in); + readBinary(acl.id, in); + node.acls.push_back(acl); + } + readBinary(node.is_sequental, in); + + /// Deserialize stat + readBinary(node.stat.czxid, in); + readBinary(node.stat.mzxid, in); + readBinary(node.stat.ctime, in); + readBinary(node.stat.mtime, in); + readBinary(node.stat.version, in); + readBinary(node.stat.cversion, in); + readBinary(node.stat.aversion, in); + readBinary(node.stat.ephemeralOwner, in); + readBinary(node.stat.dataLength, in); + readBinary(node.stat.numChildren, in); + readBinary(node.stat.pzxid, in); + readBinary(node.seq_num, in); } void serializeSnapshotMetadata(const SnapshotMetadataPtr & snapshot_meta, WriteBuffer & out) { auto buffer = snapshot_meta->serialize(); - Coordination::write(reinterpret_cast(buffer->data_begin()), buffer->size(), out); + writeVarUInt(buffer->size(), out); + out.write(reinterpret_cast(buffer->data_begin()), buffer->size()); } SnapshotMetadataPtr deserializeSnapshotMetadata(ReadBuffer & in) { - /// FIXME double copy (alesap) - std::string data; - Coordination::read(data, in); - auto buffer = nuraft::buffer::alloc(data.size()); - buffer->put_raw(reinterpret_cast(data.c_str()), data.size()); + size_t data_size; + readVarUInt(data_size, in); + auto buffer = nuraft::buffer::alloc(data_size); + in.readStrict(reinterpret_cast(buffer->data_begin()), data_size); buffer->pos(0); return SnapshotMetadata::deserialize(*buffer); } @@ -92,10 +134,10 @@ namespace void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out) { - Coordination::write(static_cast(snapshot.version), out); + writeBinary(static_cast(snapshot.version), out); serializeSnapshotMetadata(snapshot.snapshot_meta, out); - Coordination::write(snapshot.session_id, out); - Coordination::write(snapshot.snapshot_container_size, out); + writeBinary(snapshot.session_id, out); + writeBinary(snapshot.snapshot_container_size, out); size_t counter = 0; for (auto it = snapshot.begin; counter < snapshot.snapshot_container_size; ++it, ++counter) { @@ -104,40 +146,40 @@ void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot if (static_cast(node.stat.mzxid) > snapshot.snapshot_meta->get_last_log_idx()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to serialize node with mzxid {}, but last snapshot index {}", node.stat.mzxid, snapshot.snapshot_meta->get_last_log_idx()); - Coordination::write(path, out); + writeBinary(path, out); writeNode(node, out); } size_t size = snapshot.session_and_timeout.size(); - Coordination::write(size, out); + writeBinary(size, out); for (const auto & [session_id, timeout] : snapshot.session_and_timeout) { - Coordination::write(session_id, out); - Coordination::write(timeout, out); + writeBinary(session_id, out); + writeBinary(timeout, out); } } SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer & in) { uint8_t version; - Coordination::read(version, in); + readBinary(version, in); if (static_cast(version) > SnapshotVersion::V0) throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version); SnapshotMetadataPtr result = deserializeSnapshotMetadata(in); int64_t session_id; - Coordination::read(session_id, in); + readBinary(session_id, in); storage.zxid = result->get_last_log_idx(); storage.session_id_counter = session_id; size_t snapshot_container_size; - Coordination::read(snapshot_container_size, in); + readBinary(snapshot_container_size, in); size_t current_size = 0; while (current_size < snapshot_container_size) { std::string path; - Coordination::read(path, in); + readBinary(path, in); NuKeeperStorage::Node node; readNode(node, in); storage.container.insertOrReplace(path, node); @@ -157,14 +199,14 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora } size_t active_sessions_size; - Coordination::read(active_sessions_size, in); + readBinary(active_sessions_size, in); size_t current_session_size = 0; while (current_session_size < active_sessions_size) { int64_t active_session_id, timeout; - Coordination::read(active_session_id, in); - Coordination::read(timeout, in); + readBinary(active_session_id, in); + readBinary(timeout, in); storage.addSessionID(active_session_id, timeout); current_session_size++; }