diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 8e3378bcc12..26a834eea4c 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -441,7 +441,8 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL bool is_create_query = mode == LoadingStrictnessLevel::CREATE; String replica_host_id; - if (current_zookeeper->tryGet(replica_path, replica_host_id)) + bool replica_exists_in_zk = current_zookeeper->tryGet(replica_path, replica_host_id); + if (replica_exists_in_zk) { if (replica_host_id == DROPPED_MARK && !is_create_query) { @@ -454,7 +455,7 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL String host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection); String host_id_default = getHostID(getContext(), db_uuid, false); - if (is_create_query || (replica_host_id != host_id && replica_host_id != host_id_default)) + if (replica_host_id != host_id && replica_host_id != host_id_default) { throw Exception( ErrorCodes::REPLICA_ALREADY_EXISTS, @@ -485,12 +486,13 @@ void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessL createEmptyLogEntry(current_zookeeper); } } - else if (is_create_query) + + if (is_create_query) { - /// Create new replica. Throws if replica with the same name already exists + /// Create replica nodes in ZooKeeper. If newly initialized nodes already exist, reuse them. createReplicaNodesInZooKeeper(current_zookeeper); } - else + else if (!replica_exists_in_zk) { /// It's not CREATE query, but replica does not exist. Probably it was dropped. /// Do not create anything, continue as readonly. @@ -606,37 +608,92 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt "already contains some data and it does not look like Replicated database path.", zookeeper_path); /// Write host name to replica_path, it will protect from multiple replicas with the same name - auto host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection); + const auto host_id = getHostID(getContext(), db_uuid, cluster_auth_info.cluster_secure_connection); + + const std::vector check_paths = { + replica_path, + replica_path + "/replica_group", + replica_path + "/digest", + + /// Needed to mark all the queries + /// in the range (max log ptr at replica ZooKeeper nodes creation, max log ptr after replica recovery] as successful. + /// Previously, this method was not idempotent and max_log_ptr_at_creation could be stored in memory. + /// we need to store max_log_ptr_at_creation in ZooKeeper to make this method idempotent during replica creation. + replica_path + "/max_log_ptr_at_creation", + }; + bool nodes_exist = true; + auto check_responses = current_zookeeper->tryGet(check_paths); + for (size_t i = 0; i < check_responses.size(); ++i) + { + const auto response = check_responses[i]; + + if (response.error == Coordination::Error::ZNONODE) + { + nodes_exist = false; + break; + } else if (response.error != Coordination::Error::ZOK) + { + throw zkutil::KeeperException::fromPath(response.error, check_paths[i]); + } + } + + if (nodes_exist) + { + const std::vector expected_data = { + host_id, + replica_group_name, + "0", + }; + for (size_t i = 0; i != expected_data.size(); ++i) + { + if (check_responses[i].data != expected_data[i]) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica node {} in ZooKeeper already exists and contains unexpected value: {}", + quoteString(check_paths[i]), quoteString(check_responses[i].data)); + } + } + + LOG_DEBUG(log, "Newly initialized replica nodes found in ZooKeeper, reusing them"); + max_log_ptr_at_creation = parse(check_responses[check_responses.size() - 1].data); + createEmptyLogEntry(current_zookeeper); + return; + } for (int attempts = 10; attempts > 0; --attempts) { Coordination::Stat stat; - String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat); + const String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat); - Coordination::Requests ops; - ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/digest", "0", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent)); - /// In addition to creating the replica nodes, we record the max_log_ptr at the instant where - /// we declared ourself as an existing replica. We'll need this during recoverLostReplica to - /// notify other nodes that issued new queries while this node was recovering. - ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version)); + const Coordination::Requests ops = { + zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest(replica_path + "/digest", "0", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest(replica_path + "/replica_group", replica_group_name, zkutil::CreateMode::Persistent), + + /// In addition to creating the replica nodes, we record the max_log_ptr at the instant where + /// we declared ourself as an existing replica. We'll need this during recoverLostReplica to + /// notify other nodes that issued new queries while this node was recovering. + zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version), + zkutil::makeCreateRequest(replica_path + "/max_log_ptr_at_creation", max_log_ptr_str, zkutil::CreateMode::Persistent), + }; + + Coordination::Responses ops_responses; + const auto code = current_zookeeper->tryMulti(ops, ops_responses); - Coordination::Responses responses; - const auto code = current_zookeeper->tryMulti(ops, responses); if (code == Coordination::Error::ZOK) { max_log_ptr_at_creation = parse(max_log_ptr_str); - break; + createEmptyLogEntry(current_zookeeper); + return; } - else if (code == Coordination::Error::ZNODEEXISTS || attempts == 1) + + if (attempts == 1) { - /// If its our last attempt, or if the replica already exists, fail immediately. - zkutil::KeeperMultiException::check(code, ops, responses); + zkutil::KeeperMultiException::check(code, ops, ops_responses); } } - createEmptyLogEntry(current_zookeeper); } void DatabaseReplicated::beforeLoadingMetadata(ContextMutablePtr context_, LoadingStrictnessLevel mode) diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 80cb0510b35..a1ed9f40685 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -228,8 +228,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid); - if (!create.attach && fs::exists(metadata_path)) - throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path.string()); + if (!create.attach && fs::exists(metadata_path) && !fs::is_empty(metadata_path)) + throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists and is not empty", metadata_path.string()); } else if (create.storage->engine->name == "MaterializeMySQL" || create.storage->engine->name == "MaterializedMySQL") @@ -329,6 +329,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) writeChar('\n', statement_buf); String statement = statement_buf.str(); + /// Needed to make database creation retriable if it fails after the file is created + fs::remove(metadata_file_tmp_path); + /// Exclusive flag guarantees, that database is not created right now in another thread. WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL); writeString(statement, out); @@ -350,13 +353,6 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) DatabaseCatalog::instance().attachDatabase(database_name, database); added = true; - if (need_write_metadata) - { - /// Prevents from overwriting metadata of detached database - renameNoReplace(metadata_file_tmp_path, metadata_file_path); - renamed = true; - } - if (!load_database_without_tables) { /// We use global context here, because storages lifetime is bigger than query context lifetime @@ -368,6 +364,13 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) /// Only then prioritize, schedule and wait all the startup tasks waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_tasks); } + + if (need_write_metadata) + { + /// Prevents from overwriting metadata of detached database + renameNoReplace(metadata_file_tmp_path, metadata_file_path); + renamed = true; + } } catch (...) { diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index 60a6e099b22..58e99d4c408 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -1,5 +1,8 @@ +import concurrent.futures import os +import random import shutil +import string import time import re import pytest @@ -1549,3 +1552,35 @@ def test_all_groups_cluster(started_cluster): assert "bad_settings_node\ndummy_node\n" == bad_settings_node.query( "select host_name from system.clusters where name='all_groups.db_cluster' order by host_name" ) + + +def test_database_creation_idempotency(started_cluster): + def randomize_database_name(database_name, random_suffix_length=20): + letters = string.ascii_letters + string.digits + return f"{database_name}{''.join(random.choice(letters) for _ in range(random_suffix_length))}" + + databases = [randomize_database_name("rdb") for _ in range(100)] + + def create_database(name): + main_node.query( + f""" + CREATE DATABASE {name} + ENGINE = Replicated('/test/test_database_creation_idempotency/{name}', 'shard', 'replica') + """ + ) + + with concurrent.futures.ThreadPoolExecutor() as executor: + for name in databases: + executor.submit(create_database, name) + + main_node.restart_clickhouse(kill=True) + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(create_database, name) for name in databases] + concurrent.futures.wait(futures) + + assert int( + main_node.query( + f"SELECT count() FROM system.databases WHERE name IN {databases}" + ).strip() + ) == len(databases)