Better tests and ability to work without compression

This commit is contained in:
alesapin 2021-09-22 13:38:06 +03:00
parent 3a11cc59b0
commit f980a414ee
4 changed files with 271 additions and 214 deletions

View File

@ -19,6 +19,7 @@ namespace ErrorCodes
{
extern const int CHECKSUM_DOESNT_MATCH;
extern const int CORRUPTED_DATA;
extern const int UNSUPPORTED_METHOD;
extern const int UNKNOWN_FORMAT_VERSION;
extern const int LOGICAL_ERROR;
}
@ -79,30 +80,45 @@ public:
ChangelogWriter(const std::string & filepath_, WriteMode mode, uint64_t start_index_)
: filepath(filepath_)
, file_buf(filepath, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY))
, buf(file_buf, /* compression level = */ 3, /* append_to_existing_stream = */ mode == WriteMode::Append)
, start_index(start_index_)
{
auto compression_method = chooseCompressionMethod(filepath_, "");
if (compression_method != CompressionMethod::Zstd && compression_method != CompressionMethod::None)
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Unsupported coordination log serialization format {}", toContentEncodingName(compression_method));
}
else if (compression_method == CompressionMethod::Zstd)
{
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(file_buf, /* compression level = */ 3, /* append_to_existing_stream = */ mode == WriteMode::Append);
}
else
{
/// no compression, only file buffer
}
}
void appendRecord(ChangelogRecord && record)
{
writeIntBinary(computeRecordChecksum(record), buf);
writeIntBinary(computeRecordChecksum(record), getBuffer());
writeIntBinary(record.header.version, buf);
writeIntBinary(record.header.index, buf);
writeIntBinary(record.header.term, buf);
writeIntBinary(record.header.value_type, buf);
writeIntBinary(record.header.blob_size, buf);
writeIntBinary(record.header.version, getBuffer());
writeIntBinary(record.header.index, getBuffer());
writeIntBinary(record.header.term, getBuffer());
writeIntBinary(record.header.value_type, getBuffer());
writeIntBinary(record.header.blob_size, getBuffer());
if (record.header.blob_size != 0)
buf.write(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
getBuffer().write(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
}
void flush(bool force_fsync)
{
/// Flush compressed data to WriteBufferFromFile working_buffer
buf.next();
if (compressed_buffer)
{
/// Flush compressed data to WriteBufferFromFile working_buffer
compressed_buffer->next();
}
/// Flush working buffer to file system
file_buf.next();
@ -118,9 +134,16 @@ public:
}
private:
WriteBuffer & getBuffer()
{
if (compressed_buffer)
return *compressed_buffer;
return file_buf;
}
std::string filepath;
WriteBufferFromFile file_buf;
ZstdDeflatingAppendableWriteBuffer buf;
std::unique_ptr<WriteBuffer> compressed_buffer;
uint64_t start_index;
};

View File

@ -13,7 +13,7 @@ namespace DB
class KeeperLogStore : public nuraft::log_store
{
public:
KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_, bool compress_logs_ = true);
KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_, bool compress_logs_);
/// Read log storage from filesystem starting from last_commited_log_index
void init(uint64_t last_commited_log_index, uint64_t logs_to_keep);

View File

@ -34,7 +34,7 @@ KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host,
: my_server_id(server_id_)
, my_port(port)
, secure(false)
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false))
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));

View File

@ -53,14 +53,23 @@ struct ChangelogDirTest
}
};
TEST(CoordinationTest, BuildTest)
struct CompressionParam
{
bool enable_compression;
std::string extension;
};
class CoordinationTest : public ::testing::TestWithParam<CompressionParam>
{};
TEST_P(CoordinationTest, BuildTest)
{
DB::InMemoryLogStore store;
DB::SummingStateMachine machine;
EXPECT_EQ(1, 1);
}
TEST(CoordinationTest, BufferSerde)
TEST_P(CoordinationTest, BufferSerde)
{
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Get);
request->xid = 3;
@ -173,7 +182,7 @@ nuraft::ptr<nuraft::buffer> getBuffer(int64_t number)
}
TEST(CoordinationTest, TestSummingRaft1)
TEST_P(CoordinationTest, TestSummingRaft1)
{
ChangelogDirTest test("./logs");
SummingRaftServer s1(1, "localhost", 44444, "./logs");
@ -204,10 +213,11 @@ DB::LogEntryPtr getLogEntry(const std::string & s, size_t term)
return nuraft::cs_new<nuraft::log_entry>(term, bufwriter.getBuffer());
}
TEST(CoordinationTest, ChangelogTestSimple)
TEST_P(CoordinationTest, ChangelogTestSimple)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
@ -220,17 +230,19 @@ TEST(CoordinationTest, ChangelogTestSimple)
EXPECT_EQ(changelog.log_entries(1, 2)->size(), 1);
}
TEST(CoordinationTest, ChangelogTestFile)
TEST_P(CoordinationTest, ChangelogTestFile)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
for (const auto & p : fs::directory_iterator("./logs"))
EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin.zstd");
EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin" + params.extension);
changelog.append(entry);
changelog.append(entry);
@ -239,14 +251,15 @@ TEST(CoordinationTest, ChangelogTestFile)
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
}
TEST(CoordinationTest, ChangelogReadWrite)
TEST_P(CoordinationTest, ChangelogReadWrite)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 1000, true);
DB::KeeperLogStore changelog("./logs", 1000, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -257,7 +270,7 @@ TEST(CoordinationTest, ChangelogReadWrite)
EXPECT_EQ(changelog.size(), 10);
DB::KeeperLogStore changelog_reader("./logs", 1000, true);
DB::KeeperLogStore changelog_reader("./logs", 1000, true, params.enable_compression);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term());
@ -273,10 +286,11 @@ TEST(CoordinationTest, ChangelogReadWrite)
EXPECT_EQ(10, entries_from_range->size());
}
TEST(CoordinationTest, ChangelogWriteAt)
TEST_P(CoordinationTest, ChangelogWriteAt)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 1000, true);
DB::KeeperLogStore changelog("./logs", 1000, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -296,7 +310,7 @@ TEST(CoordinationTest, ChangelogWriteAt)
EXPECT_EQ(changelog.entry_at(7)->get_term(), 77);
EXPECT_EQ(changelog.next_slot(), 8);
DB::KeeperLogStore changelog_reader("./logs", 1000, true);
DB::KeeperLogStore changelog_reader("./logs", 1000, true, params.enable_compression);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), changelog.size());
@ -306,10 +320,11 @@ TEST(CoordinationTest, ChangelogWriteAt)
}
TEST(CoordinationTest, ChangelogTestAppendAfterRead)
TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 7; ++i)
{
@ -319,10 +334,10 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 7);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
DB::KeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true, params.enable_compression);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 7);
@ -333,8 +348,8 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
}
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
size_t logs_count = 0;
for (const auto & _ [[maybe_unused]]: fs::directory_iterator("./logs"))
@ -346,9 +361,9 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 11);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
logs_count = 0;
for (const auto & _ [[maybe_unused]]: fs::directory_iterator("./logs"))
@ -357,10 +372,11 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
EXPECT_EQ(logs_count, 3);
}
TEST(CoordinationTest, ChangelogTestCompaction)
TEST_P(CoordinationTest, ChangelogTestCompaction)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 3; ++i)
@ -378,7 +394,7 @@ TEST(CoordinationTest, ChangelogTestCompaction)
EXPECT_EQ(changelog.start_index(), 3);
EXPECT_EQ(changelog.next_slot(), 4);
EXPECT_EQ(changelog.last_entry()->get_term(), 20);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
auto e1 = getLogEntry("hello world", 30);
changelog.append(e1);
@ -390,20 +406,20 @@ TEST(CoordinationTest, ChangelogTestCompaction)
changelog.append(e4);
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
changelog.compact(6);
EXPECT_FALSE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_EQ(changelog.size(), 1);
EXPECT_EQ(changelog.start_index(), 7);
EXPECT_EQ(changelog.next_slot(), 8);
EXPECT_EQ(changelog.last_entry()->get_term(), 60);
/// And we able to read it
DB::KeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true, params.enable_compression);
changelog_reader.init(7, 0);
EXPECT_EQ(changelog_reader.size(), 1);
@ -412,10 +428,11 @@ TEST(CoordinationTest, ChangelogTestCompaction)
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 60);
}
TEST(CoordinationTest, ChangelogTestBatchOperations)
TEST_P(CoordinationTest, ChangelogTestBatchOperations)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 100, true);
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -428,7 +445,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperations)
auto entries = changelog.pack(1, 5);
DB::KeeperLogStore apply_changelog("./logs", 100, true);
DB::KeeperLogStore apply_changelog("./logs", 100, true, params.enable_compression);
apply_changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
@ -456,10 +473,11 @@ TEST(CoordinationTest, ChangelogTestBatchOperations)
EXPECT_EQ(apply_changelog.entry_at(12)->get_term(), 40);
}
TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 100, true);
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -473,7 +491,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
auto entries = changelog.pack(5, 5);
ChangelogDirTest test1("./logs1");
DB::KeeperLogStore changelog_new("./logs1", 100, true);
DB::KeeperLogStore changelog_new("./logs1", 100, true, params.enable_compression);
changelog_new.init(1, 0);
EXPECT_EQ(changelog_new.size(), 0);
@ -495,15 +513,16 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
EXPECT_EQ(changelog_new.start_index(), 5);
EXPECT_EQ(changelog_new.next_slot(), 11);
DB::KeeperLogStore changelog_reader("./logs1", 100, true);
DB::KeeperLogStore changelog_reader("./logs1", 100, true, params.enable_compression);
changelog_reader.init(5, 0);
}
TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -513,13 +532,13 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
EXPECT_EQ(changelog.size(), 33);
@ -531,16 +550,16 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
EXPECT_EQ(changelog.next_slot(), 8);
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
DB::KeeperLogStore changelog_read("./logs", 5, true);
DB::KeeperLogStore changelog_read("./logs", 5, true, params.enable_compression);
changelog_read.init(1, 0);
EXPECT_EQ(changelog_read.size(), 7);
EXPECT_EQ(changelog_read.start_index(), 1);
@ -548,10 +567,11 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555);
}
TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -561,13 +581,13 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
EXPECT_EQ(changelog.size(), 33);
@ -579,16 +599,16 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
EXPECT_EQ(changelog.next_slot(), 12);
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
DB::KeeperLogStore changelog_read("./logs", 5, true);
DB::KeeperLogStore changelog_read("./logs", 5, true, params.enable_compression);
changelog_read.init(1, 0);
EXPECT_EQ(changelog_read.size(), 11);
EXPECT_EQ(changelog_read.start_index(), 1);
@ -596,10 +616,11 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555);
}
TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -609,13 +630,13 @@ TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
EXPECT_EQ(changelog.size(), 33);
@ -627,20 +648,21 @@ TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
EXPECT_EQ(changelog.next_slot(), 2);
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
}
TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -650,17 +672,17 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 35);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin" + params.extension));
DB::KeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true, params.enable_compression);
changelog_reader.init(1, 0);
auto entry = getLogEntry("36_hello_world", 360);
@ -668,22 +690,23 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 36);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin" + params.extension));
}
TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -693,32 +716,32 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 35);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
DB::WriteBufferFromFile plain_buf("./logs/changelog_11_15.bin.zstd", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
DB::WriteBufferFromFile plain_buf("./logs/changelog_11_15.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(0);
DB::KeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true, params.enable_compression);
changelog_reader.init(1, 0);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 90);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
auto entry = getLogEntry("h", 7777);
changelog_reader.append(entry);
@ -726,26 +749,27 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_EQ(changelog_reader.size(), 11);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
DB::KeeperLogStore changelog_reader2("./logs", 5, true);
DB::KeeperLogStore changelog_reader2("./logs", 5, true, params.enable_compression);
changelog_reader2.init(1, 0);
EXPECT_EQ(changelog_reader2.size(), 11);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
}
TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 20, true);
DB::KeeperLogStore changelog("./logs", 20, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -755,18 +779,18 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
DB::WriteBufferFromFile plain_buf("./logs/changelog_1_20.bin.zstd", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
DB::WriteBufferFromFile plain_buf("./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(140);
DB::KeeperLogStore changelog_reader("./logs", 20, true);
DB::KeeperLogStore changelog_reader("./logs", 20, true, params.enable_compression);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
@ -774,17 +798,18 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
DB::KeeperLogStore changelog_reader2("./logs", 1, true);
DB::KeeperLogStore changelog_reader2("./logs", 1, true, params.enable_compression);
changelog_reader2.init(1, 0);
EXPECT_EQ(changelog_reader2.size(), 1);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
}
TEST(CoordinationTest, ChangelogTestLostFiles)
TEST_P(CoordinationTest, ChangelogTestLostFiles)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
DB::KeeperLogStore changelog("./logs", 20, true);
DB::KeeperLogStore changelog("./logs", 20, true, params.enable_compression);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -794,19 +819,19 @@ TEST(CoordinationTest, ChangelogTestLostFiles)
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
fs::remove("./logs/changelog_1_20.bin.zstd");
fs::remove("./logs/changelog_1_20.bin" + params.extension);
DB::KeeperLogStore changelog_reader("./logs", 20, true);
DB::KeeperLogStore changelog_reader("./logs", 20, true, params.enable_compression);
/// It should print error message, but still able to start
changelog_reader.init(5, 0);
EXPECT_FALSE(fs::exists("./logs/changelog_1_20.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
}
TEST(CoordinationTest, SnapshotableHashMapSimple)
TEST_P(CoordinationTest, SnapshotableHashMapSimple)
{
DB::SnapshotableHashTable<int> hello;
EXPECT_TRUE(hello.insert("hello", 5));
@ -821,7 +846,7 @@ TEST(CoordinationTest, SnapshotableHashMapSimple)
EXPECT_EQ(hello.size(), 0);
}
TEST(CoordinationTest, SnapshotableHashMapTrySnapshot)
TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot)
{
DB::SnapshotableHashTable<int> map_snp;
EXPECT_TRUE(map_snp.insert("/hello", 7));
@ -907,7 +932,7 @@ void addNode(DB::KeeperStorage & storage, const std::string & path, const std::s
storage.container.insertOrReplace(path, node);
}
TEST(CoordinationTest, TestStorageSnapshotSimple)
TEST_P(CoordinationTest, TestStorageSnapshotSimple)
{
ChangelogDirTest test("./snapshots");
DB::KeeperSnapshotManager manager("./snapshots", 3);
@ -954,7 +979,7 @@ TEST(CoordinationTest, TestStorageSnapshotSimple)
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
}
TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites)
{
ChangelogDirTest test("./snapshots");
DB::KeeperSnapshotManager manager("./snapshots", 3);
@ -994,7 +1019,7 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
}
TEST(CoordinationTest, TestStorageSnapshotManySnapshots)
TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots)
{
ChangelogDirTest test("./snapshots");
DB::KeeperSnapshotManager manager("./snapshots", 3);
@ -1032,7 +1057,7 @@ TEST(CoordinationTest, TestStorageSnapshotManySnapshots)
}
}
TEST(CoordinationTest, TestStorageSnapshotMode)
TEST_P(CoordinationTest, TestStorageSnapshotMode)
{
ChangelogDirTest test("./snapshots");
DB::KeeperSnapshotManager manager("./snapshots", 3);
@ -1083,7 +1108,7 @@ TEST(CoordinationTest, TestStorageSnapshotMode)
}
TEST(CoordinationTest, TestStorageSnapshotBroken)
TEST_P(CoordinationTest, TestStorageSnapshotBroken)
{
ChangelogDirTest test("./snapshots");
DB::KeeperSnapshotManager manager("./snapshots", 3);
@ -1121,7 +1146,7 @@ nuraft::ptr<nuraft::log_entry> getLogEntryFromZKRequest(size_t term, int64_t ses
return nuraft::cs_new<nuraft::log_entry>(term, buffer);
}
void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint64_t total_logs)
void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint64_t total_logs, bool enable_compression)
{
using namespace Coordination;
using namespace DB;
@ -1133,7 +1158,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
state_machine->init();
DB::KeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true);
DB::KeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true, enable_compression);
changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items);
for (size_t i = 1; i < total_logs + 1; ++i)
{
@ -1173,7 +1198,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
restore_machine->init();
EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance);
DB::KeeperLogStore restore_changelog("./logs", settings->rotate_log_storage_interval, true);
DB::KeeperLogStore restore_changelog("./logs", settings->rotate_log_storage_interval, true, enable_compression);
restore_changelog.init(restore_machine->last_commit_index() + 1, settings->reserved_log_items);
EXPECT_EQ(restore_changelog.size(), std::min(settings->reserved_log_items + total_logs % settings->snapshot_distance, total_logs));
@ -1199,77 +1224,78 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
}
}
TEST(CoordinationTest, TestStateMachineAndLogStore)
TEST_P(CoordinationTest, TestStateMachineAndLogStore)
{
using namespace Coordination;
using namespace DB;
auto params = GetParam();
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 10;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 37);
testLogAndStateMachine(settings, 37, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 10;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 11);
testLogAndStateMachine(settings, 11, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 10;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 40);
testLogAndStateMachine(settings, 40, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 20;
settings->rotate_log_storage_interval = 30;
testLogAndStateMachine(settings, 40);
testLogAndStateMachine(settings, 40, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 0;
settings->rotate_log_storage_interval = 10;
testLogAndStateMachine(settings, 40);
testLogAndStateMachine(settings, 40, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 1;
settings->reserved_log_items = 1;
settings->rotate_log_storage_interval = 32;
testLogAndStateMachine(settings, 32);
testLogAndStateMachine(settings, 32, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 10;
settings->reserved_log_items = 7;
settings->rotate_log_storage_interval = 1;
testLogAndStateMachine(settings, 33);
testLogAndStateMachine(settings, 33, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 37;
settings->reserved_log_items = 1000;
settings->rotate_log_storage_interval = 5000;
testLogAndStateMachine(settings, 33);
testLogAndStateMachine(settings, 33, params.enable_compression);
}
{
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
settings->snapshot_distance = 37;
settings->reserved_log_items = 1000;
settings->rotate_log_storage_interval = 5000;
testLogAndStateMachine(settings, 45);
testLogAndStateMachine(settings, 45, params.enable_compression);
}
}
TEST(CoordinationTest, TestEphemeralNodeRemove)
TEST_P(CoordinationTest, TestEphemeralNodeRemove)
{
using namespace Coordination;
using namespace DB;
@ -1300,12 +1326,13 @@ TEST(CoordinationTest, TestEphemeralNodeRemove)
}
TEST(CoordinationTest, TestRotateIntervalChanges)
TEST_P(CoordinationTest, TestRotateIntervalChanges)
{
using namespace Coordination;
auto params = GetParam();
ChangelogDirTest snapshots("./logs");
{
DB::KeeperLogStore changelog("./logs", 100, true);
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(0, 3);
for (size_t i = 1; i < 55; ++i)
@ -1319,9 +1346,9 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + params.extension));
DB::KeeperLogStore changelog_1("./logs", 10, true);
DB::KeeperLogStore changelog_1("./logs", 10, true, params.enable_compression);
changelog_1.init(0, 50);
for (size_t i = 0; i < 55; ++i)
{
@ -1332,10 +1359,10 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
changelog_1.end_of_append_batch(0, 0);
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
DB::KeeperLogStore changelog_2("./logs", 7, true);
DB::KeeperLogStore changelog_2("./logs", 7, true, params.enable_compression);
changelog_2.init(98, 55);
for (size_t i = 0; i < 17; ++i)
@ -1349,13 +1376,13 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
changelog_2.compact(105);
EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_118_124.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_1_100.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_111_117.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_118_124.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + params.extension));
DB::KeeperLogStore changelog_3("./logs", 5, true);
DB::KeeperLogStore changelog_3("./logs", 5, true, params.enable_compression);
changelog_3.init(116, 3);
for (size_t i = 0; i < 17; ++i)
{
@ -1367,17 +1394,17 @@ TEST(CoordinationTest, TestRotateIntervalChanges)
}
changelog_3.compact(125);
EXPECT_FALSE(fs::exists("./logs/changelog_101_110.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_111_117.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_118_124.bin.zstd"));
EXPECT_FALSE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_111_117.bin" + params.extension));
EXPECT_FALSE(fs::exists("./logs/changelog_118_124.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_132_136.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_137_141.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_142_146.bin.zstd"));
EXPECT_TRUE(fs::exists("./logs/changelog_125_131.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_132_136.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_137_141.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_142_146.bin" + params.extension));
}
TEST(CoordinationTest, TestSessionExpiryQueue)
TEST_P(CoordinationTest, TestSessionExpiryQueue)
{
using namespace Coordination;
SessionExpiryQueue queue(500);
@ -1395,11 +1422,12 @@ TEST(CoordinationTest, TestSessionExpiryQueue)
}
TEST(CoordinationTest, TestCompressedLogsMultipleRewrite)
TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
{
using namespace Coordination;
auto test_params = GetParam();
ChangelogDirTest snapshots("./logs");
DB::KeeperLogStore changelog("./logs", 100, true);
DB::KeeperLogStore changelog("./logs", 100, true, test_params.enable_compression);
changelog.init(0, 3);
for (size_t i = 1; i < 55; ++i)
@ -1412,7 +1440,7 @@ TEST(CoordinationTest, TestCompressedLogsMultipleRewrite)
}
DB::KeeperLogStore changelog1("./logs", 100, true);
DB::KeeperLogStore changelog1("./logs", 100, true, test_params.enable_compression);
changelog1.init(0, 3);
for (size_t i = 55; i < 70; ++i)
{
@ -1423,7 +1451,7 @@ TEST(CoordinationTest, TestCompressedLogsMultipleRewrite)
changelog1.end_of_append_batch(0, 0);
}
DB::KeeperLogStore changelog2("./logs", 100, true);
DB::KeeperLogStore changelog2("./logs", 100, true, test_params.enable_compression);
changelog2.init(0, 3);
for (size_t i = 70; i < 80; ++i)
{
@ -1434,9 +1462,15 @@ TEST(CoordinationTest, TestCompressedLogsMultipleRewrite)
changelog2.end_of_append_batch(0, 0);
}
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,
::testing::ValuesIn(std::initializer_list<CompressionParam>{
CompressionParam{true, ".zstd"},
CompressionParam{false, ""}
})
);
int main(int argc, char ** argv)
{