db replicated refactoring

This commit is contained in:
Val 2020-05-24 20:13:53 +03:00
parent 1f03839830
commit 4921dc6dab
2 changed files with 69 additions and 50 deletions

View File

@ -80,7 +80,6 @@ DatabaseReplicated::DatabaseReplicated(
Context & context_)
// : DatabaseOrdinary(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseReplicated (" + name_ + ")", context_)
// TODO add constructor to Atomic and call it here with path and logger name specification
// TODO ask why const and & are ommited in Atomic
: DatabaseAtomic(name_, metadata_path_, context_)
, zookeeper_path(zookeeper_path_)
, replica_name(replica_name_)
@ -102,42 +101,50 @@ DatabaseReplicated::DatabaseReplicated(
}
if (!current_zookeeper->exists(zookeeper_path, {}, NULL)) {
current_zookeeper->createAncestors(zookeeper_path);
current_zookeeper->createOrUpdate(zookeeper_path, String(), zkutil::CreateMode::Persistent);
current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", "0", zkutil::CreateMode::Persistent);
createDatabaseZKNodes();
}
// replica
if (!current_zookeeper->exists(replica_path, {}, NULL)) {
current_zookeeper->createAncestors(replica_path);
} else {
current_zookeeper->createOrUpdate(replica_path, String(), zkutil::CreateMode::Persistent);
}
current_zookeeper->createOrUpdate(replica_path, String(), zkutil::CreateMode::Persistent);
backgroundLogExecutor = global_context.getReplicatedSchedulePool().createTask(database_name + "(DatabaseReplicated::the_threeeed)", [this]{ runMainThread();} );
backgroundLogExecutor->schedule();
//loadMetadataFromSnapshot();
background_log_executor = global_context.getReplicatedSchedulePool().createTask(database_name + "(DatabaseReplicated::the_threeeed)", [this]{ runBackgroundLogExecutor();} );
background_log_executor->schedule();
}
DatabaseReplicated::~DatabaseReplicated()
{
stop_flag = true;
void DatabaseReplicated::createDatabaseZKNodes() {
current_zookeeper = getZooKeeper();
if (current_zookeeper->exists(zookeeper_path))
return;
current_zookeeper->createAncestors(zookeeper_path);
current_zookeeper->createIfNotExists(zookeeper_path, String());
current_zookeeper->createIfNotExists(zookeeper_path + "/last_entry", "0");
current_zookeeper->createIfNotExists(zookeeper_path + "/log", String());
current_zookeeper->createIfNotExists(zookeeper_path + "/snapshot", String());
}
void DatabaseReplicated::runMainThread() {
LOG_DEBUG(log, "Started " << database_name << " database worker thread\n Replica: " << replica_name);
if (!stop_flag) { // TODO is there a need for the flag?
current_zookeeper = getZooKeeper();
String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL);
size_t last_n_parsed = parse<size_t>(last_n);
LOG_DEBUG(log, "PARSED " << last_n_parsed);
LOG_DEBUG(log, "LOCAL CURRENT " << current_log_entry_n);
void DatabaseReplicated::runBackgroundLogExecutor() {
current_zookeeper = getZooKeeper();
String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL);
size_t last_n_parsed = parse<size_t>(last_n);
bool newEntries = current_log_entry_n < last_n_parsed;
while (current_log_entry_n < last_n_parsed) {
current_log_entry_n++;
executeLog(current_log_entry_n);
}
if (newEntries) {
saveState();
}
backgroundLogExecutor->scheduleAfter(500);
bool newEntries = current_log_entry_n < last_n_parsed;
while (current_log_entry_n < last_n_parsed) {
current_log_entry_n++;
String log_path = zookeeper_path + "/log/log." + std::to_string(current_log_entry_n);
executeFromZK(log_path);
}
if (newEntries) {
saveState();
}
background_log_executor->scheduleAfter(500);
}
void DatabaseReplicated::saveState() {
@ -153,10 +160,9 @@ void DatabaseReplicated::saveState() {
out.close();
}
void DatabaseReplicated::executeLog(size_t n) {
void DatabaseReplicated::executeFromZK(String & path) {
current_zookeeper = getZooKeeper();
String query_to_execute = current_zookeeper->get(zookeeper_path + "/log." + std::to_string(n), {}, NULL);
String query_to_execute = current_zookeeper->get(path, {}, NULL);
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
@ -171,7 +177,7 @@ void DatabaseReplicated::executeLog(size_t n) {
}
catch (...)
{
tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully");
tryLogCurrentException(log, "Query from zookeeper " + query_to_execute + " wasn't finished successfully");
}
@ -195,21 +201,23 @@ void DatabaseReplicated::propose(const ASTPtr & query) {
current_zookeeper = getZooKeeper();
auto lock = createSimpleZooKeeperLock(current_zookeeper, zookeeper_path, "propose_lock", replica_name);
// schedule and deactive combo
// ensures that replica is up to date
// and since propose lock is acquired,
// no other propose can happen from
// different replicas during this call
backgroundLogExecutor->schedule();
backgroundLogExecutor->deactivate();
background_log_executor->schedule();
background_log_executor->deactivate();
if (current_log_entry_n > 5) { // make a settings variable
createSnapshot();
}
// if (current_log_entry_n > 5) { // make a settings variable
// // TODO check that all the replicas are up to date!
// updateSnapshot();
// current_log_entry_n = 0;
// current_zookeeper->removeChildren(zookeeper_path + "/log");
// }
current_log_entry_n++; // starting from 1
String log_entry = zookeeper_path + "/log." + std::to_string(current_log_entry_n);
String log_entry = zookeeper_path + "/log/log." + std::to_string(current_log_entry_n);
current_zookeeper->createOrUpdate(log_entry, queryToString(query), zkutil::CreateMode::Persistent);
current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent);
@ -218,9 +226,9 @@ void DatabaseReplicated::propose(const ASTPtr & query) {
saveState();
}
void DatabaseReplicated::createSnapshot() {
current_zookeeper->createAncestors(zookeeper_path + "/snapshot");
current_zookeeper->createOrUpdate(zookeeper_path + "/snapshot", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent);
void DatabaseReplicated::updateSnapshot() {
current_zookeeper = getZooKeeper();
current_zookeeper->tryRemoveChildren(zookeeper_path + "/snapshot");
for (auto iterator = getTablesIterator({}); iterator->isValid(); iterator->next()) {
String table_name = iterator->name();
auto query = getCreateQueryFromMetadata(getObjectMetadataPath(table_name), true);
@ -229,4 +237,17 @@ void DatabaseReplicated::createSnapshot() {
}
}
void DatabaseReplicated::loadMetadataFromSnapshot() {
current_zookeeper = getZooKeeper();
Strings metadatas;
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshot", metadatas) != Coordination::ZOK)
return;
for (auto t = metadatas.begin(); t != metadatas.end(); ++t) {
String path = zookeeper_path + "/snapshot/" + *t;
executeFromZK(path);
}
}
}

View File

@ -20,8 +20,6 @@ class DatabaseReplicated : public DatabaseAtomic
public:
DatabaseReplicated(const String & name_, const String & metadata_path_, const String & zookeeper_path_, const String & replica_name_, Context & context);
~DatabaseReplicated();
String getEngineName() const override { return "Replicated"; }
void propose(const ASTPtr & query) override;
@ -30,21 +28,21 @@ public:
String replica_name;
private:
void createDatabaseZKNodes();
void runMainThread();
void runBackgroundLogExecutor();
void executeLog(size_t n);
void executeFromZK(String & path);
void saveState();
void createSnapshot();
void updateSnapshot();
void loadMetadataFromSnapshot();
std::unique_ptr<Context> current_context; // to run executeQuery
std::atomic<size_t> current_log_entry_n = 0;
std::atomic<bool> stop_flag{false};
BackgroundSchedulePool::TaskHolder backgroundLogExecutor;
BackgroundSchedulePool::TaskHolder background_log_executor;
String replica_path;