diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp
index cb205d6d63a..21e7eb05d34 100644
--- a/src/Databases/DatabaseReplicated.cpp
+++ b/src/Databases/DatabaseReplicated.cpp
@@ -89,6 +89,7 @@ DatabaseReplicated::DatabaseReplicated(
     , shard_name(shard_name_)
     , replica_name(replica_name_)
     , db_settings(std::move(db_settings_))
+    , tables_metadata_digest(0)
 {
     if (zookeeper_path.empty() || shard_name.empty() || replica_name.empty())
         throw Exception("ZooKeeper path, shard and replica names must be non-empty", ErrorCodes::BAD_ARGUMENTS);
@@ -450,8 +451,8 @@ void DatabaseReplicated::startupTables(ThreadPool & thread_pool, LoadingStrictne
         digest += getMetadataHash(table.first);
     LOG_DEBUG(log, "Calculated metadata digest of {} tables: {}", TSA_SUPPRESS_WARNING_FOR_READ(tables).size(), digest);
 
-    chassert(!tables_metadata_digest);
-    tables_metadata_digest = digest;
+    chassert(!TSA_SUPPRESS_WARNING_FOR_READ(tables_metadata_digest));
+    TSA_SUPPRESS_WARNING_FOR_WRITE(tables_metadata_digest) = digest;
 
     ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
     if (is_probably_dropped)
@@ -459,11 +460,14 @@ void DatabaseReplicated::startupTables(ThreadPool & thread_pool, LoadingStrictne
     ddl_worker->startup();
 }
 
-bool DatabaseReplicated::debugCheckDigest(const ContextPtr & local_context) const
+bool DatabaseReplicated::checkDigestValid(const ContextPtr & local_context, bool debug_check /* = true */) const
 {
-    /// Reduce number of debug checks
-    //if (thread_local_rng() % 16)
-    //    return true;
+    if (debug_check)
+    {
+        /// Reduce number of debug checks
+        if (thread_local_rng() % 16)
+            return true;
+    }
 
     LOG_TEST(log, "Current in-memory metadata digest: {}", tables_metadata_digest);
 
@@ -773,7 +777,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
         new_digest -= getMetadataHash(broken_table_name);
         DatabaseAtomic::renameTable(make_query_context(), broken_table_name, *to_db_ptr, to_name, /* exchange */ false, /* dictionary */ false);
         tables_metadata_digest = new_digest;
-        assert(debugCheckDigest(getContext()));
+        assert(checkDigestValid(getContext()));
         ++moved_tables;
     };
 
@@ -799,7 +803,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
             new_digest -= getMetadataHash(table_name);
             DatabaseAtomic::dropTableImpl(make_query_context(), table_name, /* sync */ true);
             tables_metadata_digest = new_digest;
-            assert(debugCheckDigest(getContext()));
+            assert(checkDigestValid(getContext()));
         }
         else if (!table->supportsReplication())
         {
@@ -833,7 +837,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
         new_digest += DB::getMetadataHash(to, statement);
         DatabaseAtomic::renameTable(make_query_context(), from, *this, to, false, false);
         tables_metadata_digest = new_digest;
-        assert(debugCheckDigest(getContext()));
+        assert(checkDigestValid(getContext()));
     }
 
     for (const auto & id : dropped_tables)
@@ -870,7 +874,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
     }
 
     std::lock_guard lock{metadata_mutex};
-    chassert(debugCheckDigest(getContext()));
+    chassert(checkDigestValid(getContext()));
     current_zookeeper->set(replica_path + "/digest", toString(tables_metadata_digest));
 }
 
@@ -1001,7 +1005,7 @@ void DatabaseReplicated::dropTable(ContextPtr local_context, const String & tabl
 
     DatabaseAtomic::dropTableImpl(local_context, table_name, sync);
     tables_metadata_digest = new_digest;
-    assert(debugCheckDigest(local_context));
+    assert(checkDigestValid(local_context));
 }
 
 void DatabaseReplicated::renameTable(ContextPtr local_context, const String & table_name, IDatabase & to_database,
@@ -1054,7 +1058,7 @@ void DatabaseReplicated::renameTable(ContextPtr local_context, const String & ta
 
     DatabaseAtomic::renameTable(local_context, table_name, to_database, to_table_name, exchange, dictionary);
     tables_metadata_digest = new_digest;
-    assert(debugCheckDigest(local_context));
+    assert(checkDigestValid(local_context));
 }
 
 void DatabaseReplicated::commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
@@ -1080,7 +1084,7 @@ void DatabaseReplicated::commitCreateTable(const ASTCreateQuery & query, const S
 
     DatabaseAtomic::commitCreateTable(query, table, table_metadata_tmp_path, table_metadata_path, query_context);
     tables_metadata_digest = new_digest;
-    assert(debugCheckDigest(query_context));
+    assert(checkDigestValid(query_context));
 }
 
 void DatabaseReplicated::commitAlterTable(const StorageID & table_id,
@@ -1104,7 +1108,7 @@ void DatabaseReplicated::commitAlterTable(const StorageID & table_id,
 
     DatabaseAtomic::commitAlterTable(table_id, table_metadata_tmp_path, table_metadata_path, statement, query_context);
     tables_metadata_digest = new_digest;
-    assert(debugCheckDigest(query_context));
+    assert(checkDigestValid(query_context));
 }
 
 void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const String & table_name)
@@ -1127,7 +1131,7 @@ void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const
 
     DatabaseAtomic::detachTablePermanently(local_context, table_name);
     tables_metadata_digest = new_digest;
-    assert(debugCheckDigest(local_context));
+    assert(checkDigestValid(local_context));
 }
 
 void DatabaseReplicated::removeDetachedPermanentlyFlag(ContextPtr local_context, const String & table_name, const String & table_metadata_path, bool attach)
@@ -1152,7 +1156,7 @@ void DatabaseReplicated::removeDetachedPermanentlyFlag(ContextPtr local_context,
 
     DatabaseAtomic::removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, attach);
     tables_metadata_digest = new_digest;
-    assert(debugCheckDigest(local_context));
+    assert(checkDigestValid(local_context));
 }
 
diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h
index 8256d54a796..56689ed94bf 100644
--- a/src/Databases/DatabaseReplicated.h
+++ b/src/Databases/DatabaseReplicated.h
@@ -112,7 +112,7 @@ private:
     }
 
     UInt64 getMetadataHash(const String & table_name) const;
-    bool debugCheckDigest(const ContextPtr & local_context) const;
+    bool checkDigestValid(const ContextPtr & local_context, bool debug_check = true) const TSA_REQUIRES(metadata_mutex);
 
     String zookeeper_path;
     String shard_name;
@@ -131,7 +131,11 @@ private:
     /// Usually operations with metadata are single-threaded because of the way replication works,
     /// but StorageReplicatedMergeTree may call alterTable outside of DatabaseReplicatedDDLWorker, causing race conditions.
     std::mutex metadata_mutex;
-    UInt64 tables_metadata_digest = 0;
+
+    /// Sum of hashes of pairs (table_name, table_create_statement).
+    /// We calculate this sum from local metadata files and compare it with the value in ZooKeeper.
+    /// It allows us to detect if metadata is broken and recover the replica.
+    UInt64 tables_metadata_digest TSA_GUARDED_BY(metadata_mutex);
 
     mutable ClusterPtr cluster;
 };
diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp
index 161e262a574..63d5af8da3d 100644
--- a/src/Databases/DatabaseReplicatedWorker.cpp
+++ b/src/Databases/DatabaseReplicatedWorker.cpp
@@ -67,23 +67,33 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
     UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
     logs_to_keep = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
 
-    String digest;
-    if (!zookeeper->tryGet(database->replica_path + "/digest", digest))
+    UInt64 digest;
+    String digest_str;
+    UInt64 local_digest;
+    if (zookeeper->tryGet(database->replica_path + "/digest", digest_str))
+    {
+        digest = parse<UInt64>(digest_str);
+        std::lock_guard lock{database->metadata_mutex};
+        local_digest = database->tables_metadata_digest;
+    }
+    else
     {
         /// Database was created by old ClickHouse versions, let's create the node
-        digest = toString(database->tables_metadata_digest);
-        zookeeper->create(database->replica_path + "/digest", digest, zkutil::CreateMode::Persistent);
+        std::lock_guard lock{database->metadata_mutex};
+        digest = local_digest = database->tables_metadata_digest;
+        digest_str = toString(digest);
+        zookeeper->create(database->replica_path + "/digest", digest_str, zkutil::CreateMode::Persistent);
     }
 
     bool is_new_replica = our_log_ptr == 0;
     bool lost_according_to_log_ptr = our_log_ptr + logs_to_keep < max_log_ptr;
-    bool lost_according_to_digest = database->db_settings.check_consistency && database->tables_metadata_digest != parse<UInt64>(digest);
+    bool lost_according_to_digest = database->db_settings.check_consistency && local_digest != digest;
 
     if (is_new_replica || lost_according_to_log_ptr || lost_according_to_digest)
     {
         if (!is_new_replica)
             LOG_WARNING(log, "Replica seems to be lost: our_log_ptr={}, max_log_ptr={}, local_digest={}, zk_digest={}",
-                        our_log_ptr, max_log_ptr, database->tables_metadata_digest, digest);
+                        our_log_ptr, max_log_ptr, local_digest, digest);
         database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr);
         zookeeper->set(database->replica_path + "/log_ptr", toString(max_log_ptr));
         initializeLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr));
@@ -94,6 +104,10 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
         last_skipped_entry_name.emplace(log_entry_name);
         initializeLogPointer(log_entry_name);
     }
+
+    std::lock_guard lock{database->metadata_mutex};
+    if (!database->checkDigestValid(context))
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent database metadata after reconnection to ZooKeeper");
 }
 
 String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
diff --git a/src/Databases/LoadingStrictnessLevel.h b/src/Databases/LoadingStrictnessLevel.h
index 4c566d3ac72..b6449a0a9fd 100644
--- a/src/Databases/LoadingStrictnessLevel.h
+++ b/src/Databases/LoadingStrictnessLevel.h
@@ -3,11 +3,16 @@
 namespace DB
 {
 
+/// Strictness mode for loading a table or database
 enum class LoadingStrictnessLevel
 {
+    /// Do all possible sanity checks
     CREATE = 0,
+    /// Expect existing paths on FS and in ZK for ATTACH query
     ATTACH = 1,
+    /// We ignore some errors on server startup
     FORCE_ATTACH = 2,
+    /// Skip all sanity checks (if force_restore_data flag exists)
     FORCE_RESTORE = 3,
 };
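
Why the patch can maintain `tables_metadata_digest` with bare `+=`/`-=`: the digest is an order-independent *sum* of per-(table_name, create_statement) hashes, so each DDL operation only needs to subtract the hash of the old pair and add the hash of the new one, and a full recount (as `startupTables()` and `checkDigestValid()` perform) must arrive at the same value. A minimal standalone sketch of the scheme; class and method names are hypothetical, and `std::hash` stands in for ClickHouse's real metadata hash:

```cpp
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>

class MetadataDigest
{
public:
    static uint64_t hashPair(const std::string & table_name, const std::string & create_statement)
    {
        /// Any 64-bit hash of the (name, statement) pair works; summing the hashes
        /// makes the digest independent of iteration order.
        return std::hash<std::string>{}(table_name + '\0' + create_statement);
    }

    void createTable(const std::string & name, const std::string & statement)
    {
        std::lock_guard lock{mutex};          /// plays the role of metadata_mutex
        statements[name] = statement;
        digest += hashPair(name, statement);
    }

    void dropTable(const std::string & name)
    {
        std::lock_guard lock{mutex};
        digest -= hashPair(name, statements.at(name));
        statements.erase(name);
    }

    void renameTable(const std::string & from, const std::string & to)
    {
        std::lock_guard lock{mutex};
        std::string statement = statements.at(from);
        digest -= hashPair(from, statement);  /// same pattern as new_digest -= getMetadataHash(...)
        digest += hashPair(to, statement);
        statements.erase(from);
        statements[to] = statement;
    }

    /// Recompute from scratch to validate the incrementally maintained value,
    /// as the checkDigestValid() asserts in the patch do.
    bool checkDigestValid() const
    {
        std::lock_guard lock{mutex};
        uint64_t expected = 0;
        for (const auto & [name, statement] : statements)
            expected += hashPair(name, statement);
        return expected == digest;
    }

private:
    mutable std::mutex mutex;
    std::unordered_map<std::string, std::string> statements;
    uint64_t digest = 0;  /// unsigned, so += / -= wrap around safely in any order
};
```

Unsigned wraparound is what makes the subtraction safe even if, during a multi-step recovery, the running sum transiently "underflows" before the matching addition lands.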
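The header change also leans on clang's Thread Safety Analysis: `TSA_GUARDED_BY(metadata_mutex)` makes the compiler flag any access to `tables_metadata_digest` without the mutex held (hence the `TSA_SUPPRESS_WARNING_FOR_READ/WRITE` escape hatches in `startupTables()`, which runs before any concurrent DDL exists), and `TSA_REQUIRES(metadata_mutex)` shifts the locking obligation to callers of `checkDigestValid()`. A reduced sketch of the idiom, assuming clang with `-Wthread-safety`; ClickHouse defines its `TSA_*` macros in `base/defines.h` and builds against an annotated `std::mutex`, so the tiny `Mutex` wrapper below exists only to keep the example self-contained:

```cpp
#include <mutex>

#if defined(__clang__)
#    define TSA_CAPABILITY(x)   __attribute__((capability(x)))
#    define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__)))
#    define TSA_REQUIRES(...)   __attribute__((requires_capability(__VA_ARGS__)))
#    define TSA_ACQUIRE(...)    __attribute__((acquire_capability(__VA_ARGS__)))
#    define TSA_RELEASE(...)    __attribute__((release_capability(__VA_ARGS__)))
#else
#    define TSA_CAPABILITY(x)
#    define TSA_GUARDED_BY(...)
#    define TSA_REQUIRES(...)
#    define TSA_ACQUIRE(...)
#    define TSA_RELEASE(...)
#endif

/// Minimal annotated mutex so the analysis can track lock/unlock.
class TSA_CAPABILITY("mutex") Mutex
{
public:
    void lock() TSA_ACQUIRE() { m.lock(); }
    void unlock() TSA_RELEASE() { m.unlock(); }
private:
    std::mutex m;
};

struct Database
{
    Mutex metadata_mutex;
    unsigned long long tables_metadata_digest TSA_GUARDED_BY(metadata_mutex) = 0;

    /// Callers must already hold metadata_mutex; clang checks every call site.
    bool checkDigestValid() const TSA_REQUIRES(metadata_mutex) { return tables_metadata_digest != 0; }

    void ok()
    {
        metadata_mutex.lock();
        tables_metadata_digest = 42;   // OK: mutex held
        checkDigestValid();            // OK: TSA_REQUIRES satisfied
        metadata_mutex.unlock();
    }

    void bad()
    {
        tables_metadata_digest = 42;   // warning: writing variable requires holding 'metadata_mutex'
        checkDigestValid();            // warning: calling function requires holding 'metadata_mutex'
    }
};
```

This is why the new `checkDigestValid()` declaration carries `TSA_REQUIRES(metadata_mutex)`: every caller in the patch either already holds the mutex or takes a `std::lock_guard` immediately before the check.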