Merge pull request #26127 from ClickHouse/apply_zookeeper_logs

Fix several bugs in ZooKeeper snapshots deserialization
This commit is contained in:
alesapin 2021-07-11 10:25:04 +03:00 committed by GitHub
commit 4964bc1230
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 35 deletions

View File

@ -45,6 +45,8 @@ struct ZooKeeperRequest : virtual Request
/// If the request was sent but an error happened before we got the response, we cannot be sure whether it was processed or not.
bool probably_sent = false;
bool restored_from_zookeeper_log = false;
ZooKeeperRequest() = default;
ZooKeeperRequest(const ZooKeeperRequest &) = default;
virtual ~ZooKeeperRequest() override = default;
@ -172,6 +174,9 @@ struct ZooKeeperCloseResponse final : ZooKeeperResponse
struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
{
/// used only during restore from zookeeper log
int32_t parent_cversion = -1;
ZooKeeperCreateRequest() = default;
explicit ZooKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {}
@ -183,9 +188,6 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
/// During recovery from log we don't rehash ACLs
bool need_to_hash_acls = true;
};
struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
@ -362,8 +364,6 @@ struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return SetACLRequest::bytesSize() + sizeof(xid); }
bool need_to_hash_acls = true;
};
struct ZooKeeperSetACLResponse final : SetACLResponse, ZooKeeperResponse

View File

@ -267,13 +267,12 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
}
else
{
auto & session_auth_ids = storage.session_and_auth[session_id];
KeeperStorage::Node created_node;
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls, request.need_to_hash_acls))
if (!fixupACL(request.acls, session_auth_ids, node_acls, !request.restored_from_zookeeper_log))
{
response.error = Coordination::Error::ZINVALIDACL;
return {response_ptr, {}};
@ -307,16 +306,28 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
path_created += seq_num_str.str();
}
int32_t parent_cversion = request.parent_cversion;
auto child_path = getBaseName(path_created);
int64_t prev_parent_zxid;
container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid] (KeeperStorage::Node & parent)
int32_t prev_parent_cversion;
container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid,
parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent)
{
parent.children.insert(child_path);
prev_parent_cversion = parent.stat.cversion;
prev_parent_zxid = parent.stat.pzxid;
/// Increment sequential number even if node is not sequential
++parent.seq_num;
parent.children.insert(child_path);
++parent.stat.cversion;
prev_parent_zxid = parent.stat.pzxid;
parent.stat.pzxid = zxid;
if (parent_cversion == -1)
++parent.stat.cversion;
else if (parent_cversion > parent.stat.cversion)
parent.stat.cversion = parent_cversion;
if (zxid > parent.stat.pzxid)
parent.stat.pzxid = zxid;
++parent.stat.numChildren;
});
@ -326,7 +337,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
if (request.is_ephemeral)
ephemerals[session_id].emplace(path_created);
undo = [&storage, prev_parent_zxid, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id]
undo = [&storage, prev_parent_zxid, prev_parent_cversion, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id]
{
storage.container.erase(path_created);
storage.acl_map.removeUsage(acl_id);
@ -334,11 +345,11 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
if (is_ephemeral)
storage.ephemerals[session_id].erase(path_created);
storage.container.updateValue(parent_path, [child_path, prev_parent_zxid] (KeeperStorage::Node & undo_parent)
storage.container.updateValue(parent_path, [child_path, prev_parent_zxid, prev_parent_cversion] (KeeperStorage::Node & undo_parent)
{
--undo_parent.stat.cversion;
--undo_parent.stat.numChildren;
--undo_parent.seq_num;
undo_parent.stat.cversion = prev_parent_cversion;
undo_parent.stat.pzxid = prev_parent_zxid;
undo_parent.children.erase(child_path);
});
@ -394,6 +405,24 @@ struct KeeperStorageGetRequest final : public KeeperStorageRequest
}
};
namespace
{
/// Garbage required to apply log to "fuzzy" zookeeper snapshot
void updateParentPzxid(const std::string & child_path, int64_t zxid, KeeperStorage::Container & container)
{
auto parent_path = parentPath(child_path);
auto parent_it = container.find(parent_path);
if (parent_it != container.end())
{
container.updateValue(parent_path, [zxid](KeeperStorage::Node & parent)
{
if (parent.stat.pzxid < zxid)
parent.stat.pzxid = zxid;
});
}
}
}
struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
@ -412,7 +441,7 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
}
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/) const override
{
auto & container = storage.container;
auto & ephemerals = storage.ephemerals;
@ -425,6 +454,8 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
auto it = container.find(request.path);
if (it == container.end())
{
if (request.restored_from_zookeeper_log)
updateParentPzxid(request.path, zxid, container);
response.error = Coordination::Error::ZNONODE;
}
else if (request.version != -1 && request.version != it->value.stat.version)
@ -437,6 +468,9 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
}
else
{
if (request.restored_from_zookeeper_log)
updateParentPzxid(request.path, zxid, container);
auto prev_node = it->value;
if (prev_node.stat.ephemeralOwner != 0)
{
@ -719,7 +753,7 @@ struct KeeperStorageSetACLRequest final : public KeeperStorageRequest
auto & session_auth_ids = storage.session_and_auth[session_id];
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls, request.need_to_hash_acls))
if (!fixupACL(request.acls, session_auth_ids, node_acls, !request.restored_from_zookeeper_log))
{
response.error = Coordination::Error::ZINVALIDACL;
return {response_ptr, {}};

View File

@ -174,7 +174,22 @@ void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::st
LOG_INFO(log, "Deserializing data from snapshot");
int64_t zxid_from_nodes = deserializeStorageData(storage, reader, log);
storage.zxid = std::max(zxid, zxid_from_nodes);
/// ZooKeeper snapshots can contain an inconsistent state of the storage. ZooKeeper calls
/// this inconsistent state "fuzzy". It is guaranteed that the snapshot contains all
/// records up to the zxid from the snapshot name, and also some records from the future.
/// But that doesn't mean we simply have some state of the storage from the future (like zxid + 100 log records).
/// We have an incorrect state of the storage where some random log entries from the future were applied...
///
/// ZooKeeper says that its transaction log is idempotent and can be applied to a "fuzzy" state as is.
/// That's true, but there is no general invariant which produces this property. They just have ad-hoc "if's" which detect
/// "fuzzy" state inconsistencies and apply log records in a special way. Several examples:
/// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java#L453-L463
/// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java#L476-L480
/// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java#L547-L549
if (zxid_from_nodes > zxid)
LOG_WARNING(log, "ZooKeeper snapshot was in inconsistent (fuzzy) state. Will try to apply log. ZooKeeper create non fuzzy snapshot with restart. You can just restart ZooKeeper server and get consistent version.");
storage.zxid = zxid;
LOG_INFO(log, "Finished, snapshot ZXID {}", storage.zxid);
}
@ -210,16 +225,18 @@ void deserializeLogMagic(ReadBuffer & in)
static constexpr int32_t LOG_HEADER = 1514884167; /// "ZKLG"
if (magic_header != LOG_HEADER)
throw Exception(ErrorCodes::CORRUPTED_DATA ,"Incorrect magic header in file, expected {}, got {}", LOG_HEADER, magic_header);
throw Exception(ErrorCodes::CORRUPTED_DATA, "Incorrect magic header in file, expected {}, got {}", LOG_HEADER, magic_header);
if (version != 2)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,"Cannot deserialize ZooKeeper data other than version 2, got version {}", version);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot deserialize ZooKeeper data other than version 2, got version {}", version);
}
/// For some reason zookeeper stores slightly different records in log then
/// requests. For example:
/// class CreateTxn {
/// The ZooKeeper transaction log differs from requests. The main reason is to store records in the log
/// in some "finalized" state (for example, with concrete versions).
///
/// Example:
/// class CreateTxn {
/// ustring path;
/// buffer data;
/// vector<org.apache.zookeeper.data.ACL> acl;
@ -289,10 +306,9 @@ Coordination::ZooKeeperRequestPtr deserializeCreateTxn(ReadBuffer & in)
Coordination::read(result->data, in);
Coordination::read(result->acls, in);
Coordination::read(result->is_ephemeral, in);
result->need_to_hash_acls = false;
/// How we should use it? It should just increment on request execution
int32_t parent_c_version;
Coordination::read(parent_c_version, in);
Coordination::read(result->parent_cversion, in);
result->restored_from_zookeeper_log = true;
return result;
}
@ -300,6 +316,7 @@ Coordination::ZooKeeperRequestPtr deserializeDeleteTxn(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperRemoveRequest> result = std::make_shared<Coordination::ZooKeeperRemoveRequest>();
Coordination::read(result->path, in);
result->restored_from_zookeeper_log = true;
return result;
}
@ -309,6 +326,7 @@ Coordination::ZooKeeperRequestPtr deserializeSetTxn(ReadBuffer & in)
Coordination::read(result->path, in);
Coordination::read(result->data, in);
Coordination::read(result->version, in);
result->restored_from_zookeeper_log = true;
/// The log stores version + 1 (presumably the version the node should have afterwards, not the version expected by the request)
result->version -= 1;
@ -320,6 +338,7 @@ Coordination::ZooKeeperRequestPtr deserializeCheckVersionTxn(ReadBuffer & in)
std::shared_ptr<Coordination::ZooKeeperCheckRequest> result = std::make_shared<Coordination::ZooKeeperCheckRequest>();
Coordination::read(result->path, in);
Coordination::read(result->version, in);
result->restored_from_zookeeper_log = true;
return result;
}
@ -329,14 +348,19 @@ Coordination::ZooKeeperRequestPtr deserializeCreateSession(ReadBuffer & in)
int32_t timeout;
Coordination::read(timeout, in);
result->session_timeout_ms = timeout;
result->restored_from_zookeeper_log = true;
return result;
}
Coordination::ZooKeeperRequestPtr deserializeCloseSession(ReadBuffer & in)
Coordination::ZooKeeperRequestPtr deserializeCloseSession(ReadBuffer & in, bool empty)
{
std::shared_ptr<Coordination::ZooKeeperCloseRequest> result = std::make_shared<Coordination::ZooKeeperCloseRequest>();
std::vector<std::string> data;
Coordination::read(data, in);
if (!empty)
{
std::vector<std::string> data;
Coordination::read(data, in);
}
result->restored_from_zookeeper_log = true;
return result;
}
@ -356,14 +380,14 @@ Coordination::ZooKeeperRequestPtr deserializeSetACLTxn(ReadBuffer & in)
Coordination::read(result->version, in);
/// The log stores version + 1 (presumably the version the node should have afterwards, not the version expected by the request)
result->version -= 1;
result->need_to_hash_acls = false;
result->restored_from_zookeeper_log = true;
return result;
}
Coordination::ZooKeeperRequestPtr deserializeMultiTxn(ReadBuffer & in);
Coordination::ZooKeeperRequestPtr deserializeTxnImpl(ReadBuffer & in, bool subtxn)
Coordination::ZooKeeperRequestPtr deserializeTxnImpl(ReadBuffer & in, bool subtxn, int64_t txn_length = 0)
{
int32_t type;
Coordination::read(type, in);
@ -372,6 +396,11 @@ Coordination::ZooKeeperRequestPtr deserializeTxnImpl(ReadBuffer & in, bool subtx
if (subtxn)
Coordination::read(sub_txn_length, in);
bool empty_txn = !subtxn && txn_length == 32; /// Possible for old-style CloseTxn's
if (empty_txn && type != -11)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Empty non close-session transaction found");
int64_t in_count_before = in.count();
switch (type)
@ -398,7 +427,7 @@ Coordination::ZooKeeperRequestPtr deserializeTxnImpl(ReadBuffer & in, bool subtx
result = deserializeCreateSession(in);
break;
case -11:
result = deserializeCloseSession(in);
result = deserializeCloseSession(in, empty_txn);
break;
case -1:
result = deserializeErrorTxn(in);
@ -442,7 +471,7 @@ bool hasErrorsInMultiRequest(Coordination::ZooKeeperRequestPtr request)
if (request == nullptr)
return true;
for (const auto & subrequest : dynamic_cast<Coordination::ZooKeeperMultiRequest *>(request.get())->requests) //-V522
for (const auto & subrequest : dynamic_cast<Coordination::ZooKeeperMultiRequest *>(request.get())->requests) // -V522
if (subrequest == nullptr)
return true;
return false;
@ -470,7 +499,7 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l
int64_t time;
Coordination::read(time, in);
Coordination::ZooKeeperRequestPtr request = deserializeTxnImpl(in, false);
Coordination::ZooKeeperRequestPtr request = deserializeTxnImpl(in, false, txn_len);
/// Skip all other bytes
int64_t bytes_read = in.count() - count_before;