mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #43450 from ClickHouse/parallel-log-appending
Use parallel log appending in Keeper
This commit is contained in:
commit
b1d11ce0e1
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit e4e746a24eb56861a86f3672771e3308d8c40722
|
||||
Subproject commit afc36dfa9b0beb45bc4cd935060631cc80ba04a5
|
@ -117,7 +117,7 @@ public:
|
||||
|
||||
WriteBuffer * working_buf = compressed_buffer ? compressed_buffer->getNestedBuffer() : file_buf.get();
|
||||
|
||||
/// Flush working buffer to file system
|
||||
/// Flush working buffer to file system
|
||||
working_buf->next();
|
||||
|
||||
/// Fsync file system if needed
|
||||
@ -280,6 +280,7 @@ Changelog::Changelog(
|
||||
, force_sync(force_sync_)
|
||||
, log(log_)
|
||||
, compress_logs(compress_logs_)
|
||||
, write_operations(std::numeric_limits<size_t>::max())
|
||||
{
|
||||
/// Load all files in changelog directory
|
||||
namespace fs = std::filesystem;
|
||||
@ -299,10 +300,13 @@ Changelog::Changelog(
|
||||
LOG_WARNING(log, "No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper.", changelogs_dir.generic_string());
|
||||
|
||||
clean_log_thread = ThreadFromGlobalPool([this] { cleanLogThread(); });
|
||||
|
||||
write_thread = ThreadFromGlobalPool([this] { writeThread(); });
|
||||
}
|
||||
|
||||
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
|
||||
{
|
||||
std::lock_guard writer_lock(writer_mutex);
|
||||
std::optional<ChangelogReadResult> last_log_read_result;
|
||||
|
||||
/// Last log has some free space to write
|
||||
@ -336,7 +340,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
|
||||
removeAllLogs();
|
||||
min_log_id = last_commited_log_index;
|
||||
max_log_id = last_commited_log_index == 0 ? 0 : last_commited_log_index - 1;
|
||||
rotate(max_log_id + 1);
|
||||
rotate(max_log_id + 1, writer_lock);
|
||||
return;
|
||||
}
|
||||
else if (changelog_description.from_log_index > start_to_read_from)
|
||||
@ -427,7 +431,9 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
|
||||
|
||||
/// Start new log if we don't initialize writer from previous log. All logs can be "complete".
|
||||
if (!current_writer)
|
||||
rotate(max_log_id + 1);
|
||||
rotate(max_log_id + 1, writer_lock);
|
||||
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
|
||||
@ -500,10 +506,11 @@ void Changelog::removeAllLogs()
|
||||
logs.clear();
|
||||
}
|
||||
|
||||
void Changelog::rotate(uint64_t new_start_log_index)
|
||||
void Changelog::rotate(uint64_t new_start_log_index, std::lock_guard<std::mutex> &)
|
||||
{
|
||||
/// Flush previous log
|
||||
flush();
|
||||
if (current_writer)
|
||||
current_writer->flush(force_sync);
|
||||
|
||||
/// Start new one
|
||||
ChangelogFileDescription new_description;
|
||||
@ -540,50 +547,96 @@ ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_e
|
||||
return record;
|
||||
}
|
||||
|
||||
void Changelog::writeThread()
|
||||
{
|
||||
WriteOperation write_operation;
|
||||
while (write_operations.pop(write_operation))
|
||||
{
|
||||
assert(initialized);
|
||||
|
||||
if (auto * append_log = std::get_if<AppendLog>(&write_operation))
|
||||
{
|
||||
std::lock_guard writer_lock(writer_mutex);
|
||||
assert(current_writer);
|
||||
|
||||
const auto & current_changelog_description = existing_changelogs[current_writer->getStartIndex()];
|
||||
const bool log_is_complete = append_log->index - current_writer->getStartIndex() == current_changelog_description.expectedEntriesCountInLog();
|
||||
|
||||
if (log_is_complete)
|
||||
rotate(append_log->index, writer_lock);
|
||||
|
||||
current_writer->appendRecord(buildRecord(append_log->index, append_log->log_entry));
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & flush = std::get<Flush>(write_operation);
|
||||
|
||||
{
|
||||
std::lock_guard writer_lock(writer_mutex);
|
||||
if (current_writer)
|
||||
current_writer->flush(force_sync);
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock{durable_idx_mutex};
|
||||
last_durable_idx = flush.index;
|
||||
}
|
||||
|
||||
durable_idx_cv.notify_all();
|
||||
|
||||
// we shouldn't start the raft_server before sending it here
|
||||
if (auto raft_server_locked = raft_server.lock())
|
||||
raft_server_locked->notify_log_append_completion(true);
|
||||
else
|
||||
LOG_WARNING(log, "Raft server is not set in LogStore.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
|
||||
{
|
||||
if (!current_writer)
|
||||
if (!initialized)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
|
||||
|
||||
if (logs.empty())
|
||||
min_log_id = index;
|
||||
|
||||
const auto & current_changelog_description = existing_changelogs[current_writer->getStartIndex()];
|
||||
const bool log_is_complete = index - current_writer->getStartIndex() == current_changelog_description.expectedEntriesCountInLog();
|
||||
|
||||
if (log_is_complete)
|
||||
rotate(index);
|
||||
|
||||
current_writer->appendRecord(buildRecord(index, log_entry));
|
||||
logs[index] = log_entry;
|
||||
max_log_id = index;
|
||||
|
||||
if (!write_operations.tryPush(AppendLog{index, log_entry}))
|
||||
LOG_WARNING(log, "Changelog is shut down");
|
||||
}
|
||||
|
||||
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
|
||||
{
|
||||
/// This write_at require to overwrite everything in this file and also in previous file(s)
|
||||
const bool go_to_previous_file = index < current_writer->getStartIndex();
|
||||
|
||||
if (go_to_previous_file)
|
||||
{
|
||||
auto index_changelog = existing_changelogs.lower_bound(index);
|
||||
std::lock_guard lock(writer_mutex);
|
||||
/// This write_at require to overwrite everything in this file and also in previous file(s)
|
||||
const bool go_to_previous_file = index < current_writer->getStartIndex();
|
||||
|
||||
ChangelogFileDescription description;
|
||||
|
||||
if (index_changelog->first == index) /// exactly this file starts from index
|
||||
description = index_changelog->second;
|
||||
else
|
||||
description = std::prev(index_changelog)->second;
|
||||
|
||||
/// Initialize writer from this log file
|
||||
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, index_changelog->first);
|
||||
|
||||
/// Remove all subsequent files if overwritten something in previous one
|
||||
auto to_remove_itr = existing_changelogs.upper_bound(index);
|
||||
for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
|
||||
if (go_to_previous_file)
|
||||
{
|
||||
std::filesystem::remove(itr->second.path);
|
||||
itr = existing_changelogs.erase(itr);
|
||||
auto index_changelog = existing_changelogs.lower_bound(index);
|
||||
|
||||
ChangelogFileDescription description;
|
||||
|
||||
if (index_changelog->first == index) /// exactly this file starts from index
|
||||
description = index_changelog->second;
|
||||
else
|
||||
description = std::prev(index_changelog)->second;
|
||||
|
||||
/// Initialize writer from this log file
|
||||
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, index_changelog->first);
|
||||
|
||||
/// Remove all subsequent files if overwritten something in previous one
|
||||
auto to_remove_itr = existing_changelogs.upper_bound(index);
|
||||
for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
|
||||
{
|
||||
std::filesystem::remove(itr->second.path);
|
||||
itr = existing_changelogs.erase(itr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -597,6 +650,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
|
||||
|
||||
void Changelog::compact(uint64_t up_to_log_index)
|
||||
{
|
||||
std::lock_guard lock(writer_mutex);
|
||||
LOG_INFO(log, "Compact logs up to log index {}, our max log id is {}", up_to_log_index, max_log_id);
|
||||
|
||||
bool remove_all_logs = false;
|
||||
@ -643,7 +697,7 @@ void Changelog::compact(uint64_t up_to_log_index)
|
||||
std::erase_if(logs, [up_to_log_index] (const auto & item) { return item.first <= up_to_log_index; });
|
||||
|
||||
if (need_rotate)
|
||||
rotate(up_to_log_index + 1);
|
||||
rotate(up_to_log_index + 1, lock);
|
||||
|
||||
LOG_INFO(log, "Compaction up to {} finished new min index {}, new max index {}", up_to_log_index, min_log_id, max_log_id);
|
||||
}
|
||||
@ -747,8 +801,19 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
|
||||
|
||||
void Changelog::flush()
|
||||
{
|
||||
if (current_writer)
|
||||
current_writer->flush(force_sync);
|
||||
if (flushAsync())
|
||||
{
|
||||
std::unique_lock lock{durable_idx_mutex};
|
||||
durable_idx_cv.wait(lock, [&] { return last_durable_idx == max_log_id; });
|
||||
}
|
||||
}
|
||||
|
||||
bool Changelog::flushAsync()
|
||||
{
|
||||
bool pushed = write_operations.push(Flush{max_log_id});
|
||||
if (!pushed)
|
||||
LOG_WARNING(log, "Changelog is shut down");
|
||||
return pushed;
|
||||
}
|
||||
|
||||
void Changelog::shutdown()
|
||||
@ -758,6 +823,12 @@ void Changelog::shutdown()
|
||||
|
||||
if (clean_log_thread.joinable())
|
||||
clean_log_thread.join();
|
||||
|
||||
if (!write_operations.isFinished())
|
||||
write_operations.finish();
|
||||
|
||||
if (write_thread.joinable())
|
||||
write_thread.join();
|
||||
}
|
||||
|
||||
Changelog::~Changelog()
|
||||
@ -789,4 +860,10 @@ void Changelog::cleanLogThread()
|
||||
}
|
||||
}
|
||||
|
||||
void Changelog::setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server_)
|
||||
{
|
||||
assert(raft_server_);
|
||||
raft_server = raft_server_;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,8 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <libnuraft/nuraft.hxx>
|
||||
#include <libnuraft/raft_server.hxx>
|
||||
#include <city.h>
|
||||
#include <optional>
|
||||
#include <base/defines.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/HashingWriteBuffer.h>
|
||||
#include <IO/CompressionMethod.h>
|
||||
@ -121,6 +123,8 @@ public:
|
||||
/// Fsync latest log to disk and flush buffer
|
||||
void flush();
|
||||
|
||||
bool flushAsync();
|
||||
|
||||
void shutdown();
|
||||
|
||||
uint64_t size() const
|
||||
@ -128,6 +132,14 @@ public:
|
||||
return logs.size();
|
||||
}
|
||||
|
||||
uint64_t lastDurableIndex() const
|
||||
{
|
||||
std::lock_guard lock{durable_idx_mutex};
|
||||
return last_durable_idx;
|
||||
}
|
||||
|
||||
void setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server_);
|
||||
|
||||
/// Fsync log to disk
|
||||
~Changelog();
|
||||
|
||||
@ -136,7 +148,7 @@ private:
|
||||
static ChangelogRecord buildRecord(uint64_t index, const LogEntryPtr & log_entry);
|
||||
|
||||
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
|
||||
void rotate(uint64_t new_start_log_index);
|
||||
void rotate(uint64_t new_start_log_index, std::lock_guard<std::mutex> & writer_lock);
|
||||
|
||||
/// Currently existing changelogs
|
||||
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
|
||||
@ -162,7 +174,7 @@ private:
|
||||
Poco::Logger * log;
|
||||
bool compress_logs;
|
||||
|
||||
|
||||
std::mutex writer_mutex;
|
||||
/// Current writer for changelog file
|
||||
std::unique_ptr<ChangelogWriter> current_writer;
|
||||
/// Mapping log_id -> log_entry
|
||||
@ -175,6 +187,33 @@ private:
|
||||
/// 128 is enough, even if log is not removed, it's not a problem
|
||||
ConcurrentBoundedQueue<std::string> log_files_to_delete_queue{128};
|
||||
ThreadFromGlobalPool clean_log_thread;
|
||||
|
||||
struct AppendLog
|
||||
{
|
||||
uint64_t index;
|
||||
nuraft::ptr<nuraft::log_entry> log_entry;
|
||||
};
|
||||
|
||||
struct Flush
|
||||
{
|
||||
uint64_t index;
|
||||
};
|
||||
|
||||
using WriteOperation = std::variant<AppendLog, Flush>;
|
||||
|
||||
void writeThread();
|
||||
|
||||
ThreadFromGlobalPool write_thread;
|
||||
ConcurrentBoundedQueue<WriteOperation> write_operations;
|
||||
|
||||
// last_durable_index needs to be exposed through const getter so we make mutex mutable
|
||||
mutable std::mutex durable_idx_mutex;
|
||||
std::condition_variable durable_idx_cv;
|
||||
uint64_t last_durable_idx{0};
|
||||
|
||||
nuraft::wptr<nuraft::raft_server> raft_server;
|
||||
|
||||
bool initialized = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ uint64_t KeeperLogStore::size() const
|
||||
void KeeperLogStore::end_of_append_batch(uint64_t /*start_index*/, uint64_t /*count*/)
|
||||
{
|
||||
std::lock_guard lock(changelog_lock);
|
||||
changelog.flush();
|
||||
changelog.flushAsync();
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::log_entry> KeeperLogStore::getLatestConfigChange() const
|
||||
@ -132,4 +132,16 @@ bool KeeperLogStore::flushChangelogAndShutdown()
|
||||
return true;
|
||||
}
|
||||
|
||||
uint64_t KeeperLogStore::last_durable_index()
|
||||
{
|
||||
std::lock_guard lock(changelog_lock);
|
||||
return changelog.lastDurableIndex();
|
||||
}
|
||||
|
||||
void KeeperLogStore::setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server)
|
||||
{
|
||||
std::lock_guard lock(changelog_lock);
|
||||
return changelog.setRaftServer(raft_server);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -62,12 +62,16 @@ public:
|
||||
/// Current log storage size
|
||||
uint64_t size() const;
|
||||
|
||||
uint64_t last_durable_index() override;
|
||||
|
||||
/// Flush batch of appended entries
|
||||
void end_of_append_batch(uint64_t start_index, uint64_t count) override;
|
||||
|
||||
/// Get entry with latest config in logstore
|
||||
nuraft::ptr<nuraft::log_entry> getLatestConfigChange() const;
|
||||
|
||||
void setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server);
|
||||
|
||||
private:
|
||||
mutable std::mutex changelog_lock;
|
||||
Poco::Logger * log;
|
||||
|
@ -266,6 +266,7 @@ void KeeperServer::forceRecovery()
|
||||
void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
|
||||
{
|
||||
nuraft::raft_params params;
|
||||
params.parallel_log_appending_ = true;
|
||||
params.heart_beat_interval_
|
||||
= getValueOrMaxInt32AndLogWarning(coordination_settings->heart_beat_interval_ms.totalMilliseconds(), "heart_beat_interval_ms", log);
|
||||
params.election_timeout_lower_bound_ = getValueOrMaxInt32AndLogWarning(
|
||||
@ -352,6 +353,8 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
|
||||
if (!raft_instance)
|
||||
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
|
||||
|
||||
state_manager->getLogStore()->setRaftServer(raft_instance);
|
||||
|
||||
raft_instance->start_server(init_options.skip_initial_election_timeout_);
|
||||
|
||||
nuraft::ptr<nuraft::raft_server> casted_raft_server = raft_instance;
|
||||
@ -446,8 +449,8 @@ void KeeperServer::shutdownRaftServer()
|
||||
|
||||
void KeeperServer::shutdown()
|
||||
{
|
||||
state_manager->flushAndShutDownLogStore();
|
||||
shutdownRaftServer();
|
||||
state_manager->flushAndShutDownLogStore();
|
||||
state_machine->shutdownStorage();
|
||||
}
|
||||
|
||||
|
@ -67,6 +67,7 @@ class CoordinationTest : public ::testing::TestWithParam<CompressionParam>
|
||||
{
|
||||
protected:
|
||||
DB::KeeperContextPtr keeper_context = std::make_shared<DB::KeeperContext>();
|
||||
Poco::Logger * log{&Poco::Logger::get("CoordinationTest")};
|
||||
};
|
||||
|
||||
TEST_P(CoordinationTest, BuildTest)
|
||||
@ -129,10 +130,13 @@ struct SimpliestRaftServer
|
||||
params.snapshot_distance_ = 1; /// forcefully send snapshots
|
||||
params.client_req_timeout_ = 3000;
|
||||
params.return_method_ = nuraft::raft_params::blocking;
|
||||
params.parallel_log_appending_ = true;
|
||||
|
||||
nuraft::raft_server::init_options opts;
|
||||
opts.start_server_in_constructor_ = false;
|
||||
raft_instance = launcher.init(
|
||||
state_machine, state_manager, nuraft::cs_new<DB::LoggerWrapper>("ToyRaftLogger", DB::LogsLevel::trace), port,
|
||||
nuraft::asio_service::options{}, params);
|
||||
nuraft::asio_service::options{}, params, opts);
|
||||
|
||||
if (!raft_instance)
|
||||
{
|
||||
@ -140,6 +144,10 @@ struct SimpliestRaftServer
|
||||
_exit(1);
|
||||
}
|
||||
|
||||
state_manager->getLogStore()->setRaftServer(raft_instance);
|
||||
|
||||
raft_instance->start_server(false);
|
||||
|
||||
std::cout << "init Raft instance " << server_id;
|
||||
for (size_t ii = 0; ii < 20; ++ii)
|
||||
{
|
||||
@ -207,7 +215,7 @@ TEST_P(CoordinationTest, TestSummingRaft1)
|
||||
|
||||
while (s1.state_machine->getValue() != 143)
|
||||
{
|
||||
std::cout << "Waiting s1 to apply entry\n";
|
||||
LOG_INFO(log, "Waiting s1 to apply entry");
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
@ -240,6 +248,15 @@ TEST_P(CoordinationTest, ChangelogTestSimple)
|
||||
EXPECT_EQ(changelog.log_entries(1, 2)->size(), 1);
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
void waitDurableLogs(nuraft::log_store & log_store)
|
||||
{
|
||||
while (log_store.last_durable_index() != log_store.next_slot() - 1)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_P(CoordinationTest, ChangelogTestFile)
|
||||
{
|
||||
@ -250,6 +267,9 @@ TEST_P(CoordinationTest, ChangelogTestFile)
|
||||
auto entry = getLogEntry("hello world", 77);
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
for (const auto & p : fs::directory_iterator("./logs"))
|
||||
EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin" + params.extension);
|
||||
@ -261,6 +281,8 @@ TEST_P(CoordinationTest, ChangelogTestFile)
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
}
|
||||
@ -271,6 +293,7 @@ TEST_P(CoordinationTest, ChangelogReadWrite)
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::KeeperLogStore changelog("./logs", 1000, true, params.enable_compression);
|
||||
changelog.init(1, 0);
|
||||
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
auto entry = getLogEntry("hello world", i * 10);
|
||||
@ -280,6 +303,8 @@ TEST_P(CoordinationTest, ChangelogReadWrite)
|
||||
|
||||
EXPECT_EQ(changelog.size(), 10);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
DB::KeeperLogStore changelog_reader("./logs", 1000, true, params.enable_compression);
|
||||
changelog_reader.init(1, 0);
|
||||
EXPECT_EQ(changelog_reader.size(), 10);
|
||||
@ -315,6 +340,8 @@ TEST_P(CoordinationTest, ChangelogWriteAt)
|
||||
changelog.write_at(7, entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_EQ(changelog.size(), 7);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 77);
|
||||
EXPECT_EQ(changelog.entry_at(7)->get_term(), 77);
|
||||
@ -344,6 +371,9 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
EXPECT_EQ(changelog.size(), 7);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
|
||||
@ -358,6 +388,8 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
|
||||
}
|
||||
changelog_reader.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog_reader.size(), 10);
|
||||
|
||||
waitDurableLogs(changelog_reader);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
|
||||
@ -371,6 +403,8 @@ TEST_P(CoordinationTest, ChangelogTestAppendAfterRead)
|
||||
changelog_reader.append(entry);
|
||||
changelog_reader.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog_reader.size(), 11);
|
||||
|
||||
waitDurableLogs(changelog_reader);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -396,6 +430,8 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_EQ(changelog.size(), 3);
|
||||
|
||||
changelog.compact(2);
|
||||
@ -416,6 +452,8 @@ TEST_P(CoordinationTest, ChangelogTestCompaction)
|
||||
changelog.append(e4);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
|
||||
@ -454,6 +492,8 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperations)
|
||||
|
||||
EXPECT_EQ(changelog.size(), 10);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
auto entries = changelog.pack(1, 5);
|
||||
|
||||
DB::KeeperLogStore apply_changelog("./logs", 100, true, params.enable_compression);
|
||||
@ -499,6 +539,8 @@ TEST_P(CoordinationTest, ChangelogTestBatchOperationsEmpty)
|
||||
|
||||
EXPECT_EQ(changelog.size(), 10);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
auto entries = changelog.pack(5, 5);
|
||||
|
||||
ChangelogDirTest test1("./logs1");
|
||||
@ -543,6 +585,8 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -561,6 +605,8 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtPreviousFile)
|
||||
EXPECT_EQ(changelog.next_slot(), 8);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
|
||||
@ -592,6 +638,8 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -610,6 +658,8 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtFileBorder)
|
||||
EXPECT_EQ(changelog.next_slot(), 12);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -633,7 +683,6 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles)
|
||||
ChangelogDirTest test("./logs");
|
||||
DB::KeeperLogStore changelog("./logs", 5, true, params.enable_compression);
|
||||
changelog.init(1, 0);
|
||||
|
||||
for (size_t i = 0; i < 33; ++i)
|
||||
{
|
||||
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
|
||||
@ -641,6 +690,8 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -659,6 +710,8 @@ TEST_P(CoordinationTest, ChangelogTestWriteAtAllFiles)
|
||||
EXPECT_EQ(changelog.next_slot(), 2);
|
||||
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
@ -683,6 +736,8 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog.size(), 35);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -692,7 +747,6 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin" + params.extension));
|
||||
|
||||
|
||||
DB::KeeperLogStore changelog_reader("./logs", 5, true, params.enable_compression);
|
||||
changelog_reader.init(1, 0);
|
||||
|
||||
@ -701,6 +755,8 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
|
||||
changelog_reader.end_of_append_batch(0, 0);
|
||||
|
||||
EXPECT_EQ(changelog_reader.size(), 36);
|
||||
|
||||
waitDurableLogs(changelog_reader);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -746,6 +802,8 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog.size(), 35);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -779,6 +837,8 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
|
||||
EXPECT_EQ(changelog_reader.size(), 11);
|
||||
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
|
||||
|
||||
waitDurableLogs(changelog_reader);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
|
||||
@ -809,6 +869,7 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
|
||||
|
||||
@ -824,6 +885,9 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
|
||||
auto entry = getLogEntry("hello_world", 7777);
|
||||
changelog_reader.append(entry);
|
||||
changelog_reader.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog_reader);
|
||||
|
||||
EXPECT_EQ(changelog_reader.size(), 1);
|
||||
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
|
||||
|
||||
@ -848,6 +912,7 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
|
||||
|
||||
@ -874,6 +939,8 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles2)
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_21_30.bin" + params.extension));
|
||||
@ -1330,6 +1397,8 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
state_machine->pre_commit(i, changelog.entry_at(i)->get_buf());
|
||||
state_machine->commit(i, changelog.entry_at(i)->get_buf());
|
||||
bool snapshot_created = false;
|
||||
@ -1339,7 +1408,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
|
||||
nuraft::async_result<bool>::handler_type when_done = [&snapshot_created] (bool & ret, nuraft::ptr<std::exception> &/*exception*/)
|
||||
{
|
||||
snapshot_created = ret;
|
||||
std::cerr << "Snapshot finished\n";
|
||||
LOG_INFO(&Poco::Logger::get("CoordinationTest"), "Snapshot finished");
|
||||
};
|
||||
|
||||
state_machine->create_snapshot(s, when_done);
|
||||
@ -1511,6 +1580,8 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
}
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
}
|
||||
|
||||
|
||||
@ -1527,6 +1598,8 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
|
||||
changelog_1.end_of_append_batch(0, 0);
|
||||
}
|
||||
|
||||
waitDurableLogs(changelog_1);
|
||||
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_1_100.bin" + params.extension));
|
||||
EXPECT_TRUE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
|
||||
|
||||
@ -1542,6 +1615,8 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
|
||||
changelog_2.end_of_append_batch(0, 0);
|
||||
}
|
||||
|
||||
waitDurableLogs(changelog_2);
|
||||
|
||||
changelog_2.compact(105);
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(1000));
|
||||
|
||||
@ -1562,6 +1637,8 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
|
||||
changelog_3.end_of_append_batch(0, 0);
|
||||
}
|
||||
|
||||
waitDurableLogs(changelog_3);
|
||||
|
||||
changelog_3.compact(125);
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(1000));
|
||||
EXPECT_FALSE(fs::exists("./logs/changelog_101_110.bin" + params.extension));
|
||||
@ -1609,6 +1686,7 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
}
|
||||
|
||||
waitDurableLogs(changelog);
|
||||
|
||||
DB::KeeperLogStore changelog1("./logs", 100, true, test_params.enable_compression);
|
||||
changelog1.init(0, 3);
|
||||
@ -1683,43 +1761,47 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth)
|
||||
auto params = GetParam();
|
||||
ChangelogDirTest test("./logs");
|
||||
{
|
||||
std::cerr << "================First time=====================\n";
|
||||
LOG_INFO(log, "================First time=====================");
|
||||
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
|
||||
changelog.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog.next_slot(), 2);
|
||||
waitDurableLogs(changelog);
|
||||
}
|
||||
|
||||
{
|
||||
std::cerr << "================Second time=====================\n";
|
||||
LOG_INFO(log, "================Second time=====================");
|
||||
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
|
||||
changelog.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog.next_slot(), 3);
|
||||
waitDurableLogs(changelog);
|
||||
}
|
||||
|
||||
{
|
||||
std::cerr << "================Third time=====================\n";
|
||||
LOG_INFO(log, "================Third time=====================");
|
||||
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
|
||||
changelog.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog.next_slot(), 4);
|
||||
waitDurableLogs(changelog);
|
||||
}
|
||||
|
||||
{
|
||||
std::cerr << "================Fourth time=====================\n";
|
||||
LOG_INFO(log, "================Fourth time=====================");
|
||||
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
|
||||
changelog.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog.next_slot(), 5);
|
||||
waitDurableLogs(changelog);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1730,7 +1812,7 @@ TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth)
|
||||
ChangelogDirTest test("./logs");
|
||||
for (size_t i = 0; i < 36; ++i)
|
||||
{
|
||||
std::cerr << "================First time=====================\n";
|
||||
LOG_INFO(log, "================First time=====================");
|
||||
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
|
||||
changelog.init(1, 0);
|
||||
for (size_t j = 0; j < 7; ++j)
|
||||
@ -1739,6 +1821,7 @@ TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth)
|
||||
changelog.append(entry);
|
||||
}
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
waitDurableLogs(changelog);
|
||||
}
|
||||
|
||||
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
|
||||
@ -1750,37 +1833,49 @@ TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard)
|
||||
{
|
||||
auto params = GetParam();
|
||||
ChangelogDirTest test("./logs");
|
||||
std::cerr << "================First time=====================\n";
|
||||
DB::KeeperLogStore changelog1("./logs", 100, true, params.enable_compression);
|
||||
changelog1.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog1.append(entry);
|
||||
changelog1.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog1.next_slot(), 2);
|
||||
{
|
||||
LOG_INFO(log, "================First time=====================");
|
||||
DB::KeeperLogStore changelog1("./logs", 100, true, params.enable_compression);
|
||||
changelog1.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog1.append(entry);
|
||||
changelog1.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog1.next_slot(), 2);
|
||||
waitDurableLogs(changelog1);
|
||||
}
|
||||
|
||||
std::cerr << "================Second time=====================\n";
|
||||
DB::KeeperLogStore changelog2("./logs", 100, true, params.enable_compression);
|
||||
changelog2.init(1, 0);
|
||||
entry = getLogEntry("hello_world", 1000);
|
||||
changelog2.append(entry);
|
||||
changelog2.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog2.next_slot(), 3);
|
||||
{
|
||||
LOG_INFO(log, "================Second time=====================");
|
||||
DB::KeeperLogStore changelog2("./logs", 100, true, params.enable_compression);
|
||||
changelog2.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog2.append(entry);
|
||||
changelog2.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog2.next_slot(), 3);
|
||||
waitDurableLogs(changelog2);
|
||||
}
|
||||
|
||||
std::cerr << "================Third time=====================\n";
|
||||
DB::KeeperLogStore changelog3("./logs", 100, true, params.enable_compression);
|
||||
changelog3.init(1, 0);
|
||||
entry = getLogEntry("hello_world", 1000);
|
||||
changelog3.append(entry);
|
||||
changelog3.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog3.next_slot(), 4);
|
||||
{
|
||||
LOG_INFO(log, "================Third time=====================");
|
||||
DB::KeeperLogStore changelog3("./logs", 100, true, params.enable_compression);
|
||||
changelog3.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog3.append(entry);
|
||||
changelog3.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog3.next_slot(), 4);
|
||||
waitDurableLogs(changelog3);
|
||||
}
|
||||
|
||||
std::cerr << "================Fourth time=====================\n";
|
||||
DB::KeeperLogStore changelog4("./logs", 100, true, params.enable_compression);
|
||||
changelog4.init(1, 0);
|
||||
entry = getLogEntry("hello_world", 1000);
|
||||
changelog4.append(entry);
|
||||
changelog4.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog4.next_slot(), 5);
|
||||
{
|
||||
LOG_INFO(log, "================Fourth time=====================");
|
||||
DB::KeeperLogStore changelog4("./logs", 100, true, params.enable_compression);
|
||||
changelog4.init(1, 0);
|
||||
auto entry = getLogEntry("hello_world", 1000);
|
||||
changelog4.append(entry);
|
||||
changelog4.end_of_append_batch(0, 0);
|
||||
EXPECT_EQ(changelog4.next_slot(), 5);
|
||||
waitDurableLogs(changelog4);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(CoordinationTest, TestStorageSnapshotEqual)
|
||||
|
Loading…
Reference in New Issue
Block a user