diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 9dd8530fc46..ae5a8249202 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -80,7 +80,6 @@ DatabaseReplicated::DatabaseReplicated( Context & context_) // : DatabaseOrdinary(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseReplicated (" + name_ + ")", context_) // TODO add constructor to Atomic and call it here with path and logger name specification - // TODO ask why const and & are ommited in Atomic : DatabaseAtomic(name_, metadata_path_, context_) , zookeeper_path(zookeeper_path_) , replica_name(replica_name_) @@ -102,42 +101,50 @@ DatabaseReplicated::DatabaseReplicated( } if (!current_zookeeper->exists(zookeeper_path, {}, NULL)) { - current_zookeeper->createAncestors(zookeeper_path); - current_zookeeper->createOrUpdate(zookeeper_path, String(), zkutil::CreateMode::Persistent); - current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", "0", zkutil::CreateMode::Persistent); + createDatabaseZKNodes(); + } + + // replica + if (!current_zookeeper->exists(replica_path, {}, NULL)) { current_zookeeper->createAncestors(replica_path); - } else { + current_zookeeper->createOrUpdate(replica_path, String(), zkutil::CreateMode::Persistent); } - current_zookeeper->createOrUpdate(replica_path, String(), zkutil::CreateMode::Persistent); - backgroundLogExecutor = global_context.getReplicatedSchedulePool().createTask(database_name + "(DatabaseReplicated::the_threeeed)", [this]{ runMainThread();} ); - backgroundLogExecutor->schedule(); + //loadMetadataFromSnapshot(); + + background_log_executor = global_context.getReplicatedSchedulePool().createTask(database_name + "(DatabaseReplicated::the_threeeed)", [this]{ runBackgroundLogExecutor();} ); + background_log_executor->schedule(); } -DatabaseReplicated::~DatabaseReplicated() -{ - stop_flag = true; +void DatabaseReplicated::createDatabaseZKNodes() { + current_zookeeper = getZooKeeper(); + + if (current_zookeeper->exists(zookeeper_path)) + return; + + current_zookeeper->createAncestors(zookeeper_path); + + current_zookeeper->createIfNotExists(zookeeper_path, String()); + current_zookeeper->createIfNotExists(zookeeper_path + "/last_entry", "0"); + current_zookeeper->createIfNotExists(zookeeper_path + "/log", String()); + current_zookeeper->createIfNotExists(zookeeper_path + "/snapshot", String()); } -void DatabaseReplicated::runMainThread() { - LOG_DEBUG(log, "Started " << database_name << " database worker thread\n Replica: " << replica_name); - if (!stop_flag) { // TODO is there a need for the flag? - current_zookeeper = getZooKeeper(); - String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL); - size_t last_n_parsed = parse(last_n); - LOG_DEBUG(log, "PARSED " << last_n_parsed); - LOG_DEBUG(log, "LOCAL CURRENT " << current_log_entry_n); +void DatabaseReplicated::runBackgroundLogExecutor() { + current_zookeeper = getZooKeeper(); + String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL); + size_t last_n_parsed = parse(last_n); - bool newEntries = current_log_entry_n < last_n_parsed; - while (current_log_entry_n < last_n_parsed) { - current_log_entry_n++; - executeLog(current_log_entry_n); - } - if (newEntries) { - saveState(); - } - backgroundLogExecutor->scheduleAfter(500); + bool newEntries = current_log_entry_n < last_n_parsed; + while (current_log_entry_n < last_n_parsed) { + current_log_entry_n++; + String log_path = zookeeper_path + "/log/log." + std::to_string(current_log_entry_n); + executeFromZK(log_path); } + if (newEntries) { + saveState(); + } + background_log_executor->scheduleAfter(500); } void DatabaseReplicated::saveState() { @@ -153,10 +160,9 @@ void DatabaseReplicated::saveState() { out.close(); } -void DatabaseReplicated::executeLog(size_t n) { - +void DatabaseReplicated::executeFromZK(String & path) { current_zookeeper = getZooKeeper(); - String query_to_execute = current_zookeeper->get(zookeeper_path + "/log." + std::to_string(n), {}, NULL); + String query_to_execute = current_zookeeper->get(path, {}, NULL); ReadBufferFromString istr(query_to_execute); String dummy_string; WriteBufferFromString ostr(dummy_string); @@ -171,7 +177,7 @@ void DatabaseReplicated::executeLog(size_t n) { } catch (...) { - tryLogCurrentException(log, "Query " + query_to_execute + " wasn't finished successfully"); + tryLogCurrentException(log, "Query from zookeeper " + query_to_execute + " wasn't finished successfully"); } @@ -195,21 +201,23 @@ void DatabaseReplicated::propose(const ASTPtr & query) { current_zookeeper = getZooKeeper(); auto lock = createSimpleZooKeeperLock(current_zookeeper, zookeeper_path, "propose_lock", replica_name); - // schedule and deactive combo // ensures that replica is up to date // and since propose lock is acquired, // no other propose can happen from // different replicas during this call - backgroundLogExecutor->schedule(); - backgroundLogExecutor->deactivate(); + background_log_executor->schedule(); + background_log_executor->deactivate(); - if (current_log_entry_n > 5) { // make a settings variable - createSnapshot(); - } +// if (current_log_entry_n > 5) { // make a settings variable +// // TODO check that all the replicas are up to date! +// updateSnapshot(); +// current_log_entry_n = 0; +// current_zookeeper->removeChildren(zookeeper_path + "/log"); +// } current_log_entry_n++; // starting from 1 - String log_entry = zookeeper_path + "/log." + std::to_string(current_log_entry_n); + String log_entry = zookeeper_path + "/log/log." + std::to_string(current_log_entry_n); current_zookeeper->createOrUpdate(log_entry, queryToString(query), zkutil::CreateMode::Persistent); current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent); @@ -218,9 +226,9 @@ void DatabaseReplicated::propose(const ASTPtr & query) { saveState(); } -void DatabaseReplicated::createSnapshot() { - current_zookeeper->createAncestors(zookeeper_path + "/snapshot"); - current_zookeeper->createOrUpdate(zookeeper_path + "/snapshot", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent); +void DatabaseReplicated::updateSnapshot() { + current_zookeeper = getZooKeeper(); + current_zookeeper->tryRemoveChildren(zookeeper_path + "/snapshot"); for (auto iterator = getTablesIterator({}); iterator->isValid(); iterator->next()) { String table_name = iterator->name(); auto query = getCreateQueryFromMetadata(getObjectMetadataPath(table_name), true); @@ -229,4 +237,17 @@ void DatabaseReplicated::createSnapshot() { } } +void DatabaseReplicated::loadMetadataFromSnapshot() { + current_zookeeper = getZooKeeper(); + + Strings metadatas; + if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshot", metadatas) != Coordination::ZOK) + return; + + for (auto t = metadatas.begin(); t != metadatas.end(); ++t) { + String path = zookeeper_path + "/snapshot/" + *t; + executeFromZK(path); + } +} + } diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index 0b2d097caac..bd2f11390d2 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -20,8 +20,6 @@ class DatabaseReplicated : public DatabaseAtomic public: DatabaseReplicated(const String & name_, const String & metadata_path_, const String & zookeeper_path_, const String & replica_name_, Context & context); - ~DatabaseReplicated(); - String getEngineName() const override { return "Replicated"; } void propose(const ASTPtr & query) override; @@ -30,21 +28,21 @@ public: String replica_name; private: + void createDatabaseZKNodes(); - void runMainThread(); + void runBackgroundLogExecutor(); - void executeLog(size_t n); + void executeFromZK(String & path); void saveState(); - - void createSnapshot(); + void updateSnapshot(); + void loadMetadataFromSnapshot(); std::unique_ptr current_context; // to run executeQuery std::atomic current_log_entry_n = 0; - std::atomic stop_flag{false}; - BackgroundSchedulePool::TaskHolder backgroundLogExecutor; + BackgroundSchedulePool::TaskHolder background_log_executor; String replica_path;