make replicated database creation operations idempotent

This commit is contained in:
Michael Stetsyuk 2024-08-30 14:34:24 +00:00
parent 47f0b9fbed
commit ecfe9d9782
3 changed files with 127 additions and 32 deletions

View File

@ -441,7 +441,8 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
bool is_create_query = mode == LoadingStrictnessLevel::CREATE;
String replica_host_id;
if (current_zookeeper->tryGet(replica_path, replica_host_id))
bool replica_exists_in_zk = current_zookeeper->tryGet(replica_path, replica_host_id);
if (replica_exists_in_zk)
{
if (replica_host_id == DROPPED_MARK && !is_create_query)
{
@ -454,7 +455,7 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
String host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection);
String host_id_default = getHostID(getContext(), db_uuid, false);
if (is_create_query || (replica_host_id != host_id && replica_host_id != host_id_default))
if (replica_host_id != host_id && replica_host_id != host_id_default)
{
throw Exception(
ErrorCodes::REPLICA_ALREADY_EXISTS,
@ -485,12 +486,13 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL
createEmptyLogEntry(current_zookeeper);
}
}
else if (is_create_query)
if (is_create_query)
{
/// Create new replica. Throws if replica with the same name already exists
/// Create replica nodes in ZooKeeper. If newly initialized nodes already exist, reuse them.
createReplicaNodesInZooKeeper(current_zookeeper);
}
else
else if (!replica_exists_in_zk)
{
/// It's not CREATE query, but replica does not exist. Probably it was dropped.
/// Do not create anything, continue as readonly.
@ -606,37 +608,92 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
"already contains some data and it does not look like Replicated database path.", zookeeper_path);
/// Write host name to replica_path, it will protect from multiple replicas with the same name
auto host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection);
const auto host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection);
const std::vector<String> check_paths = {
replica_path,
replica_path + "/replica_group",
replica_path + "/digest",
/// Needed to mark all the queries
/// in the range (max log ptr at replica ZooKeeper nodes creation, max log ptr after replica recovery] as successful.
/// Previously, this method was not idempotent and max_log_ptr_at_creation could be stored in memory.
/// we need to store max_log_ptr_at_creation in ZooKeeper to make this method idempotent during replica creation.
replica_path + "/max_log_ptr_at_creation",
};
bool nodes_exist = true;
auto check_responses = current_zookeeper->tryGet(check_paths);
for (size_t i = 0; i < check_responses.size(); ++i)
{
const auto response = check_responses[i];
if (response.error == Coordination::Error::ZNONODE)
{
nodes_exist = false;
break;
} else if (response.error != Coordination::Error::ZOK)
{
throw zkutil::KeeperException::fromPath(response.error, check_paths[i]);
}
}
if (nodes_exist)
{
const std::vector<String> expected_data = {
host_id,
replica_group_name,
"0",
};
for (size_t i = 0; i != expected_data.size(); ++i)
{
if (check_responses[i].data != expected_data[i])
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica node {} in ZooKeeper already exists and contains unexpected value: {}",
quoteString(check_paths[i]), quoteString(check_responses[i].data));
}
}
LOG_DEBUG(log, "Newly initialized replica nodes found in ZooKeeper, reusing them");
max_log_ptr_at_creation = parse<UInt32>(check_responses[check_responses.size() - 1].data);
createEmptyLogEntry(current_zookeeper);
return;
}
for (int attempts = 10; attempts > 0; --attempts)
{
Coordination::Stat stat;
String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
const String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
const Coordination::Requests ops = {
zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(replica_path + "/digest", "0", zkutil::CreateMode::Persistent),
zkutil::makeCreateRequest(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent),
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/digest", "0", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent));
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
/// notify other nodes that issued new queries while this node was recovering.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version));
zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version),
zkutil::makeCreateRequest(replica_path + "/max_log_ptr_at_creation", max_log_ptr_str, zkutil::CreateMode::Persistent),
};
Coordination::Responses ops_responses;
const auto code = current_zookeeper->tryMulti(ops, ops_responses);
Coordination::Responses responses;
const auto code = current_zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
{
max_log_ptr_at_creation = parse<UInt32>(max_log_ptr_str);
break;
}
else if (code == Coordination::Error::ZNODEEXISTS || attempts == 1)
{
/// If its our last attempt, or if the replica already exists, fail immediately.
zkutil::KeeperMultiException::check(code, ops, responses);
}
}
createEmptyLogEntry(current_zookeeper);
return;
}
if (attempts == 1)
{
zkutil::KeeperMultiException::check(code, ops, ops_responses);
}
}
}
void DatabaseReplicated::beforeLoadingMetadata(ContextMutablePtr context_, LoadingStrictnessLevel mode)

View File

@ -228,8 +228,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid);
if (!create.attach && fs::exists(metadata_path))
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path.string());
if (!create.attach && fs::exists(metadata_path) && !fs::is_empty(metadata_path))
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists and is not empty", metadata_path.string());
}
else if (create.storage->engine->name == "MaterializeMySQL"
|| create.storage->engine->name == "MaterializedMySQL")
@ -329,6 +329,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
writeChar('\n', statement_buf);
String statement = statement_buf.str();
/// Needed to make database creation retriable if it fails after the file is created
fs::remove(metadata_file_tmp_path);
/// Exclusive flag guarantees, that database is not created right now in another thread.
WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
@ -350,13 +353,6 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
DatabaseCatalog::instance().attachDatabase(database_name, database);
added = true;
if (need_write_metadata)
{
/// Prevents from overwriting metadata of detached database
renameNoReplace(metadata_file_tmp_path, metadata_file_path);
renamed = true;
}
if (!load_database_without_tables)
{
/// We use global context here, because storages lifetime is bigger than query context lifetime
@ -368,6 +364,13 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
/// Only then prioritize, schedule and wait all the startup tasks
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_tasks);
}
if (need_write_metadata)
{
/// Prevents from overwriting metadata of detached database
renameNoReplace(metadata_file_tmp_path, metadata_file_path);
renamed = true;
}
}
catch (...)
{

View File

@ -1,5 +1,8 @@
import concurrent.futures
import os
import random
import shutil
import string
import time
import re
import pytest
@ -1549,3 +1552,35 @@ def test_all_groups_cluster(started_cluster):
assert "bad_settings_node\ndummy_node\n" == bad_settings_node.query(
"select host_name from system.clusters where name='all_groups.db_cluster' order by host_name"
)
def test_database_creation_idempotency(started_cluster):
def randomize_database_name(database_name, random_suffix_length=20):
letters = string.ascii_letters + string.digits
return f"{database_name}{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
databases = [randomize_database_name("rdb") for _ in range(100)]
def create_database(name):
main_node.query(
f"""
CREATE DATABASE {name}
ENGINE = Replicated('/test/test_database_creation_idempotency/{name}', 'shard', 'replica')
"""
)
with concurrent.futures.ThreadPoolExecutor() as executor:
for name in databases:
executor.submit(create_database, name)
main_node.restart_clickhouse(kill=True)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(create_database, name) for name in databases]
concurrent.futures.wait(futures)
assert int(
main_node.query(
f"SELECT count() FROM system.databases WHERE name IN {databases}"
).strip()
) == len(databases)