address pr comments

This commit is contained in:
Val 2020-06-24 15:45:42 +03:00
parent 16e50e33d7
commit d293e002a7
2 changed files with 19 additions and 7 deletions

View File

@ -22,6 +22,7 @@ namespace ErrorCodes
{
extern const int NO_ZOOKEEPER;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
void DatabaseReplicated::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
@ -55,10 +56,14 @@ DatabaseReplicated::DatabaseReplicated(
, zookeeper_path(zookeeper_path_)
, replica_name(replica_name_)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
if (zookeeper_path.empty() || replica_name.empty()) {
throw Exception("ZooKeeper path and replica name must be non-empty", ErrorCodes::BAD_ARGUMENTS);
}
if (zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
if (zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;
if (context_.hasZooKeeper()) {
@ -70,10 +75,10 @@ DatabaseReplicated::DatabaseReplicated(
}
// New database
if (!current_zookeeper->exists(zookeeper_path, {}, NULL)) {
if (!current_zookeeper->exists(zookeeper_path)) {
createDatabaseZKNodes();
// Old replica recovery
} else if (current_zookeeper->exists(zookeeper_path + "/replicas/" + replica_name, {}, NULL)) {
} else if (current_zookeeper->exists(zookeeper_path + "/replicas/" + replica_name)) {
String remote_last_entry = current_zookeeper->get(zookeeper_path + "/replicas/" + replica_name, {}, NULL);
String local_last_entry;
@ -243,8 +248,9 @@ void DatabaseReplicated::createSnapshot() {
String table_name = iterator->name();
auto query = getCreateQueryFromMetadata(getObjectMetadataPath(table_name), true);
String statement = queryToString(query);
current_zookeeper->createOrUpdate(snapshot_path + "/" + table_name, statement, zkutil::CreateMode::Persistent);
current_zookeeper->createIfNotExists(snapshot_path + "/" + table_name, statement);
}
current_zookeeper->createIfNotExists(snapshot_path + "/.completed", String());
RemoveOutdatedSnapshotsAndLog();
}
@ -258,11 +264,17 @@ void DatabaseReplicated::loadMetadataFromSnapshot() {
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots", snapshots) != Coordination::ZOK)
return;
auto latest_snapshot = std::max_element(snapshots.begin(), snapshots.end());
while (snapshots.size() > 0 && !current_zookeeper->exists(zookeeper_path + "/snapshots/" + *latest_snapshot + "/.completed")) {
snapshots.erase(latest_snapshot);
latest_snapshot = std::max_element(snapshots.begin(), snapshots.end());
}
if (snapshots.size() < 1) {
return;
}
auto latest_snapshot = std::max_element(snapshots.begin(), snapshots.end());
Strings metadatas;
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots/" + *latest_snapshot, metadatas) != Coordination::ZOK)
return;

View File

@ -640,7 +640,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
else if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind == ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
if (create.uuid == UUIDHelpers::Nil)
throw Exception("Table UUID is not specified in DDL log", ErrorCodes::INCORRECT_QUERY);
throw Exception("Table UUID is not specified in DDL log", ErrorCodes::LOGICAL_ERROR);
}
else
{