replace snpd with lgif

This commit is contained in:
JackyWoo 2022-10-24 20:08:58 +08:00
parent 2f30c817bf
commit b5d1c4e657
7 changed files with 78 additions and 33 deletions

View File

@ -309,16 +309,21 @@ Sessions with Ephemerals (1):
/clickhouse/task_queue/ddl /clickhouse/task_queue/ddl
``` ```
- `csnp`: Schedule a snapshot creation task. Return `Snapshot creation scheduled.` if successfully scheduled or `Fail to scheduled snapshot creation.` if failed. - `csnp`: Schedule a snapshot creation task. Return `Snapshot creation scheduled with last committed log index xxx.` if successfully scheduled or `Fail to scheduled snapshot creation task.` if failed.
``` ```
Snapshot creation scheduled. Snapshot creation scheduled with last committed log index 100.
``` ```
- `snpd`: Whether the last successfully scheduled snapshot creation is done. Return `Yes` if true or `No` if false. - `lgif`: Keeper log information. `last_log_idx` : my last log index in log store; `last_log_term` : my last log term; `last_committed_log_idx` : my last committed log index in state machine; `leader_committed_log_idx` : leader's committed log index from my perspective; `target_committed_log_idx` : target log index should be committed to; `last_snapshot_idx` : the largest committed log index in last snapshot.
``` ```
Yes last_log_idx : 101
last_log_term : 1
last_committed_log_idx : 100
leader_committed_log_idx : 101
target_committed_log_idx : 101
last_snapshot_idx : 50
``` ```
## [experimental] Migration from ZooKeeper {#migration-from-zookeeper} ## [experimental] Migration from ZooKeeper {#migration-from-zookeeper}

View File

@ -139,6 +139,9 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr create_snapshot_command = std::make_shared<CreateSnapshotCommand>(keeper_dispatcher); FourLetterCommandPtr create_snapshot_command = std::make_shared<CreateSnapshotCommand>(keeper_dispatcher);
factory.registerCommand(create_snapshot_command); factory.registerCommand(create_snapshot_command);
FourLetterCommandPtr log_info_command = std::make_shared<LogInfoCommand>(keeper_dispatcher);
factory.registerCommand(log_info_command);
factory.initializeAllowList(keeper_dispatcher); factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true); factory.setInitialize(true);
} }
@ -477,12 +480,22 @@ String ApiVersionCommand::run()
String CreateSnapshotCommand::run() String CreateSnapshotCommand::run()
{ {
return keeper_dispatcher.createSnapshot() ? "Snapshot creation scheduled." : "Fail to scheduled snapshot creation task."; auto log_index = keeper_dispatcher.createSnapshot();
return log_index > 0 ? "Snapshot creation scheduled with last committed log index " + std::to_string(log_index) + "."
: "Fail to scheduled snapshot creation task.";
} }
String CheckSnapshotDoneCommand::run() String LogInfoCommand::run()
{ {
return keeper_dispatcher.snapshotDone() ? "Snapshot creation done." : "Fail to scheduled snapshot creation task."; KeeperLogInfo log_info = keeper_dispatcher.getKeeperLogInfo();
StringBuffer ret;
print(ret, "last_log_idx", log_info.last_log_idx);
print(ret, "last_log_term", log_info.last_log_term);
print(ret, "last_committed_log_idx", log_info.last_committed_log_idx);
print(ret, "leader_committed_log_idx", log_info.leader_committed_log_idx);
print(ret, "target_committed_log_idx", log_info.target_committed_log_idx);
print(ret, "last_snapshot_idx", log_info.last_snapshot_idx);
return ret.str();
} }
} }

View File

@ -341,17 +341,24 @@ struct CreateSnapshotCommand : public IFourLetterCommand
~CreateSnapshotCommand() override = default; ~CreateSnapshotCommand() override = default;
}; };
/// Check whether last manual snapshot done /** Raft log information:
struct CheckSnapshotDoneCommand : public IFourLetterCommand * last_log_idx : 101
* last_log_term : 1
* last_committed_idx : 100
* leader_committed_log_idx : 101
* target_committed_log_idx : 101
* last_snapshot_idx : 50
*/
struct LogInfoCommand : public IFourLetterCommand
{ {
explicit CheckSnapshotDoneCommand(KeeperDispatcher & keeper_dispatcher_) explicit LogInfoCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_) : IFourLetterCommand(keeper_dispatcher_)
{ {
} }
String name() override { return "snpd"; } String name() override { return "lgif"; }
String run() override; String run() override;
~CheckSnapshotDoneCommand() override = default; ~LogInfoCommand() override = default;
}; };
} }

View File

@ -47,4 +47,26 @@ struct Keeper4LWInfo
} }
}; };
/// Keeper log information for 4lw commands
struct KeeperLogInfo
{
/// My last log index in log store.
uint64_t last_log_idx;
/// My last log term.
uint64_t last_log_term;
/// My last committed log index in state machine.
uint64_t last_committed_log_idx;
/// Leader's committed log index from my perspective.
uint64_t leader_committed_log_idx;
/// Target log index should be committed to.
uint64_t target_committed_log_idx;
/// The largest committed log index in last snapshot.
uint64_t last_snapshot_idx;
};
} }

View File

@ -204,16 +204,16 @@ public:
keeper_stats.reset(); keeper_stats.reset();
} }
/// Create snapshot manually /// Create snapshot manually, return the last committed log index in the snapshot
bool createSnapshot() uint64_t createSnapshot()
{ {
return server->createSnapshot(); return server->createSnapshot();
} }
/// Whether the last manually created snapshot is done /// Whether the last manually created snapshot is done
bool snapshotDone() KeeperLogInfo getKeeperLogInfo()
{ {
return server->snapshotDone(); return server->getKeeperLogInfo();
} }
}; };

View File

@ -907,23 +907,25 @@ Keeper4LWInfo KeeperServer::getPartiallyFilled4LWInfo() const
return result; return result;
} }
bool KeeperServer::createSnapshot() uint64_t KeeperServer::createSnapshot()
{ {
std::lock_guard lock(snapshot_mutex);
uint64_t log_idx = raft_instance->create_snapshot(); uint64_t log_idx = raft_instance->create_snapshot();
if (log_idx != 0) if (log_idx != 0)
{ LOG_INFO(log, "Snapshot creation scheduled with last committed log index {}.", log_idx);
last_manual_snapshot_log_idx = log_idx; else
LOG_INFO(log, "Successfully schedule a keeper snapshot creation task at log index {}", log_idx); LOG_WARNING(log, "Fail to scheduled snapshot creation task.");
return true; return log_idx;
}
return false;
} }
bool KeeperServer::snapshotDone() KeeperLogInfo KeeperServer::getKeeperLogInfo()
{ {
std::lock_guard lock(snapshot_mutex); KeeperLogInfo log_info;
return last_manual_snapshot_log_idx != 0 && last_manual_snapshot_log_idx == raft_instance->get_last_snapshot_idx(); log_info.last_log_idx = raft_instance->get_last_log_idx();
log_info.last_log_term = raft_instance->get_last_log_term();
log_info.leader_committed_log_idx = raft_instance->get_leader_committed_log_idx();
log_info.target_committed_log_idx = raft_instance->get_target_committed_log_idx();
log_info.last_snapshot_idx = raft_instance->get_last_snapshot_idx();
return log_info;
} }
} }

View File

@ -66,10 +66,6 @@ private:
const bool create_snapshot_on_exit; const bool create_snapshot_on_exit;
/// Used to check whether the previous manually created snapshot complete.
uint64_t last_manual_snapshot_log_idx;
std::mutex snapshot_mutex;
public: public:
KeeperServer( KeeperServer(
const KeeperConfigurationAndSettingsPtr & settings_, const KeeperConfigurationAndSettingsPtr & settings_,
@ -136,9 +132,9 @@ public:
/// Return true if update was successfully received. /// Return true if update was successfully received.
bool waitConfigurationUpdate(const ConfigUpdateAction & task); bool waitConfigurationUpdate(const ConfigUpdateAction & task);
bool createSnapshot(); uint64_t createSnapshot();
bool snapshotDone(); KeeperLogInfo getKeeperLogInfo();
}; };
} }