From d296eeee2da3e53b1663daef0d1cf93fc29e8b58 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 5 Apr 2022 06:27:03 +0000 Subject: [PATCH] Small changes for Keeper --- src/Common/ZooKeeper/ZooKeeperImpl.cpp | 2 +- src/Common/ZooKeeper/ZooKeeperImpl.h | 2 +- src/Coordination/Changelog.cpp | 14 +-- src/Coordination/KeeperDispatcher.cpp | 2 +- src/Coordination/KeeperSnapshotManager.cpp | 8 +- src/Coordination/KeeperStateMachine.cpp | 48 +++------- src/Coordination/KeeperStateManager.h | 2 +- src/Coordination/KeeperStorage.cpp | 94 +++++++++++-------- src/Coordination/KeeperStorage.h | 34 ++++--- src/Coordination/SnapshotableHashTable.h | 5 +- src/Coordination/ZooKeeperDataReader.cpp | 8 +- src/Coordination/tests/gtest_coordination.cpp | 4 + 12 files changed, 117 insertions(+), 106 deletions(-) diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index d3c993344b6..935df255843 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -846,7 +846,7 @@ void ZooKeeper::receiveEvent() void ZooKeeper::finalize(bool error_send, bool error_receive, const String & reason) { /// If some thread (send/receive) already finalizing session don't try to do it - bool already_started = finalization_started.exchange(true); + bool already_started = finalization_started.test_and_set(); LOG_TEST(log, "Finalizing session {}: finalization_started={}, queue_finished={}, reason={}", session_id, already_started, requests_queue.isFinished(), reason); diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index b87469bd339..58c5947e8ea 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -209,7 +209,7 @@ private: std::atomic next_xid {1}; /// Mark session finalization start. Used to avoid simultaneous /// finalization from different threads. One-shot flag. - std::atomic finalization_started {false}; + std::atomic_flag finalization_started; using clock = std::chrono::steady_clock; diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index eb8a724ade9..92a83deae1e 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -222,8 +222,8 @@ public: } /// Check for duplicated changelog ids - if (logs.count(record.header.index) != 0) - std::erase_if(logs, [record] (const auto & item) { return item.first >= record.header.index; }); + if (logs.contains(record.header.index)) + std::erase_if(logs, [&record] (const auto & item) { return item.first >= record.header.index; }); result.total_entries_read_from_log += 1; @@ -659,6 +659,7 @@ LogEntryPtr Changelog::getLatestConfigChange() const nuraft::ptr Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count) { std::vector> returned_logs; + returned_logs.reserve(count); uint64_t size_total = 0; for (uint64_t i = index; i < index + count; ++i) @@ -669,7 +670,7 @@ nuraft::ptr Changelog::serializeEntriesToBuffer(uint64_t index, nuraft::ptr buf = entry->second->serialize(); size_total += buf->size(); - returned_logs.push_back(buf); + returned_logs.push_back(std::move(buf)); } nuraft::ptr buf_out = nuraft::buffer::alloc(sizeof(int32_t) + count * sizeof(int32_t) + size_total); @@ -678,9 +679,8 @@ nuraft::ptr Changelog::serializeEntriesToBuffer(uint64_t index, for (auto & entry : returned_logs) { - nuraft::ptr & bb = entry; - buf_out->put(static_cast(bb->size())); - buf_out->put(*bb); + buf_out->put(static_cast(entry->size())); + buf_out->put(*entry); } return buf_out; } @@ -699,7 +699,7 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer) buffer.get(buf_local); LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local); - if (i == 0 && logs.count(cur_index)) + if (i == 0 && logs.contains(cur_index)) writeAt(cur_index, log_entry); else appendEntry(cur_index, log_entry); diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index a4dcb0acc52..4d71c11221e 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -121,7 +121,7 @@ void KeeperDispatcher::requestThread() current_batch.clear(); } - prev_batch = current_batch; + prev_batch = std::move(current_batch); prev_result = result; } diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 429a76eec5e..43fc8b1ec0d 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -43,7 +43,7 @@ namespace void writeNode(const KeeperStorage::Node & node, SnapshotVersion version, WriteBuffer & out) { - writeBinary(node.data, out); + writeBinary(node.getData(), out); /// Serialize ACL writeBinary(node.acl_id, out); @@ -71,7 +71,9 @@ namespace void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map) { - readBinary(node.data, in); + String new_data; + readBinary(new_data, in); + node.setData(std::move(new_data)); if (version >= SnapshotVersion::V1) { @@ -281,7 +283,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial if (itr.key != "/") { auto parent_path = parentPath(itr.key); - storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); }); + storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); }); } } diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 35e56ba1e30..315882ee988 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -259,22 +259,9 @@ void KeeperStateMachine::save_logical_snp_obj( { LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id); - nuraft::ptr cloned_buffer; - nuraft::ptr cloned_meta; - if (obj_id == 0) /// Fake snapshot required by NuRaft at startup - { - std::lock_guard lock(storage_and_responses_lock); - KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx(), getClusterConfig()); - cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot); - } - else - { - /// copy snapshot into memory - } - /// copy snapshot meta into memory nuraft::ptr snp_buf = s.serialize(); - cloned_meta = nuraft::snapshot::deserialize(*snp_buf); + nuraft::ptr cloned_meta = nuraft::snapshot::deserialize(*snp_buf); try { @@ -332,31 +319,22 @@ int KeeperStateMachine::read_logical_snp_obj( { LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id); - if (obj_id == 0) /// Fake snapshot required by NuRaft at startup + + std::lock_guard lock(snapshots_lock); + /// Our snapshot is not equal to required. Maybe we still creating it in the background. + /// Let's wait and NuRaft will retry this call. + if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx()) { - data_out = nuraft::buffer::alloc(sizeof(int32_t)); - nuraft::buffer_serializer bs(data_out); - bs.put_i32(0); - is_last_obj = false; + LOG_WARNING(log, "Required to apply snapshot with last log index {}, but our last log index is {}. Will ignore this one and retry", + s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx()); + return -1; } - else + if (bufferFromFile(log, latest_snapshot_path, data_out)) { - std::lock_guard lock(snapshots_lock); - /// Our snapshot is not equal to required. Maybe we still creating it in the background. - /// Let's wait and NuRaft will retry this call. - if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx()) - { - LOG_WARNING(log, "Required to apply snapshot with last log index {}, but our last log index is {}. Will ignore this one and retry", - s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx()); - return -1; - } - if (bufferFromFile(log, latest_snapshot_path, data_out)) - { - LOG_WARNING(log, "Error reading snapshot {} from {}", s.get_last_log_idx(), latest_snapshot_path); - return -1; - } - is_last_obj = true; + LOG_WARNING(log, "Error reading snapshot {} from {}", s.get_last_log_idx(), latest_snapshot_path); + return -1; } + is_last_obj = true; return 1; } diff --git a/src/Coordination/KeeperStateManager.h b/src/Coordination/KeeperStateManager.h index a56bafb6bae..66c6cc03b87 100644 --- a/src/Coordination/KeeperStateManager.h +++ b/src/Coordination/KeeperStateManager.h @@ -84,7 +84,7 @@ public: bool shouldStartAsFollower() const { std::lock_guard lock(configuration_wrapper_mutex); - return configuration_wrapper.servers_start_as_followers.count(my_server_id); + return configuration_wrapper.servers_start_as_followers.contains(my_server_id); } bool isSecure() const diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index e13b43d056a..a066f7257ae 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -24,7 +24,10 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -static String base64Encode(const String & decoded) +namespace +{ + +String base64Encode(const String & decoded) { std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM ostr.exceptions(std::ios::failbit); @@ -35,7 +38,7 @@ static String base64Encode(const String & decoded) return ostr.str(); } -static String getSHA1(const String & userdata) +String getSHA1(const String & userdata) { Poco::SHA1Engine engine; engine.update(userdata); @@ -43,14 +46,14 @@ static String getSHA1(const String & userdata) return String{digest_id.begin(), digest_id.end()}; } -static String generateDigest(const String & userdata) +String generateDigest(const String & userdata) { std::vector user_password; boost::split(user_password, userdata, [](char c) { return c == ':'; }); return user_password[0] + ":" + base64Encode(getSHA1(userdata)); } -static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector & session_auths) +bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector & session_auths) { if (node_acls.empty()) return true; @@ -77,7 +80,7 @@ static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, c return false; } -static bool fixupACL( +bool fixupACL( const std::vector & request_acls, const std::vector & current_ids, std::vector & result_acls) @@ -119,7 +122,7 @@ static bool fixupACL( return valid_found; } -static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type) +KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type) { KeeperStorage::ResponsesForSessions result; auto it = watches.find(path); @@ -174,6 +177,25 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat } return result; } +} + +void KeeperStorage::Node::setData(String new_data) +{ + size_bytes = size_bytes - data.size() + new_data.size(); + data = std::move(new_data); +} + +void KeeperStorage::Node::addChild(StringRef child_path) +{ + size_bytes += sizeof child_path; + children.insert(child_path); +} + +void KeeperStorage::Node::removeChild(StringRef child_path) +{ + size_bytes -= sizeof child_path; + children.erase(child_path); +} KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_) : session_expiry_queue(tick_time_ms) @@ -314,8 +336,8 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr created_node.stat.numChildren = 0; created_node.stat.dataLength = request.data.length(); created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0; - created_node.data = request.data; created_node.is_sequental = request.is_sequential; + created_node.setData(std::move(request.data)); auto [map_key, _] = container.insert(path_created, created_node); /// Take child path from key owned by map. @@ -327,8 +349,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid, parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent) { - parent.children.insert(child_path); - parent.size_bytes += child_path.size; + parent.addChild(child_path); prev_parent_cversion = parent.stat.cversion; prev_parent_zxid = parent.stat.pzxid; @@ -363,8 +384,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr --undo_parent.seq_num; undo_parent.stat.cversion = prev_parent_cversion; undo_parent.stat.pzxid = prev_parent_zxid; - undo_parent.children.erase(child_path); - undo_parent.size_bytes -= child_path.size; + undo_parent.removeChild(child_path); }); storage.container.erase(path_created); @@ -409,7 +429,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce else { response.stat = it->value.stat; - response.data = it->value.data; + response.data = it->value.getData(); response.error = Coordination::Error::ZOK; } @@ -498,8 +518,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr { --parent.stat.numChildren; ++parent.stat.cversion; - parent.children.erase(child_basename); - parent.size_bytes -= child_basename.size; + parent.removeChild(child_basename); }); response.error = Coordination::Error::ZOK; @@ -520,8 +539,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr { ++parent.stat.numChildren; --parent.stat.cversion; - parent.children.insert(child_name); - parent.size_bytes += child_name.size; + parent.addChild(child_name); }); }; } @@ -598,14 +616,13 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce auto prev_node = it->value; - auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) + auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) mutable { value.stat.version++; value.stat.mzxid = zxid; value.stat.mtime = time; value.stat.dataLength = request.data.length(); - value.size_bytes = value.size_bytes + request.data.size() - value.data.size(); - value.data = request.data; + value.setData(std::move(request.data)); }); container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent) @@ -675,9 +692,10 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc if (path_prefix.empty()) throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); - response.names.reserve(it->value.children.size()); + const auto & children = it->value.getChildren(); + response.names.reserve(children.size()); - for (const auto child : it->value.children) + for (const auto child : children) response.names.push_back(child.toString()); response.stat = it->value.stat; @@ -856,24 +874,23 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro for (const auto & sub_request : request.requests) { auto sub_zk_request = std::dynamic_pointer_cast(sub_request); - if (sub_zk_request->getOpNum() == Coordination::OpNum::Create) + switch (sub_zk_request->getOpNum()) { - concrete_requests.push_back(std::make_shared(sub_zk_request)); + case Coordination::OpNum::Create: + concrete_requests.push_back(std::make_shared(sub_zk_request)); + break; + case Coordination::OpNum::Remove: + concrete_requests.push_back(std::make_shared(sub_zk_request)); + break; + case Coordination::OpNum::Set: + concrete_requests.push_back(std::make_shared(sub_zk_request)); + break; + case Coordination::OpNum::Check: + concrete_requests.push_back(std::make_shared(sub_zk_request)); + break; + default: + throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum()); } - else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove) - { - concrete_requests.push_back(std::make_shared(sub_zk_request)); - } - else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set) - { - concrete_requests.push_back(std::make_shared(sub_zk_request)); - } - else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check) - { - concrete_requests.push_back(std::make_shared(sub_zk_request)); - } - else - throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum()); } } @@ -1092,8 +1109,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina --parent.stat.numChildren; ++parent.stat.cversion; auto base_name = getBaseName(ephemeral_path); - parent.children.erase(base_name); - parent.size_bytes -= base_name.size; + parent.removeChild(base_name); }); container.erase(ephemeral_path); diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 89a42078bc9..ccbddcf6e19 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -32,28 +32,38 @@ public: struct Node { - String data; uint64_t acl_id = 0; /// 0 -- no ACL by default bool is_sequental = false; Coordination::Stat stat{}; int32_t seq_num = 0; - ChildrenSet children{}; uint64_t size_bytes; // save size to avoid calculate every time - Node() - { - size_bytes = sizeof(size_bytes); - size_bytes += data.size(); - size_bytes += sizeof(acl_id); - size_bytes += sizeof(is_sequental); - size_bytes += sizeof(stat); - size_bytes += sizeof(seq_num); - } + Node() : size_bytes(sizeof(Node)) { } + /// Object memory size uint64_t sizeInBytes() const { return size_bytes; } + + void setData(String new_data); + + const auto & getData() const noexcept + { + return data; + } + + void addChild(StringRef child_path); + + void removeChild(StringRef child_path); + + const auto & getChildren() const noexcept + { + return children; + } + private: + String data; + ChildrenSet children{}; }; struct ResponseForSession @@ -104,7 +114,7 @@ public: /// Mapping session_id -> set of ephemeral nodes paths Ephemerals ephemerals; - /// Mapping sessuib_id -> set of watched nodes paths + /// Mapping session_id -> set of watched nodes paths SessionAndWatcher sessions_and_watchers; /// Expiration queue for session, allows to get dead sessions at some point of time SessionExpiryQueue session_expiry_queue; diff --git a/src/Coordination/SnapshotableHashTable.h b/src/Coordination/SnapshotableHashTable.h index a02e090cb60..27572ab86c7 100644 --- a/src/Coordination/SnapshotableHashTable.h +++ b/src/Coordination/SnapshotableHashTable.h @@ -80,7 +80,7 @@ private: approximate_data_size += value_size; if (!snapshot_mode) { - approximate_data_size += key_size; + approximate_data_size -= key_size; approximate_data_size -= old_value_size; } } @@ -132,7 +132,6 @@ public: if (!it) { - ListElem elem{copyStringInArena(arena, key), value, current_version}; auto itr = list.insert(list.end(), std::move(elem)); bool inserted; @@ -228,7 +227,7 @@ public: /// We in snapshot mode but updating some node which is already more /// fresh than snapshot distance. So it will not participate in /// snapshot and we don't need to copy it. - if (snapshot_mode && list_itr->version <= snapshot_up_to_version) + if (list_itr->version <= snapshot_up_to_version) { auto elem_copy = *(list_itr); list_itr->active_in_map = false; diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp index 8a3e177c507..e59c67329ff 100644 --- a/src/Coordination/ZooKeeperDataReader.cpp +++ b/src/Coordination/ZooKeeperDataReader.cpp @@ -98,7 +98,9 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L while (path != "/") { KeeperStorage::Node node{}; - Coordination::read(node.data, in); + String data; + Coordination::read(data, in); + node.setData(std::move(data)); Coordination::read(node.acl_id, in); /// Deserialize stat @@ -117,7 +119,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L Coordination::read(node.stat.pzxid, in); if (!path.empty()) { - node.stat.dataLength = node.data.length(); + node.stat.dataLength = node.getData().length(); node.seq_num = node.stat.cversion; storage.container.insertOrReplace(path, node); @@ -137,7 +139,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L if (itr.key != "/") { auto parent_path = parentPath(itr.key); - storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++; }); + storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); value.stat.numChildren++; }); } } diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 0fc00cbd75d..07544dfbb89 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -946,6 +946,8 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize) EXPECT_EQ(hello.getApproximateDataSize(), 9); hello.updateValue("hello", [](IntNode & value) { value = 2; }); EXPECT_EQ(hello.getApproximateDataSize(), 9); + hello.insertOrReplace("hello", 3); + EXPECT_EQ(hello.getApproximateDataSize(), 9); hello.erase("hello"); EXPECT_EQ(hello.getApproximateDataSize(), 0); @@ -958,6 +960,8 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize) EXPECT_EQ(hello.getApproximateDataSize(), 9); hello.updateValue("hello", [](IntNode & value) { value = 2; }); EXPECT_EQ(hello.getApproximateDataSize(), 18); + hello.insertOrReplace("hello", 1); + EXPECT_EQ(hello.getApproximateDataSize(), 27); hello.clearOutdatedNodes(); EXPECT_EQ(hello.getApproximateDataSize(), 9);