2020-04-05 12:18:51 +00:00
|
|
|
#include <Databases/DatabaseReplicated.h>
|
|
|
|
#include <IO/ReadBufferFromFile.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <IO/ReadBufferFromString.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/WriteBufferFromFile.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <Interpreters/Context.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <Interpreters/executeQuery.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
#include <Parsers/queryToString.h>
|
2020-05-12 13:35:05 +00:00
|
|
|
#include <Common/Exception.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
#include <Common/ZooKeeper/KeeperException.h>
|
|
|
|
#include <Common/ZooKeeper/Types.h>
|
|
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
2020-05-11 12:55:17 +00:00
|
|
|
#include <Common/ZooKeeper/Lock.h>
|
2020-04-05 12:18:51 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
namespace ErrorCodes
{
    /// Only declared here (`extern`); the definitions live in another translation unit.
    extern const int FILE_DOESNT_EXIST;
    extern const int NO_ZOOKEEPER;
}
|
|
|
|
|
|
|
|
void DatabaseReplicated::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
    /// Replace the session pointer while holding the mutex that guards `current_zookeeper`.
    std::scoped_lock guard(current_zookeeper_mutex);
    current_zookeeper = zookeeper;
}
|
|
|
|
|
|
|
|
zkutil::ZooKeeperPtr DatabaseReplicated::tryGetZooKeeper() const
{
    /// Copy the pointer out under the mutex; the result may be empty if no session was set.
    std::scoped_lock guard(current_zookeeper_mutex);
    return current_zookeeper;
}
|
|
|
|
|
|
|
|
zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const
{
    /// Same as tryGetZooKeeper(), but an absent session is an error (NO_ZOOKEEPER).
    if (auto zookeeper = tryGetZooKeeper())
        return zookeeper;

    throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
}
|
|
|
|
|
|
|
|
|
|
|
|
DatabaseReplicated::DatabaseReplicated(
    const String & name_,
    const String & metadata_path_,
    const String & zookeeper_path_,
    const String & replica_name_,
    Context & context_)
    : DatabaseAtomic(name_, metadata_path_, "store/", "DatabaseReplicated (" + name_ + ")", context_)
    , zookeeper_path(zookeeper_path_)
    , replica_name(replica_name_)
{
    /// Normalize the path: strip one trailing '/', and make sure it starts with '/'.
    /// The leading '/' matters when a ZooKeeper chroot prefix is used, because the chroot
    /// is concatenated with the path without inserting a separator.
    if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
        zookeeper_path.pop_back();
    if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
        zookeeper_path.insert(0, 1, '/');

    if (context_.hasZooKeeper())
        current_zookeeper = context_.getZooKeeper();
    if (!current_zookeeper)
        throw Exception("Can't create replicated database without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);

    const String replica_path = zookeeper_path + "/replicas/" + replica_name;

    if (!current_zookeeper->exists(zookeeper_path, nullptr, nullptr))
    {
        /// Nothing in ZooKeeper yet: this is a brand-new database.
        createDatabaseZKNodes();
    }
    else if (current_zookeeper->exists(replica_path, nullptr, nullptr))
    {
        /// This replica was registered before: compare the locally persisted last entry
        /// with the one recorded in ZooKeeper.
        String local_last_entry;
        try
        {
            ReadBufferFromFile in(getMetadataPath() + ".last_entry", 16);
            readStringUntilEOF(local_last_entry, in);
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::FILE_DOESNT_EXIST)
                throw;

            /// No local state file: overwrite the ZooKeeper state with ours.
            /// NOTE: risky — if two replicas share the same name, the last one to do this wins.
            saveState();
        }

        const String remote_last_entry = current_zookeeper->get(replica_path, nullptr, nullptr);
        if (local_last_entry != remote_last_entry)
        {
            LOG_DEBUG(log, "LOCAL: " << local_last_entry);
            LOG_DEBUG(log, "ZK: " << remote_last_entry);
            throw Exception("Can't create replicated database MISCONFIGURATION or something", ErrorCodes::NO_ZOOKEEPER);
        }

        last_executed_log_entry = local_last_entry;
    }

    snapshot_period = context_.getConfigRef().getInt("database_replicated_snapshot_period", 10);
    LOG_DEBUG(log, "Snapshot period is set to " << snapshot_period);

    background_log_executor = global_context.getReplicatedSchedulePool().createTask(
        database_name + "(DatabaseReplicated::background_executor)",
        [this] { runBackgroundLogExecutor(); });
    background_log_executor->scheduleAfter(500);
}
|
|
|
|
|
2020-05-24 17:13:53 +00:00
|
|
|
void DatabaseReplicated::createDatabaseZKNodes()
{
    current_zookeeper = getZooKeeper();
    current_zookeeper->createAncestors(zookeeper_path);

    /// The database root node first, then its fixed sub-nodes, in this order.
    for (const char * suffix : {"", "/log", "/snapshots", "/replicas"})
        current_zookeeper->createIfNotExists(zookeeper_path + suffix, String());
}
|
|
|
|
|
|
|
|
void DatabaseReplicated::RemoveOutdatedSnapshotsAndLog() {
|
|
|
|
// This method removes all snapshots and logged queries
|
|
|
|
// that no longer will be in use by current replicas or
|
|
|
|
// new coming ones.
|
|
|
|
// Each registered replica has its state in ZooKeeper.
|
|
|
|
// Therefore removed snapshots and logged queries are less
|
|
|
|
// than a least advanced replica.
|
|
|
|
// It does not interfere with a new coming replica
|
|
|
|
// metadata loading from snapshot
|
|
|
|
// because the replica will use the last snapshot available
|
|
|
|
// and this snapshot will set the last executed log query
|
|
|
|
// to a greater one than the least advanced current replica.
|
|
|
|
current_zookeeper = getZooKeeper();
|
|
|
|
Strings replica_states = current_zookeeper->getChildren(zookeeper_path + "/replicas");
|
|
|
|
auto least_advanced = std::min_element(replica_states.begin(), replica_states.end());
|
|
|
|
Strings snapshots = current_zookeeper->getChildren(zookeeper_path + "/snapshots");
|
|
|
|
|
|
|
|
if (snapshots.size() < 2) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::sort(snapshots.begin(), snapshots.end());
|
|
|
|
auto still_useful = std::lower_bound(snapshots.begin(), snapshots.end(), *least_advanced);
|
|
|
|
snapshots.erase(still_useful, snapshots.end());
|
|
|
|
for (const String & snapshot : snapshots) {
|
|
|
|
current_zookeeper->tryRemoveRecursive(zookeeper_path + "/snapshots/" + snapshot);
|
|
|
|
}
|
|
|
|
|
|
|
|
Strings log_entry_names = current_zookeeper->getChildren(zookeeper_path + "/log");
|
|
|
|
std::sort(log_entry_names.begin(), log_entry_names.end());
|
|
|
|
auto still_useful_log = std::upper_bound(log_entry_names.begin(), log_entry_names.end(), *still_useful);
|
|
|
|
log_entry_names.erase(still_useful_log, log_entry_names.end());
|
|
|
|
for (const String & log_entry_name : log_entry_names) {
|
|
|
|
String log_entry_path = zookeeper_path + "/log/" + log_entry_name;
|
|
|
|
current_zookeeper->tryRemove(log_entry_path);
|
|
|
|
}
|
2020-05-11 12:55:17 +00:00
|
|
|
}
|
|
|
|
|
2020-05-24 17:13:53 +00:00
|
|
|
void DatabaseReplicated::runBackgroundLogExecutor() {
|
2020-06-20 15:39:58 +00:00
|
|
|
if (last_executed_log_entry == "") {
|
|
|
|
loadMetadataFromSnapshot();
|
|
|
|
}
|
|
|
|
|
2020-05-24 17:13:53 +00:00
|
|
|
current_zookeeper = getZooKeeper();
|
2020-06-07 11:20:05 +00:00
|
|
|
Strings log_entry_names = current_zookeeper->getChildren(zookeeper_path + "/log");
|
|
|
|
|
|
|
|
std::sort(log_entry_names.begin(), log_entry_names.end());
|
|
|
|
auto newest_entry_it = std::upper_bound(log_entry_names.begin(), log_entry_names.end(), last_executed_log_entry);
|
|
|
|
|
|
|
|
log_entry_names.erase(log_entry_names.begin(), newest_entry_it);
|
|
|
|
|
|
|
|
for (const String & log_entry_name : log_entry_names) {
|
|
|
|
String log_entry_path = zookeeper_path + "/log/" + log_entry_name;
|
|
|
|
executeFromZK(log_entry_path);
|
|
|
|
last_executed_log_entry = log_entry_name;
|
2020-06-20 15:39:58 +00:00
|
|
|
saveState();
|
|
|
|
|
|
|
|
int log_n = parse<int>(log_entry_name.substr(4));
|
|
|
|
int last_log_n = parse<int>(log_entry_names.back().substr(4));
|
|
|
|
|
|
|
|
// The third condition gurantees at most one snapshot per batch
|
|
|
|
if (log_n > 0 && snapshot_period > 0 && (last_log_n - log_n) / snapshot_period == 0 && log_n % snapshot_period == 0) {
|
|
|
|
createSnapshot();
|
|
|
|
}
|
2020-05-24 17:13:53 +00:00
|
|
|
}
|
2020-06-07 11:20:05 +00:00
|
|
|
|
2020-05-24 17:13:53 +00:00
|
|
|
background_log_executor->scheduleAfter(500);
|
2020-05-11 12:55:17 +00:00
|
|
|
}
|
|
|
|
|
2020-05-13 17:00:47 +00:00
|
|
|
void DatabaseReplicated::saveState() {
|
2020-05-27 18:40:00 +00:00
|
|
|
current_zookeeper = getZooKeeper();
|
2020-06-20 15:39:58 +00:00
|
|
|
current_zookeeper->createOrUpdate(zookeeper_path + "/replicas/" + replica_name, last_executed_log_entry, zkutil::CreateMode::Persistent);
|
2020-05-27 18:40:00 +00:00
|
|
|
|
|
|
|
String metadata_file = getMetadataPath() + ".last_entry";
|
2020-06-20 15:39:58 +00:00
|
|
|
WriteBufferFromFile out(metadata_file, last_executed_log_entry.size(), O_WRONLY | O_CREAT);
|
|
|
|
writeString(last_executed_log_entry, out);
|
2020-05-13 17:00:47 +00:00
|
|
|
out.next();
|
|
|
|
if (global_context.getSettingsRef().fsync_metadata)
|
|
|
|
out.sync();
|
|
|
|
out.close();
|
|
|
|
}
|
|
|
|
|
2020-05-24 17:13:53 +00:00
|
|
|
void DatabaseReplicated::executeFromZK(String & path)
{
    /// Read the query text stored at `path` in ZooKeeper and execute it in this database
    /// as a REPLICATED_LOG_QUERY. Execution failures are logged and swallowed (best effort);
    /// ZooKeeper read errors still propagate to the caller.
    current_zookeeper = getZooKeeper();
    String query_to_execute = current_zookeeper->get(path, nullptr, nullptr);

    ReadBufferFromString istr(query_to_execute);
    String dummy_string;
    WriteBufferFromString ostr(dummy_string);

    try
    {
        current_context = std::make_unique<Context>(global_context);
        current_context->getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY;
        current_context->setCurrentDatabase(database_name);
        current_context->setCurrentQueryId(""); // generate random query_id
        executeQuery(istr, ostr, false, *current_context, {});

        /// Only report execution once the query actually finished.
        LOG_DEBUG(log, "Executed query: " << query_to_execute);
    }
    catch (...)
    {
        tryLogCurrentException(log, "Query from zookeeper " + query_to_execute + " wasn't finished successfully");
    }
}
|
|
|
|
|
2020-05-05 14:16:59 +00:00
|
|
|
void DatabaseReplicated::propose(const ASTPtr & query) {
|
2020-05-11 12:55:17 +00:00
|
|
|
current_zookeeper = getZooKeeper();
|
2020-05-13 17:00:47 +00:00
|
|
|
|
2020-06-20 15:39:58 +00:00
|
|
|
LOG_DEBUG(log, "Writing the query to log: " << queryToString(query));
|
2020-06-07 11:20:05 +00:00
|
|
|
current_zookeeper->create(zookeeper_path + "/log/log-", queryToString(query), zkutil::CreateMode::PersistentSequential);
|
2020-05-27 18:33:37 +00:00
|
|
|
|
2020-05-24 17:13:53 +00:00
|
|
|
background_log_executor->schedule();
|
2020-05-13 17:00:47 +00:00
|
|
|
}
|
|
|
|
|
2020-06-20 15:39:58 +00:00
|
|
|
void DatabaseReplicated::createSnapshot() {
|
2020-05-24 17:13:53 +00:00
|
|
|
current_zookeeper = getZooKeeper();
|
2020-06-20 15:39:58 +00:00
|
|
|
String snapshot_path = zookeeper_path + "/snapshots/" + last_executed_log_entry;
|
|
|
|
|
|
|
|
if (Coordination::ZNODEEXISTS == current_zookeeper->tryCreate(snapshot_path, String(), zkutil::CreateMode::Persistent)) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2020-05-13 17:00:47 +00:00
|
|
|
for (auto iterator = getTablesIterator({}); iterator->isValid(); iterator->next()) {
|
|
|
|
String table_name = iterator->name();
|
|
|
|
auto query = getCreateQueryFromMetadata(getObjectMetadataPath(table_name), true);
|
|
|
|
String statement = queryToString(query);
|
2020-06-20 15:39:58 +00:00
|
|
|
current_zookeeper->createOrUpdate(snapshot_path + "/" + table_name, statement, zkutil::CreateMode::Persistent);
|
2020-05-13 17:00:47 +00:00
|
|
|
}
|
2020-06-20 15:39:58 +00:00
|
|
|
|
|
|
|
RemoveOutdatedSnapshotsAndLog();
|
2020-05-11 12:55:17 +00:00
|
|
|
}
|
2020-04-05 12:18:51 +00:00
|
|
|
|
2020-05-24 17:13:53 +00:00
|
|
|
void DatabaseReplicated::loadMetadataFromSnapshot() {
|
|
|
|
current_zookeeper = getZooKeeper();
|
|
|
|
|
2020-06-20 15:39:58 +00:00
|
|
|
Strings snapshots;
|
|
|
|
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots", snapshots) != Coordination::ZOK)
|
|
|
|
return;
|
|
|
|
|
|
|
|
if (snapshots.size() < 1) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto latest_snapshot = std::max_element(snapshots.begin(), snapshots.end());
|
2020-05-24 17:13:53 +00:00
|
|
|
Strings metadatas;
|
2020-06-20 15:39:58 +00:00
|
|
|
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots/" + *latest_snapshot, metadatas) != Coordination::ZOK)
|
2020-05-24 17:13:53 +00:00
|
|
|
return;
|
|
|
|
|
|
|
|
for (auto t = metadatas.begin(); t != metadatas.end(); ++t) {
|
2020-06-20 15:39:58 +00:00
|
|
|
String path = zookeeper_path + "/snapshots/" + *latest_snapshot + "/" + *t;
|
2020-05-24 17:13:53 +00:00
|
|
|
executeFromZK(path);
|
|
|
|
}
|
2020-06-20 15:39:58 +00:00
|
|
|
|
|
|
|
last_executed_log_entry = *latest_snapshot;
|
|
|
|
saveState();
|
|
|
|
}
|
|
|
|
|
|
|
|
void DatabaseReplicated::drop(const Context & context_)
|
|
|
|
{
|
|
|
|
current_zookeeper = getZooKeeper();
|
|
|
|
current_zookeeper->tryRemove(zookeeper_path + "/replicas/" + replica_name);
|
|
|
|
DatabaseAtomic::drop(context_);
|
2020-05-24 17:13:53 +00:00
|
|
|
}
|
|
|
|
|
2020-04-05 12:18:51 +00:00
|
|
|
}
|