This commit is contained in:
Alexander Tokmakov 2021-02-11 18:14:38 +03:00
parent 15256d86e5
commit 9afb16759e
2 changed files with 40 additions and 8 deletions

View File

@ -351,7 +351,6 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
else if (!tables_to_detach.empty()) else if (!tables_to_detach.empty())
LOG_WARNING(log, "Will DETACH PERMANENTLY {} broken tables to recover replica", tables_to_detach.size()); LOG_WARNING(log, "Will DETACH PERMANENTLY {} broken tables to recover replica", tables_to_detach.size());
auto db_guard = DatabaseCatalog::instance().getDDLGuard(getDatabaseName(), "");
for (const auto & table_name : tables_to_detach) for (const auto & table_name : tables_to_detach)
{ {
String to_name = fmt::format("{}_{}_{}_{}", BROKEN_TABLE_PREFIX, table_name, max_log_ptr, thread_local_rng() % 1000); String to_name = fmt::format("{}_{}_{}_{}", BROKEN_TABLE_PREFIX, table_name, max_log_ptr, thread_local_rng() % 1000);
@ -517,12 +516,12 @@ void DatabaseReplicated::renameTable(const Context & context, const String & tab
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name); throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name);
String statement = readMetadataFile(table_name); String statement = readMetadataFile(table_name);
String statement_to = readMetadataFile(to_table_name);
String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_name); String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_name);
String metadata_zk_path_to = txn->zookeeper_path + "/metadata/" + escapeForFileName(to_table_name); String metadata_zk_path_to = txn->zookeeper_path + "/metadata/" + escapeForFileName(to_table_name);
txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1)); txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1));
if (exchange) if (exchange)
{ {
String statement_to = readMetadataFile(to_table_name);
txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path_to, -1)); txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path_to, -1));
txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement_to, zkutil::CreateMode::Persistent)); txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement_to, zkutil::CreateMode::Persistent));
} }

View File

@ -788,15 +788,33 @@ void DDLWorker::cleanupQueue(Int64, const ZooKeeperPtr & zookeeper)
/// Now we can safely delete entry /// Now we can safely delete entry
LOG_INFO(log, "Task {} is outdated, deleting it", node_name); LOG_INFO(log, "Task {} is outdated, deleting it", node_name);
/// We recursively delete all nodes except entry/finished to prevent staled hosts from /// We recursively delete all nodes except node_path/finished to prevent staled hosts from
/// creating entry/active node (see createStatusDirs(...)) /// creating node_path/active node (see createStatusDirs(...))
zookeeper->tryRemoveChildrenRecursive(node_path, "finished"); zookeeper->tryRemoveChildrenRecursive(node_path, "finished");
/// And then we remove entry and entry/finished in a single transaction /// And then we remove node_path and node_path/finished in a single transaction
Coordination::Requests ops; Coordination::Requests ops;
Coordination::Responses res;
ops.emplace_back(zkutil::makeCheckRequest(node_path, -1)); /// See a comment below
ops.emplace_back(zkutil::makeRemoveRequest(fs::path(node_path) / "finished", -1)); ops.emplace_back(zkutil::makeRemoveRequest(fs::path(node_path) / "finished", -1));
ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1)); ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
zookeeper->multi(ops); auto rm_entry_res = zookeeper->tryMulti(ops, res);
if (rm_entry_res == Coordination::Error::ZNONODE)
{
/// Most likely both node_path/finished and node_path were removed concurrently.
bool entry_removed_concurrently = res[0]->error == Coordination::Error::ZNONODE;
if (entry_removed_concurrently)
continue;
/// Possible rare case: initiator node has lost connection after enqueueing entry and failed to create status dirs.
/// No one has started to process the entry, so node_path/active and node_path/finished nodes were never created, node_path has no children.
/// Entry became outdated, but we cannot remove remove it in a transaction with node_path/finished.
assert(res[0]->error == Coordination::Error::ZOK && res[1]->error == Coordination::Error::ZNONODE);
rm_entry_res = zookeeper->tryRemove(node_path);
assert(rm_entry_res != Coordination::Error::ZNOTEMPTY);
continue;
}
zkutil::KeeperMultiException::check(rm_entry_res, ops, res);
} }
catch (...) catch (...)
{ {
@ -828,12 +846,27 @@ void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperP
Coordination::Responses responses; Coordination::Responses responses;
Coordination::Error code = zookeeper->tryMulti(ops, responses); Coordination::Error code = zookeeper->tryMulti(ops, responses);
bool both_created = code == Coordination::Error::ZOK; bool both_created = code == Coordination::Error::ZOK;
/// Failed on attempt to create node_path/active because it exists, so node_path/finished must exist too
bool both_already_exists = responses.size() == 2 && responses[0]->error == Coordination::Error::ZNODEEXISTS bool both_already_exists = responses.size() == 2 && responses[0]->error == Coordination::Error::ZNODEEXISTS
&& responses[1]->error == Coordination::Error::ZRUNTIMEINCONSISTENCY;
assert(!both_already_exists || (zookeeper->exists(fs::path(node_path) / "active") && zookeeper->exists(fs::path(node_path) / "finished")));
/// Failed on attempt to create node_path/finished, but node_path/active does not exist
bool is_currently_deleting = responses.size() == 2 && responses[0]->error == Coordination::Error::ZOK
&& responses[1]->error == Coordination::Error::ZNODEEXISTS; && responses[1]->error == Coordination::Error::ZNODEEXISTS;
if (both_created || both_already_exists) if (both_created || both_already_exists)
return; return;
throw Coordination::Exception(code);
if (is_currently_deleting)
throw Exception(ErrorCodes::UNFINISHED, "Cannot create status dirs for {}, "
"most likely because someone is deleting it concurrently", node_path);
/// Connection lost or entry was removed
assert(Coordination::isHardwareError(code) || code == Coordination::Error::ZNONODE);
zkutil::KeeperMultiException::check(code, ops, responses);
} }