mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-09 17:14:47 +00:00
Merge pull request #69102 from ClickHouse/make-replicated-database-creation-operations-idempotent
Make `Replicated` database creation operations idempotent
This commit is contained in:
commit
2364949c95
@ -441,7 +441,8 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
|
|||||||
bool is_create_query = mode == LoadingStrictnessLevel::CREATE;
|
bool is_create_query = mode == LoadingStrictnessLevel::CREATE;
|
||||||
|
|
||||||
String replica_host_id;
|
String replica_host_id;
|
||||||
if (current_zookeeper->tryGet(replica_path, replica_host_id))
|
bool replica_exists_in_zk = current_zookeeper->tryGet(replica_path, replica_host_id);
|
||||||
|
if (replica_exists_in_zk)
|
||||||
{
|
{
|
||||||
if (replica_host_id == DROPPED_MARK && !is_create_query)
|
if (replica_host_id == DROPPED_MARK && !is_create_query)
|
||||||
{
|
{
|
||||||
@ -454,7 +455,7 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
|
|||||||
String host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection);
|
String host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection);
|
||||||
String host_id_default = getHostID(getContext(), db_uuid, false);
|
String host_id_default = getHostID(getContext(), db_uuid, false);
|
||||||
|
|
||||||
if (is_create_query || (replica_host_id != host_id && replica_host_id != host_id_default))
|
if (replica_host_id != host_id && replica_host_id != host_id_default)
|
||||||
{
|
{
|
||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::REPLICA_ALREADY_EXISTS,
|
ErrorCodes::REPLICA_ALREADY_EXISTS,
|
||||||
@ -484,13 +485,20 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
|
|||||||
current_zookeeper->set(replica_path + "/replica_group", replica_group_name, -1);
|
current_zookeeper->set(replica_path + "/replica_group", replica_group_name, -1);
|
||||||
createEmptyLogEntry(current_zookeeper);
|
createEmptyLogEntry(current_zookeeper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Needed to mark all the queries
|
||||||
|
/// in the range (max log ptr at replica ZooKeeper nodes creation, max log ptr after replica recovery] as successful.
|
||||||
|
String max_log_ptr_at_creation_str;
|
||||||
|
if (current_zookeeper->tryGet(replica_path + "/max_log_ptr_at_creation", max_log_ptr_at_creation_str))
|
||||||
|
max_log_ptr_at_creation = parse<UInt32>(max_log_ptr_at_creation_str);
|
||||||
}
|
}
|
||||||
else if (is_create_query)
|
|
||||||
|
if (is_create_query)
|
||||||
{
|
{
|
||||||
/// Create new replica. Throws if replica with the same name already exists
|
/// Create replica nodes in ZooKeeper. If newly initialized nodes already exist, reuse them.
|
||||||
createReplicaNodesInZooKeeper(current_zookeeper);
|
createReplicaNodesInZooKeeper(current_zookeeper);
|
||||||
}
|
}
|
||||||
else
|
else if (!replica_exists_in_zk)
|
||||||
{
|
{
|
||||||
/// It's not CREATE query, but replica does not exist. Probably it was dropped.
|
/// It's not CREATE query, but replica does not exist. Probably it was dropped.
|
||||||
/// Do not create anything, continue as readonly.
|
/// Do not create anything, continue as readonly.
|
||||||
@ -606,37 +614,84 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
|
|||||||
"already contains some data and it does not look like Replicated database path.", zookeeper_path);
|
"already contains some data and it does not look like Replicated database path.", zookeeper_path);
|
||||||
|
|
||||||
/// Write host name to replica_path, it will protect from multiple replicas with the same name
|
/// Write host name to replica_path, it will protect from multiple replicas with the same name
|
||||||
auto host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection);
|
const auto host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection);
|
||||||
|
|
||||||
|
const std::vector<String> check_paths = {
|
||||||
|
replica_path,
|
||||||
|
replica_path + "/replica_group",
|
||||||
|
replica_path + "/digest",
|
||||||
|
};
|
||||||
|
bool nodes_exist = true;
|
||||||
|
auto check_responses = current_zookeeper->tryGet(check_paths);
|
||||||
|
for (size_t i = 0; i < check_responses.size(); ++i)
|
||||||
|
{
|
||||||
|
const auto response = check_responses[i];
|
||||||
|
|
||||||
|
if (response.error == Coordination::Error::ZNONODE)
|
||||||
|
{
|
||||||
|
nodes_exist = false;
|
||||||
|
break;
|
||||||
|
} else if (response.error != Coordination::Error::ZOK)
|
||||||
|
{
|
||||||
|
throw zkutil::KeeperException::fromPath(response.error, check_paths[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nodes_exist)
|
||||||
|
{
|
||||||
|
const std::vector<String> expected_data = {
|
||||||
|
host_id,
|
||||||
|
replica_group_name,
|
||||||
|
"0",
|
||||||
|
};
|
||||||
|
for (size_t i = 0; i != expected_data.size(); ++i)
|
||||||
|
{
|
||||||
|
if (check_responses[i].data != expected_data[i])
|
||||||
|
{
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::REPLICA_ALREADY_EXISTS,
|
||||||
|
"Replica node {} in ZooKeeper already exists and contains unexpected value: {}",
|
||||||
|
quoteString(check_paths[i]), quoteString(check_responses[i].data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG_DEBUG(log, "Newly initialized replica nodes found in ZooKeeper, reusing them");
|
||||||
|
createEmptyLogEntry(current_zookeeper);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
for (int attempts = 10; attempts > 0; --attempts)
|
for (int attempts = 10; attempts > 0; --attempts)
|
||||||
{
|
{
|
||||||
Coordination::Stat stat;
|
Coordination::Stat stat;
|
||||||
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
|
const String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
|
||||||
|
|
||||||
Coordination::Requests ops;
|
const Coordination::Requests ops = {
|
||||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
|
zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent),
|
||||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
|
zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent),
|
||||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/digest", "0", zkutil::CreateMode::Persistent));
|
zkutil::makeCreateRequest(replica_path + "/digest", "0", zkutil::CreateMode::Persistent),
|
||||||
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent));
|
zkutil::makeCreateRequest(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent),
|
||||||
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
|
|
||||||
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
|
/// Previously, this method was not idempotent and max_log_ptr_at_creation could be stored in memory.
|
||||||
/// notify other nodes that issued new queries while this node was recovering.
|
/// we need to store max_log_ptr_at_creation in ZooKeeper to make this method idempotent during replica creation.
|
||||||
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version));
|
zkutil::makeCreateRequest(replica_path + "/max_log_ptr_at_creation", max_log_ptr_str, zkutil::CreateMode::Persistent),
|
||||||
|
zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version),
|
||||||
|
};
|
||||||
|
|
||||||
|
Coordination::Responses ops_responses;
|
||||||
|
const auto code = current_zookeeper->tryMulti(ops, ops_responses);
|
||||||
|
|
||||||
Coordination::Responses responses;
|
|
||||||
const auto code = current_zookeeper->tryMulti(ops, responses);
|
|
||||||
if (code == Coordination::Error::ZOK)
|
if (code == Coordination::Error::ZOK)
|
||||||
{
|
{
|
||||||
max_log_ptr_at_creation = parse<UInt32>(max_log_ptr_str);
|
max_log_ptr_at_creation = parse<UInt32>(max_log_ptr_str);
|
||||||
break;
|
createEmptyLogEntry(current_zookeeper);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
else if (code == Coordination::Error::ZNODEEXISTS || attempts == 1)
|
|
||||||
|
if (attempts == 1)
|
||||||
{
|
{
|
||||||
/// If its our last attempt, or if the replica already exists, fail immediately.
|
zkutil::KeeperMultiException::check(code, ops, ops_responses);
|
||||||
zkutil::KeeperMultiException::check(code, ops, responses);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
createEmptyLogEntry(current_zookeeper);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void DatabaseReplicated::beforeLoadingMetadata(ContextMutablePtr context_, LoadingStrictnessLevel mode)
|
void DatabaseReplicated::beforeLoadingMetadata(ContextMutablePtr context_, LoadingStrictnessLevel mode)
|
||||||
|
@ -228,8 +228,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
|||||||
|
|
||||||
metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid);
|
metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid);
|
||||||
|
|
||||||
if (!create.attach && fs::exists(metadata_path))
|
if (!create.attach && fs::exists(metadata_path) && !fs::is_empty(metadata_path))
|
||||||
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path.string());
|
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists and is not empty", metadata_path.string());
|
||||||
}
|
}
|
||||||
else if (create.storage->engine->name == "MaterializeMySQL"
|
else if (create.storage->engine->name == "MaterializeMySQL"
|
||||||
|| create.storage->engine->name == "MaterializedMySQL")
|
|| create.storage->engine->name == "MaterializedMySQL")
|
||||||
@ -329,6 +329,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
|||||||
writeChar('\n', statement_buf);
|
writeChar('\n', statement_buf);
|
||||||
String statement = statement_buf.str();
|
String statement = statement_buf.str();
|
||||||
|
|
||||||
|
/// Needed to make database creation retriable if it fails after the file is created
|
||||||
|
fs::remove(metadata_file_tmp_path);
|
||||||
|
|
||||||
/// Exclusive flag guarantees, that database is not created right now in another thread.
|
/// Exclusive flag guarantees, that database is not created right now in another thread.
|
||||||
WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
|
WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
|
||||||
writeString(statement, out);
|
writeString(statement, out);
|
||||||
@ -350,13 +353,6 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
|||||||
DatabaseCatalog::instance().attachDatabase(database_name, database);
|
DatabaseCatalog::instance().attachDatabase(database_name, database);
|
||||||
added = true;
|
added = true;
|
||||||
|
|
||||||
if (need_write_metadata)
|
|
||||||
{
|
|
||||||
/// Prevents from overwriting metadata of detached database
|
|
||||||
renameNoReplace(metadata_file_tmp_path, metadata_file_path);
|
|
||||||
renamed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!load_database_without_tables)
|
if (!load_database_without_tables)
|
||||||
{
|
{
|
||||||
/// We use global context here, because storages lifetime is bigger than query context lifetime
|
/// We use global context here, because storages lifetime is bigger than query context lifetime
|
||||||
@ -368,6 +364,13 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
|||||||
/// Only then prioritize, schedule and wait all the startup tasks
|
/// Only then prioritize, schedule and wait all the startup tasks
|
||||||
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_tasks);
|
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_tasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (need_write_metadata)
|
||||||
|
{
|
||||||
|
/// Prevents from overwriting metadata of detached database
|
||||||
|
renameNoReplace(metadata_file_tmp_path, metadata_file_path);
|
||||||
|
renamed = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user