diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index e4d8b13ec37..4f095974836 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -212,6 +212,8 @@ Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval void Changelog::readChangelogAndInitWriter(size_t from_log_idx) { size_t read_from_last = 0; + start_index = from_log_idx == 0 ? 1 : from_log_idx; + size_t total_read = 0; for (const auto & [start_id, changelog_file] : existing_changelogs) { ChangelogName parsed_name = getChangelogName(changelog_file); @@ -219,11 +221,10 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_idx) { ChangelogReader reader(changelog_file); read_from_last = reader.readChangelog(logs, from_log_idx, index_to_start_pos); + total_read += read_from_last; } } - start_index = from_log_idx == 0 ? 1 : from_log_idx; - if (existing_changelogs.size() > 0 && read_from_last < rotate_interval) { auto str_name = existing_changelogs.rbegin()->second; @@ -233,7 +234,7 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_idx) } else { - rotate(start_index); + rotate(start_index + total_read); } } diff --git a/src/Coordination/NuKeeperLogStore.cpp b/src/Coordination/NuKeeperLogStore.cpp index fa0631e14ad..fa8d6d6c299 100644 --- a/src/Coordination/NuKeeperLogStore.cpp +++ b/src/Coordination/NuKeeperLogStore.cpp @@ -94,4 +94,10 @@ void NuKeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack) changelog.applyEntriesFromBuffer(index, pack); } +size_t NuKeeperLogStore::size() const +{ + std::lock_guard lock(changelog_lock); + return changelog.size(); +} + } diff --git a/src/Coordination/NuKeeperLogStore.h b/src/Coordination/NuKeeperLogStore.h index 981dc3f24e7..49d5dbfdf7c 100644 --- a/src/Coordination/NuKeeperLogStore.h +++ b/src/Coordination/NuKeeperLogStore.h @@ -39,6 +39,8 @@ public: bool flush() override; + size_t size() const; + private: mutable std::mutex changelog_lock; Changelog changelog; diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index 6d91ba95111..8328d93d9cf 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -114,10 +114,10 @@ struct SimpliestRaftServer if (!raft_instance) { - std::cerr << "Failed to initialize launcher (see the message " - "in the log file)." << std::endl; + std::cerr << "Failed to initialize launcher" << std::endl; exit(-1); } + std::cout << "init Raft instance " << server_id; for (size_t ii = 0; ii < 20; ++ii) { @@ -370,33 +370,33 @@ DB::LogEntryPtr getLogEntry(const std::string & s, size_t term) TEST(CoordinationTest, ChangelogTestSimple) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 5); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); auto entry = getLogEntry("hello world", 77); - changelog.appendEntry(1, entry); - EXPECT_EQ(changelog.getNextEntryIndex(), 2); - EXPECT_EQ(changelog.getStartIndex(), 1); - EXPECT_EQ(changelog.getLastEntry()->get_term(), 77); - EXPECT_EQ(changelog.entryAt(1)->get_term(), 77); - EXPECT_EQ(changelog.getLogEntriesBetween(1, 2)->size(), 1); + changelog.append(entry); + EXPECT_EQ(changelog.next_slot(), 2); + EXPECT_EQ(changelog.start_index(), 1); + EXPECT_EQ(changelog.last_entry()->get_term(), 77); + EXPECT_EQ(changelog.entry_at(1)->get_term(), 77); + EXPECT_EQ(changelog.log_entries(1, 2)->size(), 1); } TEST(CoordinationTest, ChangelogTestFile) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 5); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); auto entry = getLogEntry("hello world", 77); - changelog.appendEntry(1, entry); + changelog.append(entry); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); for (const auto & p : fs::directory_iterator("./logs")) EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin"); - changelog.appendEntry(2, entry); - changelog.appendEntry(3, entry); - changelog.appendEntry(4, entry); - changelog.appendEntry(5, entry); - changelog.appendEntry(6, entry); + changelog.append(entry); + changelog.append(entry); + changelog.append(entry); + changelog.append(entry); + changelog.append(entry); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); @@ -405,26 +405,26 @@ TEST(CoordinationTest, ChangelogTestFile) TEST(CoordinationTest, ChangelogReadWrite) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 1000); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 1000); + changelog.init(1); for (size_t i = 0; i < 10; ++i) { auto entry = getLogEntry("hello world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_EQ(changelog.size(), 10); - DB::Changelog changelog_reader("./logs", 1000); - changelog_reader.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog_reader("./logs", 1000); + changelog_reader.init(1); EXPECT_EQ(changelog_reader.size(), 10); - EXPECT_EQ(changelog_reader.getLastEntry()->get_term(), changelog.getLastEntry()->get_term()); - EXPECT_EQ(changelog_reader.getStartIndex(), changelog.getStartIndex()); - EXPECT_EQ(changelog_reader.getNextEntryIndex(), changelog.getNextEntryIndex()); + EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term()); + EXPECT_EQ(changelog_reader.start_index(), changelog.start_index()); + EXPECT_EQ(changelog_reader.next_slot(), changelog.next_slot()); for (size_t i = 0; i < 10; ++i) - EXPECT_EQ(changelog_reader.entryAt(i + 1)->get_term(), changelog.entryAt(i + 1)->get_term()); + EXPECT_EQ(changelog_reader.entry_at(i + 1)->get_term(), changelog.entry_at(i + 1)->get_term()); - auto entries_from_range_read = changelog_reader.getLogEntriesBetween(1, 11); - auto entries_from_range = changelog.getLogEntriesBetween(1, 11); + auto entries_from_range_read = changelog_reader.log_entries(1, 11); + auto entries_from_range = changelog.log_entries(1, 11); EXPECT_EQ(entries_from_range_read->size(), entries_from_range->size()); EXPECT_EQ(10, entries_from_range->size()); } @@ -432,55 +432,55 @@ TEST(CoordinationTest, ChangelogReadWrite) TEST(CoordinationTest, ChangelogWriteAt) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 1000); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 1000); + changelog.init(1); for (size_t i = 0; i < 10; ++i) { auto entry = getLogEntry("hello world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_EQ(changelog.size(), 10); auto entry = getLogEntry("writer", 77); - changelog.writeAt(7, entry); + changelog.write_at(7, entry); EXPECT_EQ(changelog.size(), 7); - EXPECT_EQ(changelog.getLastEntry()->get_term(), 77); - EXPECT_EQ(changelog.entryAt(7)->get_term(), 77); - EXPECT_EQ(changelog.getNextEntryIndex(), 8); + EXPECT_EQ(changelog.last_entry()->get_term(), 77); + EXPECT_EQ(changelog.entry_at(7)->get_term(), 77); + EXPECT_EQ(changelog.next_slot(), 8); - DB::Changelog changelog_reader("./logs", 1000); - changelog_reader.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog_reader("./logs", 1000); + changelog_reader.init(1); EXPECT_EQ(changelog_reader.size(), changelog.size()); - EXPECT_EQ(changelog_reader.getLastEntry()->get_term(), changelog.getLastEntry()->get_term()); - EXPECT_EQ(changelog_reader.getStartIndex(), changelog.getStartIndex()); - EXPECT_EQ(changelog_reader.getNextEntryIndex(), changelog.getNextEntryIndex()); + EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term()); + EXPECT_EQ(changelog_reader.start_index(), changelog.start_index()); + EXPECT_EQ(changelog_reader.next_slot(), changelog.next_slot()); } TEST(CoordinationTest, ChangelogTestAppendAfterRead) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 5); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); for (size_t i = 0; i < 7; ++i) { auto entry = getLogEntry("hello world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_EQ(changelog.size(), 7); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); - DB::Changelog changelog_reader("./logs", 5); - changelog_reader.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog_reader("./logs", 5); + changelog_reader.init(1); EXPECT_EQ(changelog_reader.size(), 7); for (size_t i = 7; i < 10; ++i) { auto entry = getLogEntry("hello world", i * 10); - changelog_reader.appendEntry(changelog_reader.getNextEntryIndex(), entry); + changelog_reader.append(entry); } EXPECT_EQ(changelog_reader.size(), 10); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); @@ -493,7 +493,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead) EXPECT_EQ(logs_count, 2); auto entry = getLogEntry("someentry", 77); - changelog_reader.appendEntry(changelog_reader.getNextEntryIndex(), entry); + changelog_reader.append(entry); EXPECT_EQ(changelog_reader.size(), 11); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); @@ -509,13 +509,13 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead) TEST(CoordinationTest, ChangelogTestCompaction) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 5); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); for (size_t i = 0; i < 3; ++i) { auto entry = getLogEntry("hello world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_EQ(changelog.size(), 3); @@ -523,15 +523,19 @@ TEST(CoordinationTest, ChangelogTestCompaction) changelog.compact(2); EXPECT_EQ(changelog.size(), 1); - EXPECT_EQ(changelog.getStartIndex(), 3); - EXPECT_EQ(changelog.getNextEntryIndex(), 4); - EXPECT_EQ(changelog.getLastEntry()->get_term(), 20); + EXPECT_EQ(changelog.start_index(), 3); + EXPECT_EQ(changelog.next_slot(), 4); + EXPECT_EQ(changelog.last_entry()->get_term(), 20); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); - changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 30)); - changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 40)); - changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 50)); - changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 60)); + auto e1 = getLogEntry("hello world", 30); + changelog.append(e1); + auto e2 = getLogEntry("hello world", 40); + changelog.append(e2); + auto e3 = getLogEntry("hello world", 50); + changelog.append(e3); + auto e4 = getLogEntry("hello world", 60); + changelog.append(e4); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); @@ -542,109 +546,110 @@ TEST(CoordinationTest, ChangelogTestCompaction) EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); EXPECT_EQ(changelog.size(), 1); - EXPECT_EQ(changelog.getStartIndex(), 7); - EXPECT_EQ(changelog.getNextEntryIndex(), 8); - EXPECT_EQ(changelog.getLastEntry()->get_term(), 60); + EXPECT_EQ(changelog.start_index(), 7); + EXPECT_EQ(changelog.next_slot(), 8); + EXPECT_EQ(changelog.last_entry()->get_term(), 60); /// And we able to read it - DB::Changelog changelog_reader("./logs", 5); - changelog_reader.readChangelogAndInitWriter(7); + DB::NuKeeperLogStore changelog_reader("./logs", 5); + changelog_reader.init(7); EXPECT_EQ(changelog_reader.size(), 1); - EXPECT_EQ(changelog_reader.getStartIndex(), 7); - EXPECT_EQ(changelog_reader.getNextEntryIndex(), 8); - EXPECT_EQ(changelog_reader.getLastEntry()->get_term(), 60); + EXPECT_EQ(changelog_reader.start_index(), 7); + EXPECT_EQ(changelog_reader.next_slot(), 8); + EXPECT_EQ(changelog_reader.last_entry()->get_term(), 60); } TEST(CoordinationTest, ChangelogTestBatchOperations) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 100); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 100); + changelog.init(1); for (size_t i = 0; i < 10; ++i) { auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_EQ(changelog.size(), 10); - auto entries = changelog.serializeEntriesToBuffer(1, 5); + auto entries = changelog.pack(1, 5); - DB::Changelog apply_changelog("./logs", 100); - apply_changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore apply_changelog("./logs", 100); + apply_changelog.init(1); for (size_t i = 0; i < 10; ++i) { - EXPECT_EQ(apply_changelog.entryAt(i + 1)->get_term(), i * 10); + EXPECT_EQ(apply_changelog.entry_at(i + 1)->get_term(), i * 10); } EXPECT_EQ(apply_changelog.size(), 10); - apply_changelog.applyEntriesFromBuffer(8, *entries); + apply_changelog.apply_pack(8, *entries); EXPECT_EQ(apply_changelog.size(), 12); - EXPECT_EQ(apply_changelog.getStartIndex(), 1); - EXPECT_EQ(apply_changelog.getNextEntryIndex(), 13); + EXPECT_EQ(apply_changelog.start_index(), 1); + EXPECT_EQ(apply_changelog.next_slot(), 13); for (size_t i = 0; i < 7; ++i) { - EXPECT_EQ(apply_changelog.entryAt(i + 1)->get_term(), i * 10); + EXPECT_EQ(apply_changelog.entry_at(i + 1)->get_term(), i * 10); } - EXPECT_EQ(apply_changelog.entryAt(8)->get_term(), 0); - EXPECT_EQ(apply_changelog.entryAt(9)->get_term(), 10); - EXPECT_EQ(apply_changelog.entryAt(10)->get_term(), 20); - EXPECT_EQ(apply_changelog.entryAt(11)->get_term(), 30); - EXPECT_EQ(apply_changelog.entryAt(12)->get_term(), 40); + EXPECT_EQ(apply_changelog.entry_at(8)->get_term(), 0); + EXPECT_EQ(apply_changelog.entry_at(9)->get_term(), 10); + EXPECT_EQ(apply_changelog.entry_at(10)->get_term(), 20); + EXPECT_EQ(apply_changelog.entry_at(11)->get_term(), 30); + EXPECT_EQ(apply_changelog.entry_at(12)->get_term(), 40); } TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 100); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 100); + changelog.init(1); for (size_t i = 0; i < 10; ++i) { auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_EQ(changelog.size(), 10); - auto entries = changelog.serializeEntriesToBuffer(5, 5); + auto entries = changelog.pack(5, 5); ChangelogDirTest test1("./logs1"); - DB::Changelog changelog_new("./logs1", 100); - changelog_new.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog_new("./logs1", 100); + changelog_new.init(1); EXPECT_EQ(changelog_new.size(), 0); - changelog_new.applyEntriesFromBuffer(5, *entries); + changelog_new.apply_pack(5, *entries); EXPECT_EQ(changelog_new.size(), 5); - EXPECT_EQ(changelog_new.getStartIndex(), 5); - EXPECT_EQ(changelog_new.getNextEntryIndex(), 10); + EXPECT_EQ(changelog_new.start_index(), 5); + EXPECT_EQ(changelog_new.next_slot(), 10); for (size_t i = 4; i < 9; ++i) - EXPECT_EQ(changelog_new.entryAt(i + 1)->get_term(), i * 10); + EXPECT_EQ(changelog_new.entry_at(i + 1)->get_term(), i * 10); - changelog_new.appendEntry(changelog_new.getNextEntryIndex(), getLogEntry("hello_world", 110)); + auto e = getLogEntry("hello_world", 110); + changelog_new.append(e); EXPECT_EQ(changelog_new.size(), 6); - EXPECT_EQ(changelog_new.getStartIndex(), 5); - EXPECT_EQ(changelog_new.getNextEntryIndex(), 11); + EXPECT_EQ(changelog_new.start_index(), 5); + EXPECT_EQ(changelog_new.next_slot(), 11); - DB::Changelog changelog_reader("./logs1", 100); - changelog_reader.readChangelogAndInitWriter(5); + DB::NuKeeperLogStore changelog_reader("./logs1", 100); + changelog_reader.init(5); } TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 5); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); for (size_t i = 0; i < 33; ++i) { auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); @@ -657,11 +662,12 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile) EXPECT_EQ(changelog.size(), 33); - changelog.writeAt(7, getLogEntry("helloworld", 5555)); + auto e1 = getLogEntry("helloworld", 5555); + changelog.write_at(7, e1); EXPECT_EQ(changelog.size(), 7); - EXPECT_EQ(changelog.getStartIndex(), 1); - EXPECT_EQ(changelog.getNextEntryIndex(), 8); - EXPECT_EQ(changelog.getLastEntry()->get_term(), 5555); + EXPECT_EQ(changelog.start_index(), 1); + EXPECT_EQ(changelog.next_slot(), 8); + EXPECT_EQ(changelog.last_entry()->get_term(), 5555); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); @@ -672,24 +678,24 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile) EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); - DB::Changelog changelog_read("./logs", 5); - changelog_read.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog_read("./logs", 5); + changelog_read.init(1); EXPECT_EQ(changelog_read.size(), 7); - EXPECT_EQ(changelog_read.getStartIndex(), 1); - EXPECT_EQ(changelog_read.getNextEntryIndex(), 8); - EXPECT_EQ(changelog_read.getLastEntry()->get_term(), 5555); + EXPECT_EQ(changelog_read.start_index(), 1); + EXPECT_EQ(changelog_read.next_slot(), 8); + EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555); } TEST(CoordinationTest, ChangelogTestWriteAtFileBorder) { ChangelogDirTest test("./logs"); - DB::Changelog changelog("./logs", 5); - changelog.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); for (size_t i = 0; i < 33; ++i) { auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10); - changelog.appendEntry(changelog.getNextEntryIndex(), entry); + changelog.append(entry); } EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); @@ -702,11 +708,12 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder) EXPECT_EQ(changelog.size(), 33); - changelog.writeAt(11, getLogEntry("helloworld", 5555)); + auto e1 = getLogEntry("helloworld", 5555); + changelog.write_at(11, e1); EXPECT_EQ(changelog.size(), 11); - EXPECT_EQ(changelog.getStartIndex(), 1); - EXPECT_EQ(changelog.getNextEntryIndex(), 12); - EXPECT_EQ(changelog.getLastEntry()->get_term(), 5555); + EXPECT_EQ(changelog.start_index(), 1); + EXPECT_EQ(changelog.next_slot(), 12); + EXPECT_EQ(changelog.last_entry()->get_term(), 5555); EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); @@ -717,12 +724,90 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder) EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); - DB::Changelog changelog_read("./logs", 5); - changelog_read.readChangelogAndInitWriter(1); + DB::NuKeeperLogStore changelog_read("./logs", 5); + changelog_read.init(1); EXPECT_EQ(changelog_read.size(), 11); - EXPECT_EQ(changelog_read.getStartIndex(), 1); - EXPECT_EQ(changelog_read.getNextEntryIndex(), 12); - EXPECT_EQ(changelog_read.getLastEntry()->get_term(), 5555); + EXPECT_EQ(changelog_read.start_index(), 1); + EXPECT_EQ(changelog_read.next_slot(), 12); + EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555); +} + +TEST(CoordinationTest, ChangelogTestWriteAtAllFiles) +{ + ChangelogDirTest test("./logs"); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); + + for (size_t i = 0; i < 33; ++i) + { + auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10); + changelog.append(entry); + } + + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin")); + + EXPECT_EQ(changelog.size(), 33); + + auto e1 = getLogEntry("helloworld", 5555); + changelog.write_at(1, e1); + EXPECT_EQ(changelog.size(), 1); + EXPECT_EQ(changelog.start_index(), 1); + EXPECT_EQ(changelog.next_slot(), 2); + EXPECT_EQ(changelog.last_entry()->get_term(), 5555); + + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); + + EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); +} + +TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead) +{ + ChangelogDirTest test("./logs"); + DB::NuKeeperLogStore changelog("./logs", 5); + changelog.init(1); + + for (size_t i = 0; i < 35; ++i) + { + auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10); + changelog.append(entry); + } + EXPECT_EQ(changelog.size(), 35); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin")); + EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin")); + + + DB::NuKeeperLogStore changelog_reader("./logs", 5); + changelog_reader.init(1); + + auto entry = getLogEntry("36_hello_world", 360); + changelog_reader.append(entry); + + EXPECT_EQ(changelog_reader.size(), 36); + EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin")); + EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin")); } #endif