diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 5c009116010..429a76eec5e 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -337,8 +337,9 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t , session_id(storage->session_id_counter) , cluster_config(cluster_config_) { - snapshot_container_size = storage->container.snapshotSize(); - storage->enableSnapshotMode(snapshot_container_size); + auto [size, ver] = storage->container.snapshotSizeWithVersion(); + snapshot_container_size = size; + storage->enableSnapshotMode(ver); begin = storage->getSnapshotIteratorBegin(); session_and_timeout = storage->getActiveSessions(); acl_map = storage->acl_map.getMapping(); @@ -351,8 +352,9 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const Sna , session_id(storage->session_id_counter) , cluster_config(cluster_config_) { - snapshot_container_size = storage->container.snapshotSize(); - storage->enableSnapshotMode(snapshot_container_size); + auto [size, ver] = storage->container.snapshotSizeWithVersion(); + snapshot_container_size = size; + storage->enableSnapshotMode(ver); begin = storage->getSnapshotIteratorBegin(); session_and_timeout = storage->getActiveSessions(); acl_map = storage->acl_map.getMapping(); diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index eef53e4f6ca..cb218a0db15 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -161,9 +161,10 @@ public: /// Set of methods for creating snapshots /// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version. - void enableSnapshotMode(size_t up_to_size) + void enableSnapshotMode(size_t up_to_version) { - container.enableSnapshotMode(up_to_size); + container.enableSnapshotMode(up_to_version); + } /// Turn off snapshot mode. diff --git a/src/Coordination/SnapshotableHashTable.h b/src/Coordination/SnapshotableHashTable.h index c3a6d7a8eff..a02e090cb60 100644 --- a/src/Coordination/SnapshotableHashTable.h +++ b/src/Coordination/SnapshotableHashTable.h @@ -17,6 +17,8 @@ struct ListNode StringRef key; V value; + /// Monotonically increasing version info for snapshot + size_t version{0}; bool active_in_map{true}; bool free_key{false}; }; @@ -35,7 +37,8 @@ private: IndexMap map; bool snapshot_mode{false}; /// Allows to avoid additional copies in updateValue function - size_t snapshot_up_to_size = 0; + size_t current_version{0}; + size_t snapshot_up_to_version{0}; ArenaWithFreeLists arena; /// Collect invalid iterators to avoid traversing the whole list std::vector snapshot_invalid_iters; @@ -129,8 +132,9 @@ public: if (!it) { - ListElem elem{copyStringInArena(arena, key), value, true}; - auto itr = list.insert(list.end(), elem); + + ListElem elem{copyStringInArena(arena, key), value, current_version}; + auto itr = list.insert(list.end(), std::move(elem)); bool inserted; map.emplace(itr->key, it, inserted, hash_value); assert(inserted); @@ -151,8 +155,8 @@ public: if (it == map.end()) { - ListElem elem{copyStringInArena(arena, key), value, true}; - auto itr = list.insert(list.end(), elem); + ListElem elem{copyStringInArena(arena, key), value, current_version}; + auto itr = list.insert(list.end(), std::move(elem)); bool inserted; map.emplace(itr->key, it, inserted, hash_value); assert(inserted); @@ -163,9 +167,9 @@ public: auto list_itr = it->getMapped(); if (snapshot_mode) { - ListElem elem{list_itr->key, value, true}; + ListElem elem{list_itr->key, value, current_version}; list_itr->active_in_map = false; - auto new_list_itr = list.insert(list.end(), elem); + auto new_list_itr = list.insert(list.end(), std::move(elem)); it->getMapped() = new_list_itr; snapshot_invalid_iters.push_back(list_itr); } @@ -224,14 +228,14 @@ public: /// We in snapshot mode but updating some node which is already more /// fresh than snapshot distance. So it will not participate in /// snapshot and we don't need to copy it. - size_t distance = std::distance(list.begin(), list_itr); - if (distance < snapshot_up_to_size) + if (snapshot_mode && list_itr->version <= snapshot_up_to_version) { auto elem_copy = *(list_itr); list_itr->active_in_map = false; snapshot_invalid_iters.push_back(list_itr); updater(elem_copy.value); - auto itr = list.insert(list.end(), elem_copy); + elem_copy.version = current_version; + auto itr = list.insert(list.end(), std::move(elem_copy)); it->getMapped() = itr; ret = itr; } @@ -289,16 +293,16 @@ public: updateDataSize(CLEAR, 0, 0, 0); } - void enableSnapshotMode(size_t up_to_size) + void enableSnapshotMode(size_t version) { snapshot_mode = true; - snapshot_up_to_size = up_to_size; + snapshot_up_to_version = version; + ++current_version; } void disableSnapshotMode() { snapshot_mode = false; - snapshot_up_to_size = 0; } size_t size() const @@ -306,9 +310,9 @@ public: return map.size(); } - size_t snapshotSize() const + std::pair snapshotSizeWithVersion() const { - return list.size(); + return std::make_pair(list.size(), current_version); } uint64_t getApproximateDataSize() const diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 0c527b60964..0fc00cbd75d 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -867,7 +867,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot) EXPECT_FALSE(map_snp.insert("/hello", 145).second); map_snp.updateValue("/hello", [](IntNode & value) { value = 554; }); EXPECT_EQ(map_snp.getValue("/hello"), 554); - EXPECT_EQ(map_snp.snapshotSize(), 2); + EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 2); EXPECT_EQ(map_snp.size(), 1); auto itr = map_snp.begin(); @@ -886,7 +886,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot) } EXPECT_EQ(map_snp.getValue("/hello3"), 3); - EXPECT_EQ(map_snp.snapshotSize(), 7); + EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 7); EXPECT_EQ(map_snp.size(), 6); itr = std::next(map_snp.begin(), 2); for (size_t i = 0; i < 5; ++i) @@ -900,7 +900,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot) EXPECT_TRUE(map_snp.erase("/hello3")); EXPECT_TRUE(map_snp.erase("/hello2")); - EXPECT_EQ(map_snp.snapshotSize(), 7); + EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 7); EXPECT_EQ(map_snp.size(), 4); itr = std::next(map_snp.begin(), 2); for (size_t i = 0; i < 5; ++i) @@ -912,7 +912,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot) } map_snp.clearOutdatedNodes(); - EXPECT_EQ(map_snp.snapshotSize(), 4); + EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 4); EXPECT_EQ(map_snp.size(), 4); itr = map_snp.begin(); EXPECT_EQ(itr->key, "/hello"); @@ -1166,14 +1166,15 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode) storage.container.erase("/hello_" + std::to_string(i)); } EXPECT_EQ(storage.container.size(), 26); - EXPECT_EQ(storage.container.snapshotSize(), 101); + EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 101); + EXPECT_EQ(storage.container.snapshotSizeWithVersion().second, 1); auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, 50); } EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + params.extension)); EXPECT_EQ(storage.container.size(), 26); storage.clearGarbageAfterSnapshot(); - EXPECT_EQ(storage.container.snapshotSize(), 26); + EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 26); for (size_t i = 0; i < 50; ++i) { if (i % 2 != 0)