From 760297908e9fd5ff65af2d781fb6295d20cbce95 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Fri, 15 Nov 2024 13:11:56 +0000 Subject: [PATCH] Backport #69274 to 24.3: fix `metadata_version` in ZooKeeper --- .../ReplicatedMergeTreeAttachThread.cpp | 68 ++++++++++++++++++- .../ReplicatedMergeTreeAttachThread.h | 2 + .../MergeTree/ReplicatedMergeTreeQueue.cpp | 6 ++ .../MergeTree/ReplicatedMergeTreeQueue.h | 1 + 4 files changed, 74 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index 336d19692d4..0a28ac980cf 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -14,6 +14,7 @@ namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; extern const int REPLICA_STATUS_CHANGED; + extern const int LOGICAL_ERROR; } ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_) @@ -116,6 +117,67 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z } } +Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper) +{ + const String & zookeeper_path = storage.zookeeper_path; + const String & replica_path = storage.replica_path; + const bool replica_readonly = storage.is_readonly; + + for (size_t i = 0; i != 2; ++i) + { + String replica_metadata_version_str; + const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str); + if (!replica_metadata_version_exists) + return -1; + + const Int32 metadata_version = parse(replica_metadata_version_str); + + if (metadata_version != 0 || replica_readonly) + { + /// No need to fix anything + return metadata_version; + } + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + ReplicatedMergeTreeQueue & queue = storage.queue; + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version); + return stat.version; + } + if (code != Coordination::Error::ZBADVERSION) + { + throw zkutil::KeeperException(code); + } + } + + /// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt. + /// If metadata_version != 0, on second attempt we will return the new metadata_version. + /// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0. + /// Either way, on second attempt this method should return. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts"); +} + void ReplicatedMergeTreeAttachThread::runImpl() { storage.setZooKeeper(); @@ -159,11 +221,11 @@ void ReplicatedMergeTreeAttachThread::runImpl() /// Just in case it was not removed earlier due to connection loss zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); - String replica_metadata_version; - const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version); + const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper); + const bool replica_metadata_version_exists = replica_metadata_version != -1; if (replica_metadata_version_exists) { - storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse(replica_metadata_version))); + storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version)); } else { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h index 250a5ed34d1..bfc97442598 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h @@ -48,6 +48,8 @@ private: void runImpl(); void finalizeInitialization(); + + Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper); }; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 0c9d4cfe9ef..418544a03b6 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2099,6 +2099,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.inserts_in_queue = 0; res.merges_in_queue = 0; res.part_mutations_in_queue = 0; + res.metadata_alters_in_queue = 0; res.queue_oldest_time = 0; res.inserts_oldest_time = 0; res.merges_oldest_time = 0; @@ -2141,6 +2142,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.oldest_part_to_mutate_to = entry->new_part_name; } } + + if (entry->type == LogEntry::ALTER_METADATA) + { + ++res.metadata_alters_in_queue; + } } return res; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 60b1a08912b..541cd633b38 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -451,6 +451,7 @@ public: UInt32 inserts_in_queue; UInt32 merges_in_queue; UInt32 part_mutations_in_queue; + UInt32 metadata_alters_in_queue; UInt32 queue_oldest_time; UInt32 inserts_oldest_time; UInt32 merges_oldest_time;