From 32cfdc98b207ca89c1051fc0141d5a2231f9d5e9 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Wed, 4 Sep 2024 18:00:03 +0000 Subject: [PATCH 1/4] fix metadata_version in keeper --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 6 +++ .../MergeTree/ReplicatedMergeTreeQueue.h | 1 + src/Storages/StorageReplicatedMergeTree.cpp | 50 ++++++++++++++++++- src/Storages/StorageReplicatedMergeTree.h | 1 + 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 627bda3f8bf..9004f986c5e 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2128,6 +2128,7 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.inserts_in_queue = 0; res.merges_in_queue = 0; res.part_mutations_in_queue = 0; + res.metadata_alters_in_queue = 0; res.queue_oldest_time = 0; res.inserts_oldest_time = 0; res.merges_oldest_time = 0; @@ -2170,6 +2171,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const res.oldest_part_to_mutate_to = entry->new_part_name; } } + + if (entry->type == LogEntry::ALTER_METADATA) + { + ++res.metadata_alters_in_queue; + } } return res; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 89ef6240558..2011d84eefe 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -453,6 +453,7 @@ public: UInt32 inserts_in_queue; UInt32 merges_in_queue; UInt32 part_mutations_in_queue; + UInt32 metadata_alters_in_queue; UInt32 queue_oldest_time; UInt32 inserts_oldest_time; UInt32 merges_oldest_time; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff8e362aa36..7167afa7fd3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6155,6 +6155,8 @@ void StorageReplicatedMergeTree::alter( mutation_znode.reset(); auto current_metadata = getInMemoryMetadataPtr(); + // update metadata's metadata_version + // fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version); StorageInMemoryMetadata future_metadata = *current_metadata; commands.apply(future_metadata, query_context); @@ -6200,7 +6202,8 @@ void StorageReplicatedMergeTree::alter( size_t mutation_path_idx = std::numeric_limits::max(); String new_metadata_str = future_metadata_in_zk.toString(); - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); + Int32 metadata_version = fixMetadataVersionInZooKeeper(); + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version)); String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); @@ -10641,4 +10644,49 @@ template std::optional StorageReplicatedMergeTree::all const std::vector & zookeeper_block_id_path, const String & zookeeper_path_prefix) const; +Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper() +{ + const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(); + if (metadata_version != 0) + { + /// No need to fix anything + return metadata_version; + } + + auto zookeeper = getZooKeeper(); + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = current_zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully fixed metadata_version"); + return stat.version; + } + if (code == Coordination::Error::ZBADVERSION) + { + LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica"); + return metadata_version; + } + throw zkutil::KeeperException(code); +} + } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2e54f17d5d5..e591a800ea2 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -1013,6 +1013,7 @@ private: DataPartsVector::const_iterator it; }; + Int32 tryFixMetadataVersionInZooKeeper(); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); From be55e1d2e166c033aaa369970d3c5b21cfda5807 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Thu, 5 Sep 2024 18:38:59 +0000 Subject: [PATCH 2/4] better --- .../ReplicatedMergeTreeAttachThread.cpp | 67 ++++++++++++++++++- .../ReplicatedMergeTreeAttachThread.h | 2 + src/Storages/StorageReplicatedMergeTree.cpp | 50 +------------- src/Storages/StorageReplicatedMergeTree.h | 2 - 4 files changed, 67 insertions(+), 54 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index 6e22a3515bc..f28b8f9e9a8 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -15,6 +15,7 @@ namespace ErrorCodes { extern const int SUPPORT_IS_DISABLED; extern const int REPLICA_STATUS_CHANGED; + extern const int LOGICAL_ERROR; } ReplicatedMergeTreeAttachThread::ReplicatedMergeTreeAttachThread(StorageReplicatedMergeTree & storage_) @@ -117,6 +118,66 @@ void ReplicatedMergeTreeAttachThread::checkHasReplicaMetadataInZooKeeper(const z } } +Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper) +{ + const String & zookeeper_path = storage.zookeeper_path; + const String & replica_path = storage.replica_path; + + for (size_t i = 0; i != 2; ++i) + { + String replica_metadata_version_str; + const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version_str); + if (!replica_metadata_version_exists) + return -1; + + const Int32 metadata_version = parse(replica_metadata_version_str); + + if (metadata_version != 0) + { + /// No need to fix anything + return metadata_version; + } + + Coordination::Stat stat; + zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); + if (stat.version == 0) + { + /// No need to fix anything + return metadata_version; + } + + ReplicatedMergeTreeQueue & queue = storage.queue; + queue.pullLogsToQueue(zookeeper); + if (queue.getStatus().metadata_alters_in_queue != 0) + { + LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); + return metadata_version; + } + + const Coordination::Requests ops = { + zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), + zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), + }; + Coordination::Responses ops_responses; + const auto code = zookeeper->tryMulti(ops, ops_responses); + if (code == Coordination::Error::ZOK) + { + LOG_DEBUG(log, "Successfully set metadata_version to {}", stat.version); + return stat.version; + } + if (code != Coordination::Error::ZBADVERSION) + { + throw zkutil::KeeperException(code); + } + } + + /// Second attempt is only possible if metadata_version != 0 or metadata.version changed during the first attempt. + /// If metadata_version != 0, on second attempt we will return the new metadata_version. + /// If metadata.version changed, on second attempt we will either get metadata_version != 0 and return the new metadata_version or we will get metadata_alters_in_queue != 0 and return 0. + /// Either way, on second attempt this method should return. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fix replica metadata_version in ZooKeeper after two attempts"); +} + void ReplicatedMergeTreeAttachThread::runImpl() { storage.setZooKeeper(); @@ -160,11 +221,11 @@ void ReplicatedMergeTreeAttachThread::runImpl() /// Just in case it was not removed earlier due to connection loss zookeeper->tryRemove(replica_path + "/flags/force_restore_data"); - String replica_metadata_version; - const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version); + const Int32 replica_metadata_version = fixReplicaMetadataVersionIfNeeded(zookeeper); + const bool replica_metadata_version_exists = replica_metadata_version != -1; if (replica_metadata_version_exists) { - storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse(replica_metadata_version))); + storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(replica_metadata_version)); } else { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h index 250a5ed34d1..bfc97442598 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.h @@ -48,6 +48,8 @@ private: void runImpl(); void finalizeInitialization(); + + Int32 fixReplicaMetadataVersionIfNeeded(zkutil::ZooKeeperPtr zookeeper); }; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5632c24bca4..865a0cbe506 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6158,8 +6158,6 @@ void StorageReplicatedMergeTree::alter( mutation_znode.reset(); auto current_metadata = getInMemoryMetadataPtr(); - // update metadata's metadata_version - // fixReplicaMetadataVersionIfNeeded(current_metadata->metadata_version); StorageInMemoryMetadata future_metadata = *current_metadata; commands.apply(future_metadata, query_context); @@ -6205,8 +6203,7 @@ void StorageReplicatedMergeTree::alter( size_t mutation_path_idx = std::numeric_limits::max(); String new_metadata_str = future_metadata_in_zk.toString(); - Int32 metadata_version = fixMetadataVersionInZooKeeper(); - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version)); + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion())); String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1)); @@ -10690,49 +10687,4 @@ template std::optional StorageReplicatedMergeTree::all const std::vector & zookeeper_block_id_path, const String & zookeeper_path_prefix) const; -Int32 StorageReplicatedMergeTree::tryFixMetadataVersionInZooKeeper() -{ - const Int32 metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(); - if (metadata_version != 0) - { - /// No need to fix anything - return metadata_version; - } - - auto zookeeper = getZooKeeper(); - - Coordination::Stat stat; - zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat); - if (stat.version == 0) - { - /// No need to fix anything - return metadata_version; - } - - queue.pullLogsToQueue(zookeeper); - if (queue.getStatus().metadata_alters_in_queue != 0) - { - LOG_DEBUG(log, "No need to update metadata_version as there are ALTER_METADATA entries in the queue"); - return metadata_version; - } - - const Coordination::Requests ops = { - zkutil::makeSetRequest(fs::path(replica_path) / "metadata_version", std::to_string(stat.version), 0), - zkutil::makeCheckRequest(fs::path(zookeeper_path) / "metadata", stat.version), - }; - Coordination::Responses ops_responses; - const auto code = current_zookeeper->tryMulti(ops, ops_responses); - if (code == Coordination::Error::ZOK) - { - LOG_DEBUG(log, "Successfully fixed metadata_version"); - return stat.version; - } - if (code == Coordination::Error::ZBADVERSION) - { - LOG_DEBUG(log, "No need to update metadata_version because table metadata has been updated on a different replica"); - return metadata_version; - } - throw zkutil::KeeperException(code); -} - } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 15cfa77302b..c10f66031ef 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -1026,8 +1026,6 @@ private: const bool & zero_copy_enabled, const bool & always_use_copy_instead_of_hardlinks, const ContextPtr & query_context); - - Int32 tryFixMetadataVersionInZooKeeper(); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); From 1b1db0081f66068d1bccf3d7963cb872369468f6 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Tue, 10 Sep 2024 22:39:35 +0000 Subject: [PATCH 3/4] do not fix metadata_version if replica is read_only --- src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index f28b8f9e9a8..67570d78366 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -122,6 +122,7 @@ Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil: { const String & zookeeper_path = storage.zookeeper_path; const String & replica_path = storage.replica_path; + const bool replica_readonly = storage.is_readonly; for (size_t i = 0; i != 2; ++i) { @@ -132,7 +133,7 @@ Int32 ReplicatedMergeTreeAttachThread::fixReplicaMetadataVersionIfNeeded(zkutil: const Int32 metadata_version = parse(replica_metadata_version_str); - if (metadata_version != 0) + if (metadata_version != 0 || replica_readonly) { /// No need to fix anything return metadata_version; From 721e9a735672fad9f70150434c8ace7a9358fae3 Mon Sep 17 00:00:00 2001 From: Michael Stetsyuk Date: Fri, 13 Sep 2024 12:38:58 +0000 Subject: [PATCH 4/4] empty