mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Merge pull request #34486 from bigo-sg/keeperVersion
Keeper: add version to SnapshotableHashTable
This commit is contained in:
commit
fd95d7a498
@ -337,8 +337,9 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t
|
||||
, session_id(storage->session_id_counter)
|
||||
, cluster_config(cluster_config_)
|
||||
{
|
||||
snapshot_container_size = storage->container.snapshotSize();
|
||||
storage->enableSnapshotMode(snapshot_container_size);
|
||||
auto [size, ver] = storage->container.snapshotSizeWithVersion();
|
||||
snapshot_container_size = size;
|
||||
storage->enableSnapshotMode(ver);
|
||||
begin = storage->getSnapshotIteratorBegin();
|
||||
session_and_timeout = storage->getActiveSessions();
|
||||
acl_map = storage->acl_map.getMapping();
|
||||
@ -351,8 +352,9 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const Sna
|
||||
, session_id(storage->session_id_counter)
|
||||
, cluster_config(cluster_config_)
|
||||
{
|
||||
snapshot_container_size = storage->container.snapshotSize();
|
||||
storage->enableSnapshotMode(snapshot_container_size);
|
||||
auto [size, ver] = storage->container.snapshotSizeWithVersion();
|
||||
snapshot_container_size = size;
|
||||
storage->enableSnapshotMode(ver);
|
||||
begin = storage->getSnapshotIteratorBegin();
|
||||
session_and_timeout = storage->getActiveSessions();
|
||||
acl_map = storage->acl_map.getMapping();
|
||||
|
@ -161,9 +161,10 @@ public:
|
||||
/// Set of methods for creating snapshots
|
||||
|
||||
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
|
||||
void enableSnapshotMode(size_t up_to_size)
|
||||
void enableSnapshotMode(size_t up_to_version)
|
||||
{
|
||||
container.enableSnapshotMode(up_to_size);
|
||||
container.enableSnapshotMode(up_to_version);
|
||||
|
||||
}
|
||||
|
||||
/// Turn off snapshot mode.
|
||||
|
@ -17,6 +17,8 @@ struct ListNode
|
||||
StringRef key;
|
||||
V value;
|
||||
|
||||
/// Monotonically increasing version info for snapshot
|
||||
size_t version{0};
|
||||
bool active_in_map{true};
|
||||
bool free_key{false};
|
||||
};
|
||||
@ -35,7 +37,8 @@ private:
|
||||
IndexMap map;
|
||||
bool snapshot_mode{false};
|
||||
/// Allows to avoid additional copies in updateValue function
|
||||
size_t snapshot_up_to_size = 0;
|
||||
size_t current_version{0};
|
||||
size_t snapshot_up_to_version{0};
|
||||
ArenaWithFreeLists arena;
|
||||
/// Collect invalid iterators to avoid traversing the whole list
|
||||
std::vector<Mapped> snapshot_invalid_iters;
|
||||
@ -129,8 +132,9 @@ public:
|
||||
|
||||
if (!it)
|
||||
{
|
||||
ListElem elem{copyStringInArena(arena, key), value, true};
|
||||
auto itr = list.insert(list.end(), elem);
|
||||
|
||||
ListElem elem{copyStringInArena(arena, key), value, current_version};
|
||||
auto itr = list.insert(list.end(), std::move(elem));
|
||||
bool inserted;
|
||||
map.emplace(itr->key, it, inserted, hash_value);
|
||||
assert(inserted);
|
||||
@ -151,8 +155,8 @@ public:
|
||||
|
||||
if (it == map.end())
|
||||
{
|
||||
ListElem elem{copyStringInArena(arena, key), value, true};
|
||||
auto itr = list.insert(list.end(), elem);
|
||||
ListElem elem{copyStringInArena(arena, key), value, current_version};
|
||||
auto itr = list.insert(list.end(), std::move(elem));
|
||||
bool inserted;
|
||||
map.emplace(itr->key, it, inserted, hash_value);
|
||||
assert(inserted);
|
||||
@ -163,9 +167,9 @@ public:
|
||||
auto list_itr = it->getMapped();
|
||||
if (snapshot_mode)
|
||||
{
|
||||
ListElem elem{list_itr->key, value, true};
|
||||
ListElem elem{list_itr->key, value, current_version};
|
||||
list_itr->active_in_map = false;
|
||||
auto new_list_itr = list.insert(list.end(), elem);
|
||||
auto new_list_itr = list.insert(list.end(), std::move(elem));
|
||||
it->getMapped() = new_list_itr;
|
||||
snapshot_invalid_iters.push_back(list_itr);
|
||||
}
|
||||
@ -224,14 +228,14 @@ public:
|
||||
/// We in snapshot mode but updating some node which is already more
|
||||
/// fresh than snapshot distance. So it will not participate in
|
||||
/// snapshot and we don't need to copy it.
|
||||
size_t distance = std::distance(list.begin(), list_itr);
|
||||
if (distance < snapshot_up_to_size)
|
||||
if (snapshot_mode && list_itr->version <= snapshot_up_to_version)
|
||||
{
|
||||
auto elem_copy = *(list_itr);
|
||||
list_itr->active_in_map = false;
|
||||
snapshot_invalid_iters.push_back(list_itr);
|
||||
updater(elem_copy.value);
|
||||
auto itr = list.insert(list.end(), elem_copy);
|
||||
elem_copy.version = current_version;
|
||||
auto itr = list.insert(list.end(), std::move(elem_copy));
|
||||
it->getMapped() = itr;
|
||||
ret = itr;
|
||||
}
|
||||
@ -289,16 +293,16 @@ public:
|
||||
updateDataSize(CLEAR, 0, 0, 0);
|
||||
}
|
||||
|
||||
void enableSnapshotMode(size_t up_to_size)
|
||||
void enableSnapshotMode(size_t version)
|
||||
{
|
||||
snapshot_mode = true;
|
||||
snapshot_up_to_size = up_to_size;
|
||||
snapshot_up_to_version = version;
|
||||
++current_version;
|
||||
}
|
||||
|
||||
void disableSnapshotMode()
|
||||
{
|
||||
snapshot_mode = false;
|
||||
snapshot_up_to_size = 0;
|
||||
}
|
||||
|
||||
size_t size() const
|
||||
@ -306,9 +310,9 @@ public:
|
||||
return map.size();
|
||||
}
|
||||
|
||||
size_t snapshotSize() const
|
||||
std::pair<size_t, size_t> snapshotSizeWithVersion() const
|
||||
{
|
||||
return list.size();
|
||||
return std::make_pair(list.size(), current_version);
|
||||
}
|
||||
|
||||
uint64_t getApproximateDataSize() const
|
||||
|
@ -867,7 +867,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
EXPECT_FALSE(map_snp.insert("/hello", 145).second);
|
||||
map_snp.updateValue("/hello", [](IntNode & value) { value = 554; });
|
||||
EXPECT_EQ(map_snp.getValue("/hello"), 554);
|
||||
EXPECT_EQ(map_snp.snapshotSize(), 2);
|
||||
EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 2);
|
||||
EXPECT_EQ(map_snp.size(), 1);
|
||||
|
||||
auto itr = map_snp.begin();
|
||||
@ -886,7 +886,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
}
|
||||
EXPECT_EQ(map_snp.getValue("/hello3"), 3);
|
||||
|
||||
EXPECT_EQ(map_snp.snapshotSize(), 7);
|
||||
EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 7);
|
||||
EXPECT_EQ(map_snp.size(), 6);
|
||||
itr = std::next(map_snp.begin(), 2);
|
||||
for (size_t i = 0; i < 5; ++i)
|
||||
@ -900,7 +900,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
EXPECT_TRUE(map_snp.erase("/hello3"));
|
||||
EXPECT_TRUE(map_snp.erase("/hello2"));
|
||||
|
||||
EXPECT_EQ(map_snp.snapshotSize(), 7);
|
||||
EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 7);
|
||||
EXPECT_EQ(map_snp.size(), 4);
|
||||
itr = std::next(map_snp.begin(), 2);
|
||||
for (size_t i = 0; i < 5; ++i)
|
||||
@ -912,7 +912,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
|
||||
}
|
||||
map_snp.clearOutdatedNodes();
|
||||
|
||||
EXPECT_EQ(map_snp.snapshotSize(), 4);
|
||||
EXPECT_EQ(map_snp.snapshotSizeWithVersion().first, 4);
|
||||
EXPECT_EQ(map_snp.size(), 4);
|
||||
itr = map_snp.begin();
|
||||
EXPECT_EQ(itr->key, "/hello");
|
||||
@ -1166,14 +1166,15 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
|
||||
storage.container.erase("/hello_" + std::to_string(i));
|
||||
}
|
||||
EXPECT_EQ(storage.container.size(), 26);
|
||||
EXPECT_EQ(storage.container.snapshotSize(), 101);
|
||||
EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 101);
|
||||
EXPECT_EQ(storage.container.snapshotSizeWithVersion().second, 1);
|
||||
auto buf = manager.serializeSnapshotToBuffer(snapshot);
|
||||
manager.serializeSnapshotBufferToDisk(*buf, 50);
|
||||
}
|
||||
EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin" + params.extension));
|
||||
EXPECT_EQ(storage.container.size(), 26);
|
||||
storage.clearGarbageAfterSnapshot();
|
||||
EXPECT_EQ(storage.container.snapshotSize(), 26);
|
||||
EXPECT_EQ(storage.container.snapshotSizeWithVersion().first, 26);
|
||||
for (size_t i = 0; i < 50; ++i)
|
||||
{
|
||||
if (i % 2 != 0)
|
||||
|
Loading…
Reference in New Issue
Block a user