Max snapshots on disk

This commit is contained in:
alesapin 2021-03-01 17:54:08 +03:00
parent 73d3c20554
commit b7b6fd7cb3
5 changed files with 24 additions and 8 deletions

View File

@ -30,6 +30,7 @@ struct Settings;
M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
M(Bool, force_sync, true, " Call fsync on each change in RAFT changelog", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -32,7 +32,7 @@ NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
: coordination_settings(coordination_settings_)
, storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds())
, snapshot_manager(snapshots_path_)
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep)
, responses_queue(responses_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("NuKeeperStateMachine"))

View File

@ -156,8 +156,9 @@ NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot()
storage->disableSnapshotMode();
}
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_)
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_)
: snapshots_path(snapshots_path_)
, snapshots_to_keep(snapshots_to_keep_)
{
namespace fs = std::filesystem;
@ -169,6 +170,8 @@ NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_p
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path());
existing_snapshots[snapshot_up_to] = p.path();
}
removeOutdatedSnapshotsIfNeeded();
}
@ -182,6 +185,8 @@ std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffe
copyData(reader, plain_buf);
plain_buf.sync();
existing_snapshots.emplace(up_to_log_idx, new_snapshot_path);
removeOutdatedSnapshotsIfNeeded();
return new_snapshot_path;
}
@ -223,6 +228,11 @@ size_t NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * stor
return log_id;
}
void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
{
while (existing_snapshots.size() > snapshots_to_keep)
existing_snapshots.erase(existing_snapshots.begin());
}
}

View File

@ -37,7 +37,7 @@ public:
class NuKeeperSnapshotManager
{
public:
explicit NuKeeperSnapshotManager(const std::string & snapshots_path_);
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_);
size_t restoreFromLatestSnapshot(NuKeeperStorage * storage) const;
@ -48,7 +48,9 @@ public:
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const;
private:
void removeOutdatedSnapshotsIfNeeded();
const std::string snapshots_path;
const size_t snapshots_to_keep;
std::map<size_t, std::string> existing_snapshots;
};

View File

@ -895,16 +895,20 @@ TEST(CoordinationTest, SnapshotableHashMapTrySnapshot)
map_snp.disableSnapshotMode();
}
TEST(CoordinationTest, TestStorageSnapshotSimple)
void addNode(DB::NuKeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner=0)
{
using Node = DB::NuKeeperStorage::Node;
storage.container.insert(path, Node{.data=data, .ephemeral_owner = ephemeral_owner});
}
TEST(CoordinationTest, TestStorageSnapshotSimple)
{
ChangelogDirTest test("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots", 3);
DB::NuKeeperStorage storage(500);
storage.container.insert("/hello", Node{.data="world", .ephemeral_owner = 1});
storage.container.insert("/hello/somepath", Node{.data="somedata", .ephemeral_owner = 3});
addNode(storage, "/hello", "world", 1);
addNode(storage, "/hello/somepath", "somedata", 3);
storage.session_id_counter = 5;
storage.zxid = 2;
storage.ephemerals[3] = {"/hello"};
@ -945,7 +949,6 @@ TEST(CoordinationTest, TestStorageSnapshotSimple)
EXPECT_EQ(restored_storage.session_and_timeout.size(), 2);
}
int main(int argc, char ** argv)
{
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));