From 79fc8d67ad9665d011cee06855d9bd4ce1065597 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 2 Sep 2024 11:44:21 +0200 Subject: [PATCH] More fixes --- src/Coordination/KeeperSnapshotManager.cpp | 19 +++++++--- src/Coordination/KeeperStorage.cpp | 42 +++++++++++----------- src/Coordination/KeeperStorage.h | 2 -- utils/keeper-bench/Runner.cpp | 1 + 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 81c72c1beb0..a5cb8218f1f 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -471,12 +471,21 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult::UncommittedState::addDeltas(std::list new_ template void KeeperStorage::UncommittedState::cleanup(int64_t commit_zxid) { - for (const auto & [transaction_zxid, transaction_nodes] : zxid_to_nodes) + for (auto it = zxid_to_nodes.begin(); it != zxid_to_nodes.end(); it = zxid_to_nodes.erase(it)) { + const auto & [transaction_zxid, transaction_nodes] = *it; + if (transaction_zxid > commit_zxid) break; - std::cout << transaction_zxid << std::endl; for (const auto node : transaction_nodes) { - std::cout << node << std::endl; auto node_it = nodes.find(node); - chassert(node_it != nodes.end()); + if (node_it == nodes.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing expected uncommitted node '{}'", node); + node_it->second.applied_zxids.erase(transaction_zxid); if (node_it->second.applied_zxids.empty()) nodes.erase(node_it); @@ -1295,7 +1297,6 @@ bool checkAuth(const T & /*zk_request*/, Storage & /*storage*/, int64_t /*sessio /// Default implementations /// /// HEARTBEAT Request /// - template Coordination::ZooKeeperResponsePtr process( const Coordination::ZooKeeperHeartbeatRequest & zk_request, @@ -1306,11 +1307,9 @@ Coordination::ZooKeeperResponsePtr process( response_ptr->error = storage.commit(std::move(deltas)); return response_ptr; } - /// HEARTBEAT Request /// /// SYNC Request /// - template Coordination::ZooKeeperResponsePtr process(const Coordination::ZooKeeperSyncRequest & zk_request, Storage & /* storage */, std::list /* deltas */) @@ -1319,7 +1318,6 @@ process(const Coordination::ZooKeeperSyncRequest & zk_request, Storage & /* stor response->path = zk_request.path; return response; } - /// SYNC Request /// /// CREATE Request /// @@ -1793,18 +1791,19 @@ std::list preprocess( if (zk_request.version != -1 && zk_request.version != node->stats.version) return {KeeperStorageBase::Delta{zxid, Coordination::Error::ZBADVERSION}}; + new_deltas.emplace_back( + zk_request.path, + zxid, + KeeperStorageBase::UpdateNodeDataDelta{.old_data = std::string{node->getData()}, .new_data = zk_request.data, .version = zk_request.version}); + KeeperStorageBase::UpdateNodeStatDelta node_delta(*node); node_delta.version = zk_request.version; auto & new_stats = node_delta.new_stats; new_stats.version++; new_stats.mzxid = zxid; new_stats.mtime = time; - + new_stats.data_size = static_cast(zk_request.data.size()); new_deltas.emplace_back(zk_request.path, zxid, std::move(node_delta)); - new_deltas.emplace_back( - zk_request.path, - zxid, - KeeperStorageBase::UpdateNodeDataDelta{.old_data = std::string{node->getData()}, .new_data = zk_request.data, .version = zk_request.version}); auto parent_path = parentNodePath(zk_request.path); auto parent_node = storage.uncommitted_state.getNode(parent_path); @@ -1868,7 +1867,9 @@ std::list preprocess( template Coordination::ZooKeeperResponsePtr processImpl(const Coordination::ZooKeeperListRequest & zk_request, Storage & storage, std::list deltas) { - auto response = std::make_shared(); + std::shared_ptr response = zk_request.getOpNum() == Coordination::OpNum::SimpleList + ? std::make_shared() + : std::make_shared(); if constexpr (!local) { @@ -2215,17 +2216,16 @@ Coordination::ZooKeeperResponsePtr processLocal(const Coordination::ZooKeeperMul return response; } -template KeeperStorageBase::ResponsesForSessions processWatches( - const Coordination::ZooKeeperMultiRequest & zk_request, typename Storage::Watches & watches, typename Storage::Watches & list_watches) + const Coordination::ZooKeeperMultiRequest & zk_request, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) { - typename Storage::ResponsesForSessions result; + KeeperStorageBase::ResponsesForSessions result; const auto & subrequests = zk_request.requests; for (const auto & generic_request : subrequests) { auto responses = callOnConcreteRequestType( - generic_request, [&](const auto & subrequest) { return processWatches(subrequest, watches, list_watches); }); + *generic_request, [&](const auto & subrequest) { return processWatches(subrequest, watches, list_watches); }); result.insert(result.end(), responses.begin(), responses.end()); } return result; @@ -2506,7 +2506,7 @@ UInt64 KeeperStorage::calculateNodesDigest(UInt64 current_digest, con auto updated_node_it = updated_nodes.find(delta.path); if (updated_node_it == updated_nodes.end()) { - node = std::make_shared(); + node = std::make_shared(); node->shallowCopy(*uncommitted_state.getNode(delta.path)); current_digest -= node->getDigest(delta.path); updated_nodes.emplace(delta.path, node); @@ -2772,7 +2772,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process { Coordination::ZooKeeperResponsePtr response; - uncommitted_state.tryGetNodeFromStorage("/node_with_acl"); if (is_local) { chassert(zk_request->isReadRequest()); @@ -2793,7 +2792,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process std::lock_guard lock(storage_mutex); response = process(concrete_zk_request, *this, std::move(deltas)); } - uncommitted_state.tryGetNodeFromStorage("/node_with_acl"); /// Watches for this requests are added to the watches lists if (zk_request->has_watch) diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 1c3c77e140b..684f75c4bdf 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -357,8 +357,6 @@ public: NodeStats old_stats; NodeStats new_stats; - int32_t old_seq_num; - int32_t new_seq_num; int32_t version{-1}; }; diff --git a/utils/keeper-bench/Runner.cpp b/utils/keeper-bench/Runner.cpp index 59761d827e1..0584e9e34c8 100644 --- a/utils/keeper-bench/Runner.cpp +++ b/utils/keeper-bench/Runner.cpp @@ -1114,6 +1114,7 @@ void Runner::runBenchmarkFromLog() else { request_from_log->connection = get_zookeeper_connection(request_from_log->session_id); + request_from_log->executor_id %= concurrency; push_request(std::move(*request_from_log)); }