mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Test log storage instead of changelog
This commit is contained in:
parent
dfaa79b88e
commit
af95db2fcf
@ -212,6 +212,8 @@ Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval
|
||||
void Changelog::readChangelogAndInitWriter(size_t from_log_idx)
|
||||
{
|
||||
size_t read_from_last = 0;
|
||||
start_index = from_log_idx == 0 ? 1 : from_log_idx;
|
||||
size_t total_read = 0;
|
||||
for (const auto & [start_id, changelog_file] : existing_changelogs)
|
||||
{
|
||||
ChangelogName parsed_name = getChangelogName(changelog_file);
|
||||
@ -219,11 +221,10 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_idx)
|
||||
{
|
||||
ChangelogReader reader(changelog_file);
|
||||
read_from_last = reader.readChangelog(logs, from_log_idx, index_to_start_pos);
|
||||
total_read += read_from_last;
|
||||
}
|
||||
}
|
||||
|
||||
start_index = from_log_idx == 0 ? 1 : from_log_idx;
|
||||
|
||||
if (existing_changelogs.size() > 0 && read_from_last < rotate_interval)
|
||||
{
|
||||
auto str_name = existing_changelogs.rbegin()->second;
|
||||
@ -233,7 +234,7 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_idx)
|
||||
}
|
||||
else
|
||||
{
|
||||
rotate(start_index);
|
||||
rotate(start_index + total_read);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,4 +94,10 @@ void NuKeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack)
|
||||
changelog.applyEntriesFromBuffer(index, pack);
|
||||
}
|
||||
|
||||
size_t NuKeeperLogStore::size() const
|
||||
{
|
||||
std::lock_guard lock(changelog_lock);
|
||||
return changelog.size();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -39,6 +39,8 @@ public:
|
||||
|
||||
bool flush() override;
|
||||
|
||||
size_t size() const;
|
||||
|
||||
private:
|
||||
mutable std::mutex changelog_lock;
|
||||
Changelog changelog;
|
||||
|
@ -114,10 +114,10 @@ struct SimpliestRaftServer
|
||||
|
||||
if (!raft_instance)
|
||||
{
|
||||
std::cerr << "Failed to initialize launcher (see the message "
|
||||
"in the log file)." << std::endl;
|
||||
std::cerr << "Failed to initialize launcher" << std::endl;
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
std::cout << "init Raft instance " << server_id;
|
||||
for (size_t ii = 0; ii < 20; ++ii)
|
||||
{
|
||||
@ -370,33 +370,33 @@ DB::LogEntryPtr getLogEntry(const std::string & s, size_t term)
|
||||
TEST(CoordinationTest, ChangelogTestSimple)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 5);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
auto entry = getLogEntry("hello world", 77);
|
||||
changelog.appendEntry(1, entry);
|
||||
EXPECT_EQ(changelog.getNextEntryIndex(), 2);
|
||||
EXPECT_EQ(changelog.getStartIndex(), 1);
|
||||
EXPECT_EQ(changelog.getLastEntry()->get_term(), 77);
|
||||
EXPECT_EQ(changelog.entryAt(1)->get_term(), 77);
|
||||
EXPECT_EQ(changelog.getLogEntriesBetween(1, 2)->size(), 1);
|
||||
changelog.append(entry);
|
||||
EXPECT_EQ(changelog.next_slot(), 2);
|
||||
EXPECT_EQ(changelog.start_index(), 1);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 77);
|
||||
EXPECT_EQ(changelog.entry_at(1)->get_term(), 77);
|
||||
EXPECT_EQ(changelog.log_entries(1, 2)->size(), 1);
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestFile)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 5);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
auto entry = getLogEntry("hello world", 77);
|
||||
changelog.appendEntry(1, entry);
|
||||
changelog.append(entry);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
for (const auto & p : fs::directory_iterator("./logs"))
|
||||
EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin");
|
||||
|
||||
changelog.appendEntry(2, entry);
|
||||
changelog.appendEntry(3, entry);
|
||||
changelog.appendEntry(4, entry);
|
||||
changelog.appendEntry(5, entry);
|
||||
changelog.appendEntry(6, entry);
|
||||
changelog.append(entry);
|
||||
changelog.append(entry);
|
||||
changelog.append(entry);
|
||||
changelog.append(entry);
|
||||
changelog.append(entry);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
@ -405,26 +405,26 @@ TEST(CoordinationTest, ChangelogTestFile)
|
||||
TEST(CoordinationTest, ChangelogReadWrite)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 1000);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 1000);
|
||||
changelog.init(1);
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
auto entry = getLogEntry("hello world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
EXPECT_EQ(changelog.size(), 10);
|
||||
DB::Changelog changelog_reader("./logs", 1000);
|
||||
changelog_reader.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog_reader("./logs", 1000);
|
||||
changelog_reader.init(1);
|
||||
EXPECT_EQ(changelog_reader.size(), 10);
|
||||
EXPECT_EQ(changelog_reader.getLastEntry()->get_term(), changelog.getLastEntry()->get_term());
|
||||
EXPECT_EQ(changelog_reader.getStartIndex(), changelog.getStartIndex());
|
||||
EXPECT_EQ(changelog_reader.getNextEntryIndex(), changelog.getNextEntryIndex());
|
||||
EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term());
|
||||
EXPECT_EQ(changelog_reader.start_index(), changelog.start_index());
|
||||
EXPECT_EQ(changelog_reader.next_slot(), changelog.next_slot());
|
||||
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
EXPECT_EQ(changelog_reader.entryAt(i + 1)->get_term(), changelog.entryAt(i + 1)->get_term());
|
||||
EXPECT_EQ(changelog_reader.entry_at(i + 1)->get_term(), changelog.entry_at(i + 1)->get_term());
|
||||
|
||||
auto entries_from_range_read = changelog_reader.getLogEntriesBetween(1, 11);
|
||||
auto entries_from_range = changelog.getLogEntriesBetween(1, 11);
|
||||
auto entries_from_range_read = changelog_reader.log_entries(1, 11);
|
||||
auto entries_from_range = changelog.log_entries(1, 11);
|
||||
EXPECT_EQ(entries_from_range_read->size(), entries_from_range->size());
|
||||
EXPECT_EQ(10, entries_from_range->size());
|
||||
}
|
||||
@ -432,55 +432,55 @@ TEST(CoordinationTest, ChangelogReadWrite)
|
||||
TEST(CoordinationTest, ChangelogWriteAt)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 1000);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 1000);
|
||||
changelog.init(1);
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
auto entry = getLogEntry("hello world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
EXPECT_EQ(changelog.size(), 10);
|
||||
|
||||
auto entry = getLogEntry("writer", 77);
|
||||
changelog.writeAt(7, entry);
|
||||
changelog.write_at(7, entry);
|
||||
EXPECT_EQ(changelog.size(), 7);
|
||||
EXPECT_EQ(changelog.getLastEntry()->get_term(), 77);
|
||||
EXPECT_EQ(changelog.entryAt(7)->get_term(), 77);
|
||||
EXPECT_EQ(changelog.getNextEntryIndex(), 8);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 77);
|
||||
EXPECT_EQ(changelog.entry_at(7)->get_term(), 77);
|
||||
EXPECT_EQ(changelog.next_slot(), 8);
|
||||
|
||||
DB::Changelog changelog_reader("./logs", 1000);
|
||||
changelog_reader.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog_reader("./logs", 1000);
|
||||
changelog_reader.init(1);
|
||||
|
||||
EXPECT_EQ(changelog_reader.size(), changelog.size());
|
||||
EXPECT_EQ(changelog_reader.getLastEntry()->get_term(), changelog.getLastEntry()->get_term());
|
||||
EXPECT_EQ(changelog_reader.getStartIndex(), changelog.getStartIndex());
|
||||
EXPECT_EQ(changelog_reader.getNextEntryIndex(), changelog.getNextEntryIndex());
|
||||
EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term());
|
||||
EXPECT_EQ(changelog_reader.start_index(), changelog.start_index());
|
||||
EXPECT_EQ(changelog_reader.next_slot(), changelog.next_slot());
|
||||
}
|
||||
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestAppendAfterRead)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 5);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
for (size_t i = 0; i < 7; ++i)
|
||||
{
|
||||
auto entry = getLogEntry("hello world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
|
||||
EXPECT_EQ(changelog.size(), 7);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
|
||||
DB::Changelog changelog_reader("./logs", 5);
|
||||
changelog_reader.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog_reader("./logs", 5);
|
||||
changelog_reader.init(1);
|
||||
|
||||
EXPECT_EQ(changelog_reader.size(), 7);
|
||||
for (size_t i = 7; i < 10; ++i)
|
||||
{
|
||||
auto entry = getLogEntry("hello world", i * 10);
|
||||
changelog_reader.appendEntry(changelog_reader.getNextEntryIndex(), entry);
|
||||
changelog_reader.append(entry);
|
||||
}
|
||||
EXPECT_EQ(changelog_reader.size(), 10);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
@ -493,7 +493,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
|
||||
EXPECT_EQ(logs_count, 2);
|
||||
|
||||
auto entry = getLogEntry("someentry", 77);
|
||||
changelog_reader.appendEntry(changelog_reader.getNextEntryIndex(), entry);
|
||||
changelog_reader.append(entry);
|
||||
EXPECT_EQ(changelog_reader.size(), 11);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
@ -509,13 +509,13 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
|
||||
TEST(CoordinationTest, ChangelogTestCompaction)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 5);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
|
||||
for (size_t i = 0; i < 3; ++i)
|
||||
{
|
||||
auto entry = getLogEntry("hello world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
|
||||
EXPECT_EQ(changelog.size(), 3);
|
||||
@ -523,15 +523,19 @@ TEST(CoordinationTest, ChangelogTestCompaction)
|
||||
changelog.compact(2);
|
||||
|
||||
EXPECT_EQ(changelog.size(), 1);
|
||||
EXPECT_EQ(changelog.getStartIndex(), 3);
|
||||
EXPECT_EQ(changelog.getNextEntryIndex(), 4);
|
||||
EXPECT_EQ(changelog.getLastEntry()->get_term(), 20);
|
||||
EXPECT_EQ(changelog.start_index(), 3);
|
||||
EXPECT_EQ(changelog.next_slot(), 4);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 20);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 30));
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 40));
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 50));
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), getLogEntry("hello world", 60));
|
||||
auto e1 = getLogEntry("hello world", 30);
|
||||
changelog.append(e1);
|
||||
auto e2 = getLogEntry("hello world", 40);
|
||||
changelog.append(e2);
|
||||
auto e3 = getLogEntry("hello world", 50);
|
||||
changelog.append(e3);
|
||||
auto e4 = getLogEntry("hello world", 60);
|
||||
changelog.append(e4);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
@ -542,109 +546,110 @@ TEST(CoordinationTest, ChangelogTestCompaction)
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
|
||||
EXPECT_EQ(changelog.size(), 1);
|
||||
EXPECT_EQ(changelog.getStartIndex(), 7);
|
||||
EXPECT_EQ(changelog.getNextEntryIndex(), 8);
|
||||
EXPECT_EQ(changelog.getLastEntry()->get_term(), 60);
|
||||
EXPECT_EQ(changelog.start_index(), 7);
|
||||
EXPECT_EQ(changelog.next_slot(), 8);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 60);
|
||||
/// And we able to read it
|
||||
DB::Changelog changelog_reader("./logs", 5);
|
||||
changelog_reader.readChangelogAndInitWriter(7);
|
||||
DB::NuKeeperLogStore changelog_reader("./logs", 5);
|
||||
changelog_reader.init(7);
|
||||
EXPECT_EQ(changelog_reader.size(), 1);
|
||||
EXPECT_EQ(changelog_reader.getStartIndex(), 7);
|
||||
EXPECT_EQ(changelog_reader.getNextEntryIndex(), 8);
|
||||
EXPECT_EQ(changelog_reader.getLastEntry()->get_term(), 60);
|
||||
EXPECT_EQ(changelog_reader.start_index(), 7);
|
||||
EXPECT_EQ(changelog_reader.next_slot(), 8);
|
||||
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 60);
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestBatchOperations)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 100);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 100);
|
||||
changelog.init(1);
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
|
||||
EXPECT_EQ(changelog.size(), 10);
|
||||
|
||||
auto entries = changelog.serializeEntriesToBuffer(1, 5);
|
||||
auto entries = changelog.pack(1, 5);
|
||||
|
||||
DB::Changelog apply_changelog("./logs", 100);
|
||||
apply_changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore apply_changelog("./logs", 100);
|
||||
apply_changelog.init(1);
|
||||
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
EXPECT_EQ(apply_changelog.entryAt(i + 1)->get_term(), i * 10);
|
||||
EXPECT_EQ(apply_changelog.entry_at(i + 1)->get_term(), i * 10);
|
||||
}
|
||||
EXPECT_EQ(apply_changelog.size(), 10);
|
||||
|
||||
apply_changelog.applyEntriesFromBuffer(8, *entries);
|
||||
apply_changelog.apply_pack(8, *entries);
|
||||
|
||||
EXPECT_EQ(apply_changelog.size(), 12);
|
||||
EXPECT_EQ(apply_changelog.getStartIndex(), 1);
|
||||
EXPECT_EQ(apply_changelog.getNextEntryIndex(), 13);
|
||||
EXPECT_EQ(apply_changelog.start_index(), 1);
|
||||
EXPECT_EQ(apply_changelog.next_slot(), 13);
|
||||
|
||||
for (size_t i = 0; i < 7; ++i)
|
||||
{
|
||||
EXPECT_EQ(apply_changelog.entryAt(i + 1)->get_term(), i * 10);
|
||||
EXPECT_EQ(apply_changelog.entry_at(i + 1)->get_term(), i * 10);
|
||||
}
|
||||
|
||||
EXPECT_EQ(apply_changelog.entryAt(8)->get_term(), 0);
|
||||
EXPECT_EQ(apply_changelog.entryAt(9)->get_term(), 10);
|
||||
EXPECT_EQ(apply_changelog.entryAt(10)->get_term(), 20);
|
||||
EXPECT_EQ(apply_changelog.entryAt(11)->get_term(), 30);
|
||||
EXPECT_EQ(apply_changelog.entryAt(12)->get_term(), 40);
|
||||
EXPECT_EQ(apply_changelog.entry_at(8)->get_term(), 0);
|
||||
EXPECT_EQ(apply_changelog.entry_at(9)->get_term(), 10);
|
||||
EXPECT_EQ(apply_changelog.entry_at(10)->get_term(), 20);
|
||||
EXPECT_EQ(apply_changelog.entry_at(11)->get_term(), 30);
|
||||
EXPECT_EQ(apply_changelog.entry_at(12)->get_term(), 40);
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 100);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 100);
|
||||
changelog.init(1);
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
|
||||
EXPECT_EQ(changelog.size(), 10);
|
||||
|
||||
auto entries = changelog.serializeEntriesToBuffer(5, 5);
|
||||
auto entries = changelog.pack(5, 5);
|
||||
|
||||
ChangelogDirTest test1("./logs1");
|
||||
DB::Changelog changelog_new("./logs1", 100);
|
||||
changelog_new.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog_new("./logs1", 100);
|
||||
changelog_new.init(1);
|
||||
EXPECT_EQ(changelog_new.size(), 0);
|
||||
|
||||
changelog_new.applyEntriesFromBuffer(5, *entries);
|
||||
changelog_new.apply_pack(5, *entries);
|
||||
|
||||
EXPECT_EQ(changelog_new.size(), 5);
|
||||
EXPECT_EQ(changelog_new.getStartIndex(), 5);
|
||||
EXPECT_EQ(changelog_new.getNextEntryIndex(), 10);
|
||||
EXPECT_EQ(changelog_new.start_index(), 5);
|
||||
EXPECT_EQ(changelog_new.next_slot(), 10);
|
||||
|
||||
for (size_t i = 4; i < 9; ++i)
|
||||
EXPECT_EQ(changelog_new.entryAt(i + 1)->get_term(), i * 10);
|
||||
EXPECT_EQ(changelog_new.entry_at(i + 1)->get_term(), i * 10);
|
||||
|
||||
changelog_new.appendEntry(changelog_new.getNextEntryIndex(), getLogEntry("hello_world", 110));
|
||||
auto e = getLogEntry("hello_world", 110);
|
||||
changelog_new.append(e);
|
||||
EXPECT_EQ(changelog_new.size(), 6);
|
||||
EXPECT_EQ(changelog_new.getStartIndex(), 5);
|
||||
EXPECT_EQ(changelog_new.getNextEntryIndex(), 11);
|
||||
EXPECT_EQ(changelog_new.start_index(), 5);
|
||||
EXPECT_EQ(changelog_new.next_slot(), 11);
|
||||
|
||||
DB::Changelog changelog_reader("./logs1", 100);
|
||||
changelog_reader.readChangelogAndInitWriter(5);
|
||||
DB::NuKeeperLogStore changelog_reader("./logs1", 100);
|
||||
changelog_reader.init(5);
|
||||
}
|
||||
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 5);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
|
||||
for (size_t i = 0; i < 33; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
@ -657,11 +662,12 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
|
||||
|
||||
EXPECT_EQ(changelog.size(), 33);
|
||||
|
||||
changelog.writeAt(7, getLogEntry("helloworld", 5555));
|
||||
auto e1 = getLogEntry("helloworld", 5555);
|
||||
changelog.write_at(7, e1);
|
||||
EXPECT_EQ(changelog.size(), 7);
|
||||
EXPECT_EQ(changelog.getStartIndex(), 1);
|
||||
EXPECT_EQ(changelog.getNextEntryIndex(), 8);
|
||||
EXPECT_EQ(changelog.getLastEntry()->get_term(), 5555);
|
||||
EXPECT_EQ(changelog.start_index(), 1);
|
||||
EXPECT_EQ(changelog.next_slot(), 8);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
@ -672,24 +678,24 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
|
||||
DB::Changelog changelog_read("./logs", 5);
|
||||
changelog_read.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog_read("./logs", 5);
|
||||
changelog_read.init(1);
|
||||
EXPECT_EQ(changelog_read.size(), 7);
|
||||
EXPECT_EQ(changelog_read.getStartIndex(), 1);
|
||||
EXPECT_EQ(changelog_read.getNextEntryIndex(), 8);
|
||||
EXPECT_EQ(changelog_read.getLastEntry()->get_term(), 5555);
|
||||
EXPECT_EQ(changelog_read.start_index(), 1);
|
||||
EXPECT_EQ(changelog_read.next_slot(), 8);
|
||||
EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555);
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::Changelog changelog("./logs", 5);
|
||||
changelog.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
|
||||
for (size_t i = 0; i < 33; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
changelog.appendEntry(changelog.getNextEntryIndex(), entry);
|
||||
changelog.append(entry);
|
||||
}
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
@ -702,11 +708,12 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
|
||||
|
||||
EXPECT_EQ(changelog.size(), 33);
|
||||
|
||||
changelog.writeAt(11, getLogEntry("helloworld", 5555));
|
||||
auto e1 = getLogEntry("helloworld", 5555);
|
||||
changelog.write_at(11, e1);
|
||||
EXPECT_EQ(changelog.size(), 11);
|
||||
EXPECT_EQ(changelog.getStartIndex(), 1);
|
||||
EXPECT_EQ(changelog.getNextEntryIndex(), 12);
|
||||
EXPECT_EQ(changelog.getLastEntry()->get_term(), 5555);
|
||||
EXPECT_EQ(changelog.start_index(), 1);
|
||||
EXPECT_EQ(changelog.next_slot(), 12);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
@ -717,12 +724,90 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
|
||||
DB::Changelog changelog_read("./logs", 5);
|
||||
changelog_read.readChangelogAndInitWriter(1);
|
||||
DB::NuKeeperLogStore changelog_read("./logs", 5);
|
||||
changelog_read.init(1);
|
||||
EXPECT_EQ(changelog_read.size(), 11);
|
||||
EXPECT_EQ(changelog_read.getStartIndex(), 1);
|
||||
EXPECT_EQ(changelog_read.getNextEntryIndex(), 12);
|
||||
EXPECT_EQ(changelog_read.getLastEntry()->get_term(), 5555);
|
||||
EXPECT_EQ(changelog_read.start_index(), 1);
|
||||
EXPECT_EQ(changelog_read.next_slot(), 12);
|
||||
EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555);
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
|
||||
for (size_t i = 0; i < 33; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
changelog.append(entry);
|
||||
}
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
|
||||
EXPECT_EQ(changelog.size(), 33);
|
||||
|
||||
auto e1 = getLogEntry("helloworld", 5555);
|
||||
changelog.write_at(1, e1);
|
||||
EXPECT_EQ(changelog.size(), 1);
|
||||
EXPECT_EQ(changelog.start_index(), 1);
|
||||
EXPECT_EQ(changelog.next_slot(), 2);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
|
||||
{
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::NuKeeperLogStore changelog("./logs", 5);
|
||||
changelog.init(1);
|
||||
|
||||
for (size_t i = 0; i < 35; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
changelog.append(entry);
|
||||
}
|
||||
EXPECT_EQ(changelog.size(), 35);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin"));
|
||||
|
||||
|
||||
DB::NuKeeperLogStore changelog_reader("./logs", 5);
|
||||
changelog_reader.init(1);
|
||||
|
||||
auto entry = getLogEntry("36_hello_world", 360);
|
||||
changelog_reader.append(entry);
|
||||
|
||||
EXPECT_EQ(changelog_reader.size(), 36);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin"));
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user