mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 21:20:49 +00:00
Get rid of network order
This commit is contained in:
parent
27011f086e
commit
77dbe3fee8
@ -53,37 +53,79 @@ namespace
|
||||
|
||||
void writeNode(const NuKeeperStorage::Node & node, WriteBuffer & out)
|
||||
{
|
||||
/// FIXME why we store them in network order?
|
||||
Coordination::write(node.data, out);
|
||||
Coordination::write(node.acls, out);
|
||||
Coordination::write(node.is_sequental, out);
|
||||
Coordination::write(node.stat, out);
|
||||
Coordination::write(node.seq_num, out);
|
||||
writeBinary(node.data, out);
|
||||
|
||||
/// Serialize ACL
|
||||
writeBinary(node.acls.size(), out);
|
||||
for (const auto & acl : node.acls)
|
||||
{
|
||||
writeBinary(acl.permissions, out);
|
||||
writeBinary(acl.scheme, out);
|
||||
writeBinary(acl.id, out);
|
||||
}
|
||||
|
||||
writeBinary(node.is_sequental, out);
|
||||
/// Serialize stat
|
||||
writeBinary(node.stat.czxid, out);
|
||||
writeBinary(node.stat.mzxid, out);
|
||||
writeBinary(node.stat.ctime, out);
|
||||
writeBinary(node.stat.mtime, out);
|
||||
writeBinary(node.stat.version, out);
|
||||
writeBinary(node.stat.cversion, out);
|
||||
writeBinary(node.stat.aversion, out);
|
||||
writeBinary(node.stat.ephemeralOwner, out);
|
||||
writeBinary(node.stat.dataLength, out);
|
||||
writeBinary(node.stat.numChildren, out);
|
||||
writeBinary(node.stat.pzxid, out);
|
||||
|
||||
writeBinary(node.seq_num, out);
|
||||
}
|
||||
|
||||
void readNode(NuKeeperStorage::Node & node, ReadBuffer & in)
|
||||
{
|
||||
/// FIXME why we store them in network order?
|
||||
Coordination::read(node.data, in);
|
||||
Coordination::read(node.acls, in);
|
||||
Coordination::read(node.is_sequental, in);
|
||||
Coordination::read(node.stat, in);
|
||||
Coordination::read(node.seq_num, in);
|
||||
readBinary(node.data, in);
|
||||
|
||||
/// Deserialize ACL
|
||||
size_t acls_size;
|
||||
readBinary(acls_size, in);
|
||||
for (size_t i = 0; i < acls_size; ++i)
|
||||
{
|
||||
Coordination::ACL acl;
|
||||
readBinary(acl.permissions, in);
|
||||
readBinary(acl.scheme, in);
|
||||
readBinary(acl.id, in);
|
||||
node.acls.push_back(acl);
|
||||
}
|
||||
readBinary(node.is_sequental, in);
|
||||
|
||||
/// Deserialize stat
|
||||
readBinary(node.stat.czxid, in);
|
||||
readBinary(node.stat.mzxid, in);
|
||||
readBinary(node.stat.ctime, in);
|
||||
readBinary(node.stat.mtime, in);
|
||||
readBinary(node.stat.version, in);
|
||||
readBinary(node.stat.cversion, in);
|
||||
readBinary(node.stat.aversion, in);
|
||||
readBinary(node.stat.ephemeralOwner, in);
|
||||
readBinary(node.stat.dataLength, in);
|
||||
readBinary(node.stat.numChildren, in);
|
||||
readBinary(node.stat.pzxid, in);
|
||||
readBinary(node.seq_num, in);
|
||||
}
|
||||
|
||||
void serializeSnapshotMetadata(const SnapshotMetadataPtr & snapshot_meta, WriteBuffer & out)
|
||||
{
|
||||
auto buffer = snapshot_meta->serialize();
|
||||
Coordination::write(reinterpret_cast<const char *>(buffer->data_begin()), buffer->size(), out);
|
||||
writeVarUInt(buffer->size(), out);
|
||||
out.write(reinterpret_cast<const char *>(buffer->data_begin()), buffer->size());
|
||||
}
|
||||
|
||||
SnapshotMetadataPtr deserializeSnapshotMetadata(ReadBuffer & in)
|
||||
{
|
||||
/// FIXME double copy (alesap)
|
||||
std::string data;
|
||||
Coordination::read(data, in);
|
||||
auto buffer = nuraft::buffer::alloc(data.size());
|
||||
buffer->put_raw(reinterpret_cast<const nuraft::byte *>(data.c_str()), data.size());
|
||||
size_t data_size;
|
||||
readVarUInt(data_size, in);
|
||||
auto buffer = nuraft::buffer::alloc(data_size);
|
||||
in.readStrict(reinterpret_cast<char *>(buffer->data_begin()), data_size);
|
||||
buffer->pos(0);
|
||||
return SnapshotMetadata::deserialize(*buffer);
|
||||
}
|
||||
@ -92,10 +134,10 @@ namespace
|
||||
|
||||
void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out)
|
||||
{
|
||||
Coordination::write(static_cast<uint8_t>(snapshot.version), out);
|
||||
writeBinary(static_cast<uint8_t>(snapshot.version), out);
|
||||
serializeSnapshotMetadata(snapshot.snapshot_meta, out);
|
||||
Coordination::write(snapshot.session_id, out);
|
||||
Coordination::write(snapshot.snapshot_container_size, out);
|
||||
writeBinary(snapshot.session_id, out);
|
||||
writeBinary(snapshot.snapshot_container_size, out);
|
||||
size_t counter = 0;
|
||||
for (auto it = snapshot.begin; counter < snapshot.snapshot_container_size; ++it, ++counter)
|
||||
{
|
||||
@ -104,40 +146,40 @@ void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot
|
||||
if (static_cast<size_t>(node.stat.mzxid) > snapshot.snapshot_meta->get_last_log_idx())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to serialize node with mzxid {}, but last snapshot index {}", node.stat.mzxid, snapshot.snapshot_meta->get_last_log_idx());
|
||||
|
||||
Coordination::write(path, out);
|
||||
writeBinary(path, out);
|
||||
writeNode(node, out);
|
||||
}
|
||||
|
||||
size_t size = snapshot.session_and_timeout.size();
|
||||
Coordination::write(size, out);
|
||||
writeBinary(size, out);
|
||||
for (const auto & [session_id, timeout] : snapshot.session_and_timeout)
|
||||
{
|
||||
Coordination::write(session_id, out);
|
||||
Coordination::write(timeout, out);
|
||||
writeBinary(session_id, out);
|
||||
writeBinary(timeout, out);
|
||||
}
|
||||
}
|
||||
|
||||
SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer & in)
|
||||
{
|
||||
uint8_t version;
|
||||
Coordination::read(version, in);
|
||||
readBinary(version, in);
|
||||
if (static_cast<SnapshotVersion>(version) > SnapshotVersion::V0)
|
||||
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);
|
||||
|
||||
SnapshotMetadataPtr result = deserializeSnapshotMetadata(in);
|
||||
int64_t session_id;
|
||||
Coordination::read(session_id, in);
|
||||
readBinary(session_id, in);
|
||||
storage.zxid = result->get_last_log_idx();
|
||||
storage.session_id_counter = session_id;
|
||||
|
||||
size_t snapshot_container_size;
|
||||
Coordination::read(snapshot_container_size, in);
|
||||
readBinary(snapshot_container_size, in);
|
||||
|
||||
size_t current_size = 0;
|
||||
while (current_size < snapshot_container_size)
|
||||
{
|
||||
std::string path;
|
||||
Coordination::read(path, in);
|
||||
readBinary(path, in);
|
||||
NuKeeperStorage::Node node;
|
||||
readNode(node, in);
|
||||
storage.container.insertOrReplace(path, node);
|
||||
@ -157,14 +199,14 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora
|
||||
}
|
||||
|
||||
size_t active_sessions_size;
|
||||
Coordination::read(active_sessions_size, in);
|
||||
readBinary(active_sessions_size, in);
|
||||
|
||||
size_t current_session_size = 0;
|
||||
while (current_session_size < active_sessions_size)
|
||||
{
|
||||
int64_t active_session_id, timeout;
|
||||
Coordination::read(active_session_id, in);
|
||||
Coordination::read(timeout, in);
|
||||
readBinary(active_session_id, in);
|
||||
readBinary(timeout, in);
|
||||
storage.addSessionID(active_session_id, timeout);
|
||||
current_session_size++;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user