Polishing changes

This commit is contained in:
Antonio Andelic 2022-05-17 08:11:08 +00:00
parent b88f3fcfb1
commit fc5a79f186
9 changed files with 181 additions and 27 deletions

View File

@ -5,6 +5,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <fmt/format.h>
#include <Common/logger_useful.h>
#include <array>
@ -27,6 +28,17 @@ void ZooKeeperResponse::write(WriteBuffer & out) const
out.next();
}
std::string ZooKeeperRequest::toString() const
{
return fmt::format(
"XID = {}\n"
"OpNum = {}\n"
"Additional info:\n{}",
xid,
getOpNum(),
toStringImpl());
}
void ZooKeeperRequest::write(WriteBuffer & out) const
{
/// Excessive copy to calculate length.
@ -48,6 +60,11 @@ void ZooKeeperSyncRequest::readImpl(ReadBuffer & in)
Coordination::read(path, in);
}
std::string ZooKeeperSyncRequest::toStringImpl() const
{
return fmt::format("path = {}", path);
}
void ZooKeeperSyncResponse::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -93,6 +110,17 @@ void ZooKeeperAuthRequest::readImpl(ReadBuffer & in)
Coordination::read(data, in);
}
std::string ZooKeeperAuthRequest::toStringImpl() const
{
return fmt::format(
"type = {}\n"
"scheme = {}\n"
"data = {}",
type,
scheme,
data);
}
void ZooKeeperCreateRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
@ -124,6 +152,19 @@ void ZooKeeperCreateRequest::readImpl(ReadBuffer & in)
is_sequential = true;
}
std::string ZooKeeperCreateRequest::toStringImpl() const
{
return fmt::format(
"path = {}\n"
"data = {}\n"
"is_ephemeral = {}\n"
"is_sequential = {}",
path,
data,
is_ephemeral,
is_sequential);
}
void ZooKeeperCreateResponse::readImpl(ReadBuffer & in)
{
Coordination::read(path_created, in);
@ -140,6 +181,15 @@ void ZooKeeperRemoveRequest::writeImpl(WriteBuffer & out) const
Coordination::write(version, out);
}
std::string ZooKeeperRemoveRequest::toStringImpl() const
{
return fmt::format(
"path = {}\n"
"version = {}",
path,
version);
}
void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
@ -158,6 +208,11 @@ void ZooKeeperExistsRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in);
}
std::string ZooKeeperExistsRequest::toStringImpl() const
{
return fmt::format("path = {}", path);
}
void ZooKeeperExistsResponse::readImpl(ReadBuffer & in)
{
Coordination::read(stat, in);
@ -180,6 +235,11 @@ void ZooKeeperGetRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in);
}
std::string ZooKeeperGetRequest::toStringImpl() const
{
return fmt::format("path = {}", path);
}
void ZooKeeperGetResponse::readImpl(ReadBuffer & in)
{
Coordination::read(data, in);
@ -206,6 +266,17 @@ void ZooKeeperSetRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
std::string ZooKeeperSetRequest::toStringImpl() const
{
return fmt::format(
"path = {}\n",
"data = {}\n"
"version = {}",
path,
data,
version);
}
void ZooKeeperSetResponse::readImpl(ReadBuffer & in)
{
Coordination::read(stat, in);
@ -228,6 +299,11 @@ void ZooKeeperListRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in);
}
std::string ZooKeeperListRequest::toStringImpl() const
{
return fmt::format("path = {}", path);
}
void ZooKeeperListResponse::readImpl(ReadBuffer & in)
{
Coordination::read(names, in);
@ -255,6 +331,11 @@ void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
std::string ZooKeeperSetACLRequest::toStringImpl() const
{
return fmt::format("path = {}\n", "version = {}", path, version);
}
void ZooKeeperSetACLResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(stat, out);
@ -275,6 +356,11 @@ void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const
Coordination::write(path, out);
}
std::string ZooKeeperGetACLRequest::toStringImpl() const
{
return fmt::format("path = {}", path);
}
void ZooKeeperGetACLResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(acl, out);
@ -299,6 +385,11 @@ void ZooKeeperCheckRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
std::string ZooKeeperCheckRequest::toStringImpl() const
{
return fmt::format("path = {}\n", "version = {}", path, version);
}
void ZooKeeperErrorResponse::readImpl(ReadBuffer & in)
{
Coordination::Error read_error;
@ -401,6 +492,17 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
}
}
std::string ZooKeeperMultiRequest::toStringImpl() const
{
auto out = fmt::memory_buffer();
for (const auto & request : requests)
{
const auto & zk_request = dynamic_cast<const ZooKeeperRequest &>(*request);
format_to(std::back_inserter(out), "SubRequest\n{}\n", zk_request.toString());
}
return {out.data(), out.size()};
}
bool ZooKeeperMultiRequest::isReadRequest() const
{
/// Possibly we can do better

View File

@ -68,10 +68,13 @@ struct ZooKeeperRequest : virtual Request
/// Writes length, xid, op_num, then the rest.
void write(WriteBuffer & out) const;
std::string toString() const;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual void readImpl(ReadBuffer &) = 0;
virtual std::string toStringImpl() const { return ""; }
static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in);
virtual ZooKeeperResponsePtr makeResponse() const = 0;
@ -100,6 +103,7 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Sync; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -150,6 +154,7 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Auth; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -202,6 +207,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Create; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -232,6 +238,7 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Remove; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -255,6 +262,7 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Exists; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -278,6 +286,7 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Get; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -304,6 +313,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Set; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -328,6 +338,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::List; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -363,6 +374,7 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Check; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -397,6 +409,7 @@ struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::SetACL; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -417,6 +430,7 @@ struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::GetACL; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -441,6 +455,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override;

View File

@ -405,9 +405,10 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestFor
request_for_session.request->write(write_buf);
DB::writeIntBinary(request_for_session.time, write_buf);
DB::writeIntBinary(request_for_session.zxid, write_buf);
DB::writeIntBinary(request_for_session.digest.version, write_buf);
if (request_for_session.digest.version != KeeperStorage::DigestVersion::NO_DIGEST)
DB::writeIntBinary(request_for_session.digest.value, write_buf);
assert(request_for_session.digest);
DB::writeIntBinary(request_for_session.digest->version, write_buf);
if (request_for_session.digest->version != KeeperStorage::DigestVersion::NO_DIGEST)
DB::writeIntBinary(request_for_session.digest->value, write_buf);
return write_buf.getBuffer();
}

View File

@ -151,6 +151,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
if (snapshot.version >= SnapshotVersion::V5)
{
writeBinary(snapshot.zxid, out);
writeBinary(static_cast<uint8_t>(KeeperStorage::CURRENT_DIGEST_VERSION), out);
writeBinary(snapshot.nodes_digest, out);
}
@ -243,10 +244,22 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
deserialization_result.snapshot_meta = deserializeSnapshotMetadata(in);
KeeperStorage & storage = *deserialization_result.storage;
bool recalculate_digest = storage.digest_enabled;
if (version >= SnapshotVersion::V5)
{
readBinary(storage.zxid, in);
readBinary(storage.nodes_digest, in);
uint8_t digest_version;
readBinary(digest_version, in);
if (digest_version != KeeperStorage::DigestVersion::NO_DIGEST)
{
uint64_t nodes_digest;
readBinary(nodes_digest, in);
if (digest_version == KeeperStorage::CURRENT_DIGEST_VERSION)
{
storage.nodes_digest = nodes_digest;
recalculate_digest = false;
}
}
}
else
storage.zxid = deserialization_result.snapshot_meta->get_last_log_idx();
@ -299,6 +312,9 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
storage.ephemerals[node.stat.ephemeralOwner].insert(path);
current_size++;
if (recalculate_digest)
storage.nodes_digest += node.getDigest(path);
}
for (const auto & itr : storage.container)

View File

@ -128,9 +128,10 @@ KeeperStorage::RequestForSession KeeperStateMachine::parseRequest(nuraft::buffer
if (!buffer.eof())
{
readIntBinary(request_for_session.digest.version, buffer);
if (request_for_session.digest.version != KeeperStorage::DigestVersion::NO_DIGEST)
readIntBinary(request_for_session.digest.value, buffer);
request_for_session.digest.emplace();
readIntBinary(request_for_session.digest->version, buffer);
if (request_for_session.digest->version != KeeperStorage::DigestVersion::NO_DIGEST)
readIntBinary(request_for_session.digest->value, buffer);
}
return request_for_session;
@ -180,9 +181,19 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
}
if (digest_enabled && !KeeperStorage::checkDigest(request_for_session.digest, storage->getNodesDigest(true)))
assert(request_for_session.digest);
auto local_nodes_digest = storage->getNodesDigest(true);
if (digest_enabled && !KeeperStorage::checkDigest(*request_for_session.digest, local_nodes_digest))
{
LOG_ERROR(log, "Digest for nodes is not matching after applying request of type {}", request_for_session.request->getOpNum());
LOG_ERROR(
log,
"Digest for nodes is not matching after applying request of type '{}'.\nExpected digest - {}, actual digest {} (digest version {}). Keeper will "
"terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
request_for_session.request->getOpNum(),
request_for_session.digest->value,
local_nodes_digest.value,
request_for_session.digest->version,
request_for_session.request->toString());
std::terminate();
}

View File

@ -28,7 +28,8 @@ public:
/// Read state from the latest snapshot
void init();
KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data);
static KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data);
void preprocess(const KeeperStorage::RequestForSession & request_for_session);
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;

View File

@ -236,7 +236,7 @@ struct Overloaded : Ts...
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(StringRef path, std::optional<int64_t> last_zxid) const
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(StringRef path, std::optional<int64_t> current_zxid) const
{
std::shared_ptr<Node> node{nullptr};
@ -273,7 +273,7 @@ std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(St
},
[&](auto && /*delta*/) {},
},
last_zxid);
current_zxid);
return node;
}
@ -1657,7 +1657,7 @@ void KeeperStorage::preprocessRequest(
int64_t time,
int64_t new_last_zxid,
bool check_acl,
Digest expected_digest)
std::optional<Digest> digest)
{
int64_t last_zxid = getNextZXID() - 1;
@ -1669,7 +1669,9 @@ void KeeperStorage::preprocessRequest(
}
else
{
// if we are leader node, the request potentially already got processed
// if we are Leader node, the request potentially already got preprocessed
// Leader can preprocess requests in a batch so the ZXID we are searching isn't
// guaranteed to be last
auto txn_it = std::lower_bound(
uncommitted_transactions.begin(),
uncommitted_transactions.end(),
@ -1687,7 +1689,7 @@ void KeeperStorage::preprocessRequest(
}
else
{
if (txn_it->zxid == new_last_zxid && checkDigest(txn_it->nodes_digest, expected_digest))
if (txn_it->zxid == new_last_zxid)
// we found the preprocessed request with the same ZXID, we can skip it
return;
@ -1698,10 +1700,17 @@ void KeeperStorage::preprocessRequest(
TransactionInfo transaction{.zxid = new_last_zxid};
SCOPE_EXIT({
if (expected_digest.version == DigestVersion::NO_DIGEST && digest_enabled)
transaction.nodes_digest = Digest{CURRENT_DIGEST_VERSION, calculateNodesDigest(getNodesDigest(false).value, transaction.zxid)};
if (digest_enabled)
{
// if the leader has the same digest calculation version we
// can skip recalculating and use that value
if (digest && digest->version == CURRENT_DIGEST_VERSION)
transaction.nodes_digest = *digest;
else
transaction.nodes_digest = Digest{CURRENT_DIGEST_VERSION, calculateNodesDigest(getNodesDigest(false).value, transaction.zxid)};
}
else
transaction.nodes_digest = expected_digest;
transaction.nodes_digest = Digest{DigestVersion::NO_DIGEST};
uncommitted_transactions.emplace_back(transaction);
});
@ -1825,7 +1834,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
if (is_local)
{
assert(!zk_request->isReadRequest());
assert(zk_request->isReadRequest());
if (check_acl && !request_processor->checkAuth(*this, session_id, true))
{
response = zk_request->makeResponse();

View File

@ -101,7 +101,7 @@ public:
int64_t time;
Coordination::ZooKeeperRequestPtr request;
int64_t zxid{0};
Digest digest;
std::optional<Digest> digest;
};
struct AuthID
@ -211,11 +211,11 @@ public:
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
template <typename Visitor>
void applyDeltas(StringRef path, const Visitor & visitor, std::optional<int64_t> last_zxid = std::nullopt) const
void applyDeltas(StringRef path, const Visitor & visitor, std::optional<int64_t> current_zxid = std::nullopt) const
{
for (const auto & delta : deltas)
{
if (last_zxid && delta.zxid >= last_zxid)
if (current_zxid && delta.zxid >= current_zxid)
break;
if (path.empty() || delta.path == path)
@ -223,7 +223,7 @@ public:
}
}
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
{
for (const auto & session_auth : storage.session_and_auth[session_id])
{
@ -234,7 +234,6 @@ public:
if (is_local)
return false;
for (const auto & delta : deltas)
{
if (const auto * auth_delta = std::get_if<KeeperStorage::AddAuthDelta>(&delta.operation);
@ -245,7 +244,7 @@ public:
return false;
}
std::shared_ptr<Node> getNode(StringRef path, std::optional<int64_t> last_zxid = std::nullopt) const;
std::shared_ptr<Node> getNode(StringRef path, std::optional<int64_t> current_zxid = std::nullopt) const;
bool hasNode(StringRef path) const;
Coordination::ACLs getACLs(StringRef path) const;
@ -361,7 +360,7 @@ public:
int64_t time,
int64_t new_last_zxid,
bool check_acl = true,
Digest digest = {DigestVersion::NO_DIGEST, 0});
std::optional<Digest> digest = std::nullopt);
void rollbackRequest(int64_t rollback_zxid);
void finalize();

View File

@ -520,7 +520,7 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l
if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request))
return true;
storage.preprocessRequest(request, session_id, time, zxid, /* check_acl = */ false, {KeeperStorage::DigestVersion::NO_DIGEST});
storage.preprocessRequest(request, session_id, time, zxid, /* check_acl = */ false);
storage.processRequest(request, session_id, time, zxid, /* check_acl = */ false);
}
}