diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 627bda3f8bf..9004f986c5e 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2128,6 +2128,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.inserts_in_queue = 0; res.merges_in_queue = 0; res.part_mutations_in_queue = 0; + res.metadata_alters_in_queue = 0; res.queue_oldest_time = 0; res.inserts_oldest_time = 0; res.merges_oldest_time = 0; @@ -2170,6 +2171,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.oldest_part_to_mutate_to = entry->new_part_name; } } + + if (entry->type == LogEntry::ALTER_METADATA) + { + ++res.metadata_alters_in_queue; + } } return res; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 89ef6240558..2011d84eefe 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -453,6 +453,7 @@ public: UInt32 inserts_in_queue; UInt32 merges_in_queue; UInt32 part_mutations_in_queue; + UInt32 metadata_alters_in_queue; UInt32 queue_oldest_time; UInt32 inserts_oldest_time; UInt32 merges_oldest_time; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff8e362aa36..7167afa7fd3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6155,6 +6155,8 @@ void StorageReplicatedMergeTree::alter( mutation_znode.reset(); auto current_metadata = getInMemoryMetadataPtr(); + // update metadata's metadata_version + // fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version); StorageInMemoryMetadata future_metadata = *current_metadata; commands.apply(future_metadata, query_context); @@ -6200,7 +6202,8 @@ void StorageReplicatedMergeTree::alter( size_t mutation_path_idx = std::numeric_limits::max(); String new_metadata_str = future_metadata_in_zk.toString(); - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); + Int32 metadata_version = fixMetadataVersionInZooKeeper(); + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version)); String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); @@ -10641,4 +10644,49 @@ template std::optional StorageReplicatedMergeTree::all const std::vector & zookeeper_block_id_path, const String & zookeeper_path_prefix) const; +Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper() +{ + const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(); + if (metadata_version != 0) + { + /// No need to fix anything + return metadata_version; + } + + auto zookeeper = getZooKeeper(); + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = current_zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully fixed metadata_version"); + return stat.version; + } + if (code == Coordination::Error::ZBADVERSION) + { + LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica"); + return metadata_version; + } + throw zkutil::KeeperException(code); +} + } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2e54f17d5d5..e591a800ea2 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -1013,6 +1013,7 @@ private: DataPartsVector::const_iterator it; }; + Int32 tryFixMetadataVersionInZooKeeper(); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);