mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
fix
This commit is contained in:
parent
1a059035b8
commit
624ccd5206
@ -89,6 +89,7 @@ DatabaseReplicated::DatabaseReplicated(
|
||||
, shard_name(shard_name_)
|
||||
, replica_name(replica_name_)
|
||||
, db_settings(std::move(db_settings_))
|
||||
, tables_metadata_digest(0)
|
||||
{
|
||||
if (zookeeper_path.empty() || shard_name.empty() || replica_name.empty())
|
||||
throw Exception("ZooKeeper path, shard and replica names must be non-empty", ErrorCodes::BAD_ARGUMENTS);
|
||||
@ -450,8 +451,8 @@ void DatabaseReplicated::startupTables(ThreadPool & thread_pool, LoadingStrictne
|
||||
digest += getMetadataHash(table.first);
|
||||
|
||||
LOG_DEBUG(log, "Calculated metadata digest of {} tables: {}", TSA_SUPPRESS_WARNING_FOR_READ(tables).size(), digest);
|
||||
chassert(!tables_metadata_digest);
|
||||
tables_metadata_digest = digest;
|
||||
chassert(!TSA_SUPPRESS_WARNING_FOR_READ(tables_metadata_digest));
|
||||
TSA_SUPPRESS_WARNING_FOR_WRITE(tables_metadata_digest) = digest;
|
||||
|
||||
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
|
||||
if (is_probably_dropped)
|
||||
@ -459,11 +460,14 @@ void DatabaseReplicated::startupTables(ThreadPool & thread_pool, LoadingStrictne
|
||||
ddl_worker->startup();
|
||||
}
|
||||
|
||||
bool DatabaseReplicated::debugCheckDigest(const ContextPtr & local_context) const
|
||||
bool DatabaseReplicated::checkDigestValid(const ContextPtr & local_context, bool debug_check /* = true */) const
|
||||
{
|
||||
/// Reduce number of debug checks
|
||||
//if (thread_local_rng() % 16)
|
||||
// return true;
|
||||
if (debug_check)
|
||||
{
|
||||
/// Reduce number of debug checks
|
||||
if (thread_local_rng() % 16)
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG_TEST(log, "Current in-memory metadata digest: {}", tables_metadata_digest);
|
||||
|
||||
@ -773,7 +777,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
new_digest -= getMetadataHash(broken_table_name);
|
||||
DatabaseAtomic::renameTable(make_query_context(), broken_table_name, *to_db_ptr, to_name, /* exchange */ false, /* dictionary */ false);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(getContext()));
|
||||
assert(checkDigestValid(getContext()));
|
||||
++moved_tables;
|
||||
};
|
||||
|
||||
@ -799,7 +803,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
new_digest -= getMetadataHash(table_name);
|
||||
DatabaseAtomic::dropTableImpl(make_query_context(), table_name, /* sync */ true);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(getContext()));
|
||||
assert(checkDigestValid(getContext()));
|
||||
}
|
||||
else if (!table->supportsReplication())
|
||||
{
|
||||
@ -833,7 +837,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
new_digest += DB::getMetadataHash(to, statement);
|
||||
DatabaseAtomic::renameTable(make_query_context(), from, *this, to, false, false);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(getContext()));
|
||||
assert(checkDigestValid(getContext()));
|
||||
}
|
||||
|
||||
for (const auto & id : dropped_tables)
|
||||
@ -870,7 +874,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
}
|
||||
|
||||
std::lock_guard lock{metadata_mutex};
|
||||
chassert(debugCheckDigest(getContext()));
|
||||
chassert(checkDigestValid(getContext()));
|
||||
current_zookeeper->set(replica_path + "/digest", toString(tables_metadata_digest));
|
||||
}
|
||||
|
||||
@ -1001,7 +1005,7 @@ void DatabaseReplicated::dropTable(ContextPtr local_context, const String & tabl
|
||||
DatabaseAtomic::dropTableImpl(local_context, table_name, sync);
|
||||
tables_metadata_digest = new_digest;
|
||||
|
||||
assert(debugCheckDigest(local_context));
|
||||
assert(checkDigestValid(local_context));
|
||||
}
|
||||
|
||||
void DatabaseReplicated::renameTable(ContextPtr local_context, const String & table_name, IDatabase & to_database,
|
||||
@ -1054,7 +1058,7 @@ void DatabaseReplicated::renameTable(ContextPtr local_context, const String & ta
|
||||
|
||||
DatabaseAtomic::renameTable(local_context, table_name, to_database, to_table_name, exchange, dictionary);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(local_context));
|
||||
assert(checkDigestValid(local_context));
|
||||
}
|
||||
|
||||
void DatabaseReplicated::commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
|
||||
@ -1080,7 +1084,7 @@ void DatabaseReplicated::commitCreateTable(const ASTCreateQuery & query, const S
|
||||
|
||||
DatabaseAtomic::commitCreateTable(query, table, table_metadata_tmp_path, table_metadata_path, query_context);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(query_context));
|
||||
assert(checkDigestValid(query_context));
|
||||
}
|
||||
|
||||
void DatabaseReplicated::commitAlterTable(const StorageID & table_id,
|
||||
@ -1104,7 +1108,7 @@ void DatabaseReplicated::commitAlterTable(const StorageID & table_id,
|
||||
|
||||
DatabaseAtomic::commitAlterTable(table_id, table_metadata_tmp_path, table_metadata_path, statement, query_context);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(query_context));
|
||||
assert(checkDigestValid(query_context));
|
||||
}
|
||||
|
||||
void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const String & table_name)
|
||||
@ -1127,7 +1131,7 @@ void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const
|
||||
|
||||
DatabaseAtomic::detachTablePermanently(local_context, table_name);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(local_context));
|
||||
assert(checkDigestValid(local_context));
|
||||
}
|
||||
|
||||
void DatabaseReplicated::removeDetachedPermanentlyFlag(ContextPtr local_context, const String & table_name, const String & table_metadata_path, bool attach)
|
||||
@ -1152,7 +1156,7 @@ void DatabaseReplicated::removeDetachedPermanentlyFlag(ContextPtr local_context,
|
||||
|
||||
DatabaseAtomic::removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, attach);
|
||||
tables_metadata_digest = new_digest;
|
||||
assert(debugCheckDigest(local_context));
|
||||
assert(checkDigestValid(local_context));
|
||||
}
|
||||
|
||||
|
||||
|
@ -112,7 +112,7 @@ private:
|
||||
}
|
||||
|
||||
UInt64 getMetadataHash(const String & table_name) const;
|
||||
bool debugCheckDigest(const ContextPtr & local_context) const;
|
||||
bool checkDigestValid(const ContextPtr & local_context, bool debug_check = true) const TSA_REQUIRES(metadata_mutex);
|
||||
|
||||
String zookeeper_path;
|
||||
String shard_name;
|
||||
@ -131,7 +131,11 @@ private:
|
||||
/// Usually operation with metadata are single-threaded because of the way replication works,
|
||||
/// but StorageReplicatedMergeTree may call alterTable outside from DatabaseReplicatedDDLWorker causing race conditions.
|
||||
std::mutex metadata_mutex;
|
||||
UInt64 tables_metadata_digest = 0;
|
||||
|
||||
/// Sum of hashes of pairs (table_name, table_create_statement).
|
||||
/// We calculate this sum from local metadata files and compare it will value in ZooKeeper.
|
||||
/// It allows to detect if metadata is broken and recover replica.
|
||||
UInt64 tables_metadata_digest TSA_GUARDED_BY(metadata_mutex);
|
||||
|
||||
mutable ClusterPtr cluster;
|
||||
};
|
||||
|
@ -67,23 +67,33 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
|
||||
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
|
||||
logs_to_keep = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
|
||||
|
||||
String digest;
|
||||
if (!zookeeper->tryGet(database->replica_path + "/digest", digest))
|
||||
UInt64 digest;
|
||||
String digest_str;
|
||||
UInt64 local_digest;
|
||||
if (zookeeper->tryGet(database->replica_path + "/digest", digest_str))
|
||||
{
|
||||
digest = parse<UInt64>(digest_str);
|
||||
std::lock_guard lock{database->metadata_mutex};
|
||||
local_digest = database->tables_metadata_digest;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Database was created by old ClickHouse versions, let's create the node
|
||||
digest = toString(database->tables_metadata_digest);
|
||||
zookeeper->create(database->replica_path + "/digest", digest, zkutil::CreateMode::Persistent);
|
||||
std::lock_guard lock{database->metadata_mutex};
|
||||
digest = local_digest = database->tables_metadata_digest;
|
||||
digest_str = toString(digest);
|
||||
zookeeper->create(database->replica_path + "/digest", digest_str, zkutil::CreateMode::Persistent);
|
||||
}
|
||||
|
||||
bool is_new_replica = our_log_ptr == 0;
|
||||
bool lost_according_to_log_ptr = our_log_ptr + logs_to_keep < max_log_ptr;
|
||||
bool lost_according_to_digest = database->db_settings.check_consistency && database->tables_metadata_digest != parse<UInt64>(digest);
|
||||
bool lost_according_to_digest = database->db_settings.check_consistency && local_digest != digest;
|
||||
|
||||
if (is_new_replica || lost_according_to_log_ptr || lost_according_to_digest)
|
||||
{
|
||||
if (!is_new_replica)
|
||||
LOG_WARNING(log, "Replica seems to be lost: our_log_ptr={}, max_log_ptr={}, local_digest={}, zk_digest={}",
|
||||
our_log_ptr, max_log_ptr, database->tables_metadata_digest, digest);
|
||||
our_log_ptr, max_log_ptr, local_digest, digest);
|
||||
database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr);
|
||||
zookeeper->set(database->replica_path + "/log_ptr", toString(max_log_ptr));
|
||||
initializeLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr));
|
||||
@ -94,6 +104,10 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
|
||||
last_skipped_entry_name.emplace(log_entry_name);
|
||||
initializeLogPointer(log_entry_name);
|
||||
}
|
||||
|
||||
std::lock_guard lock{database->metadata_mutex};
|
||||
if (!database->checkDigestValid(context))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent database metadata after reconnection to ZooKeeper");
|
||||
}
|
||||
|
||||
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
|
||||
|
@ -3,11 +3,16 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Strictness mode for loading a table or database
|
||||
enum class LoadingStrictnessLevel
|
||||
{
|
||||
/// Do all possible sanity checks
|
||||
CREATE = 0,
|
||||
/// Expect existing paths on FS and in ZK for ATTACH query
|
||||
ATTACH = 1,
|
||||
/// We ignore some error on server startup
|
||||
FORCE_ATTACH = 2,
|
||||
/// Skip all sanity checks (if force_restore_data flag exists)
|
||||
FORCE_RESTORE = 3,
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user