From 80ab73c6915b7ddf340c0ea921e6c3fa34167ee9 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 1 Dec 2021 16:11:26 +0300 Subject: [PATCH] Fix Zero-Copy replication lost locks, fix remove used remote data in DROP DETACHED PART --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 8 ++ src/Storages/MergeTree/IMergeTreeDataPart.h | 3 + src/Storages/MergeTree/MergeTreeData.cpp | 110 +++++++++++------- src/Storages/MergeTree/MergeTreeData.h | 10 ++ .../MergeTree/ReplicatedMergeTreeSink.cpp | 4 + src/Storages/StorageReplicatedMergeTree.cpp | 106 ++++++++--------- src/Storages/StorageReplicatedMergeTree.h | 8 +- 7 files changed, 155 insertions(+), 94 deletions(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 81920735829..8ffcab8adc0 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1100,6 +1100,14 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_ storage.lockSharedData(*this); } +void IMergeTreeDataPart::cleanupOldName(const String & old_name) const +{ + if (name == old_name) + return; + + storage.unlockSharedData(*this, old_name); +} + std::optional IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const { /// NOTE: It's needed for zero-copy replication diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 12e1cc9738b..ceac37c19d1 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -334,6 +334,9 @@ public: /// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const; + /// Cleanup after change part + virtual void cleanupOldName(const String & old_part_name) const; + /// Makes clone of a part in detached/ directory via hard links virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a7309577fa6..499ee09ffee 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2449,6 +2449,8 @@ bool MergeTreeData::renameTempPartAndReplace( MergeTreePartInfo part_info = part->info; String part_name; + String old_part_name = part->name; + if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock)) { if (part->partition.value != existing_part_in_partition->partition.value) @@ -2512,6 +2514,7 @@ bool MergeTreeData::renameTempPartAndReplace( /// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts /// /// If out_transaction is null, we commit the part to the active set immediately, else add it to the transaction. + part->name = part_name; part->info = part_info; part->is_temp = false; @@ -2560,6 +2563,8 @@ bool MergeTreeData::renameTempPartAndReplace( out_covered_parts->emplace_back(std::move(covered_part)); } + part->cleanupOldName(old_part_name); + return true; } @@ -3885,11 +3890,16 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr renamed_parts.tryRenameAll(); + String replica_name = getReplicaName(); + String zookeeper_name = getZooKeeperName(); + String zookeeper_path = getZooKeeperPath(); + for (auto & [old_name, new_name] : renamed_parts.old_and_new_names) { const auto & [path, disk] = renamed_parts.old_part_name_to_path_and_disk[old_name]; - disk->removeRecursive(fs::path(path) / "detached" / new_name / ""); - LOG_DEBUG(log, "Dropped detached part {}", old_name); + bool keep_shared = removeSharedDetachedPart(disk, fs::path(path) / "detached" / new_name / "", old_name, + zookeeper_name, replica_name, zookeeper_path); + LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared); old_name.clear(); } } @@ -5288,6 +5298,63 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll( return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context); } +bool MergeTreeData::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name) +{ + bool keep_shared = false; + + if (disk->supportZeroCopyReplication()) + { + FreezeMetaData meta; + if (meta.load(disk, path) && meta.is_replicated) + return removeSharedDetachedPart(disk, path, part_name, meta.zookeeper_name, meta.replica_name, ""); + } + + disk->removeSharedRecursive(path, keep_shared); + + return keep_shared; +} + +bool MergeTreeData::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, + const String & zookeeper_name, const String & replica_name, const String & zookeeper_path) +{ + bool keep_shared = false; + + if (disk->supportZeroCopyReplication()) + { + zkutil::ZooKeeperPtr zookeeper; + if (zookeeper_name == "default") + { + zookeeper = getContext()->getZooKeeper(); + } + else + { + zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name); + } + + if (zookeeper) + { + fs::path checksums = fs::path(path) / "checksums.txt"; + if (disk->exists(checksums)) + { + auto ref_count = disk->getRefCount(checksums); + if (ref_count == 0) + { + String id = disk->getUniqueId(checksums); + keep_shared = !StorageReplicatedMergeTree::unlockSharedDataById(id, part_name, + replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log, + zookeeper_path); + } + else + keep_shared = true; + } + } + } + + disk->removeSharedRecursive(path, keep_shared); + + return keep_shared; +} + PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn matcher, const String & backup_name, ContextPtr) { auto backup_path = fs::path("shadow") / escapeForFileName(backup_name) / relative_data_path; @@ -5316,42 +5383,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn const auto & path = it->path(); - bool keep_shared = false; - - if (disk->supportZeroCopyReplication()) - { - FreezeMetaData meta; - if (meta.load(disk, path) && meta.is_replicated) - { - zkutil::ZooKeeperPtr zookeeper; - if (meta.zookeeper_name == "default") - { - zookeeper = getContext()->getZooKeeper(); - } - else - { - zookeeper = getContext()->getAuxiliaryZooKeeper(meta.zookeeper_name); - } - - if (zookeeper) - { - fs::path checksums = fs::path(path) / "checksums.txt"; - if (disk->exists(checksums)) - { - auto ref_count = disk->getRefCount(checksums); - if (ref_count == 0) - { - String id = disk->getUniqueId(checksums); - keep_shared = !StorageReplicatedMergeTree::unlockSharedDataById(id, partition_directory, - meta.replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log, - nullptr); - } - } - } - } - } - - disk->removeSharedRecursive(path, keep_shared); + bool keep_shared = removeSharedDetachedPart(disk, path, partition_directory); result.push_back(PartitionCommandResultInfo{ .partition_id = partition_id, @@ -5361,7 +5393,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn .backup_name = backup_name, }); - LOG_DEBUG(log, "Unfreezed part by path {}, keep shared data {}", disk->getPath() + path, keep_shared); + LOG_DEBUG(log, "Unfreezed part by path {}, keep shared data: {}", disk->getPath() + path, keep_shared); } } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index ed9a7f058dd..6a6f616484d 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -867,11 +867,15 @@ public: /// Overridden in StorageReplicatedMergeTree virtual bool unlockSharedData(const IMergeTreeDataPart &) const { return true; } + /// Unlock same part with other (old) name + virtual bool unlockSharedData(const IMergeTreeDataPart &, const String &) const { return true; } + /// Fetch part only if some replica has it on shared storage like S3 /// Overridden in StorageReplicatedMergeTree virtual bool tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return false; } virtual String getZooKeeperName() const { return ""; } + virtual String getZooKeeperPath() const { return ""; } /// Parts that currently submerging (merging to bigger parts) or emerging /// (to be appeared after merging finished). These two variables have to be used @@ -1174,6 +1178,12 @@ private: DataPartsVector & duplicate_parts_to_remove, MutableDataPartsVector & parts_from_wal, DataPartsLock & part_lock); + + /// Check shared data usage on other replicas for detached/freezed part + /// Remove local files and remote files if needed + bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name); + bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, + const String & zookeeper_name, const String & replica_name, const String & zookeeper_path); }; /// RAII struct to record big parts that are submerging or emerging. diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index e3ca902b1bd..5027b861e18 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -222,6 +222,8 @@ void ReplicatedMergeTreeSink::commitPart( bool is_already_existing_part = false; + String old_part_name = part->name; + while (true) { /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem. @@ -502,6 +504,8 @@ void ReplicatedMergeTreeSink::commitPart( waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value); } + + part->cleanupOldName(old_part_name); } void ReplicatedMergeTreeSink::onStart() diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d989c4b2030..572b5d7b4b9 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4111,13 +4111,8 @@ void StorageReplicatedMergeTree::startup() getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr); convertZeroCopySchema(); - is_zero_copy_in_compatible_mode = isZeroCopySchemaInCompatibleMode(); - - if (!is_zero_copy_in_compatible_mode) - { /// All replicas have new version - cleanupOldZeroCopySchema(); - } + cleanupOldZeroCopySchema(); /// In this thread replica will be activated. restarting_thread.start(); @@ -7144,9 +7139,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part) String id = part.getUniqueId(); boost::replace_all(id, "/", "_"); - const String * zookeeper_path_ptr = is_zero_copy_in_compatible_mode ? &zookeeper_path : nullptr; - - Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), zookeeper_path_ptr); + Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), is_zero_copy_in_compatible_mode ? zookeeper_path : ""); for (const auto & zc_zookeeper_path : zc_zookeeper_paths) { String zookeeper_node = fs::path(zc_zookeeper_path) / zero_copy / "shared" / part.name / id / replica_name; @@ -7158,6 +7151,12 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part) bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const +{ + return unlockSharedData(part, part.name); +} + + +bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, const String & name) const { if (!part.volume) return true; @@ -7170,25 +7169,23 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par return true; auto ref_count = part.getRefCount(); - LOG_TRACE(log, "RefCount {} for part {}", ref_count, part.name); if (ref_count > 0) /// Keep part shard info for frozen backups return false; - const String * zookeeper_path_ptr = is_zero_copy_in_compatible_mode ? &zookeeper_path : nullptr; - - return unlockSharedDataById(part.getUniqueId(), part.name, replica_name, disk, zookeeper, *getDefaultSettings(), log, zookeeper_path_ptr); + return unlockSharedDataById(part.getUniqueId(), name, replica_name, disk, zookeeper, *getDefaultSettings(), log, + is_zero_copy_in_compatible_mode ? zookeeper_path : String("")); } bool StorageReplicatedMergeTree::unlockSharedDataById(String id, const String & part_name, - const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, - Poco::Logger * logger, const String * zookeeper_path_ptr) + const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings, + Poco::Logger * logger, const String & zookeeper_path_old) { boost::replace_all(id, "/", "_"); String zero_copy = fmt::format("zero_copy_{}", toString(disk->getType())); - Strings zc_zookeeper_paths = getZeroCopyRootPath(settings, zookeeper_path_ptr); + Strings zc_zookeeper_paths = getZeroCopyRootPath(settings, zookeeper_path_old); bool res = true; @@ -7200,10 +7197,10 @@ bool StorageReplicatedMergeTree::unlockSharedDataById(String id, const String & LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_node); - zookeeper_->tryRemove(zookeeper_node); + zookeeper_ptr->tryRemove(zookeeper_node); Strings children; - zookeeper_->tryGetChildren(zookeeper_part_uniq_node, children); + zookeeper_ptr->tryGetChildren(zookeeper_part_uniq_node, children); if (!children.empty()) { @@ -7212,14 +7209,16 @@ bool StorageReplicatedMergeTree::unlockSharedDataById(String id, const String & continue; } - zookeeper_->tryRemove(zookeeper_part_uniq_node); + zookeeper_ptr->tryRemove(zookeeper_part_uniq_node); /// Even when we have lock with same part name, but with different uniq, we can remove files on S3 children.clear(); - zookeeper_->tryGetChildren(zookeeper_part_node, children); + zookeeper_ptr->tryGetChildren(zookeeper_part_node, children); if (children.empty()) + { /// Cleanup after last uniq removing - zookeeper_->tryRemove(zookeeper_part_node); + zookeeper_ptr->tryRemove(zookeeper_part_node); + } } return res; @@ -7257,9 +7256,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica( String zero_copy = fmt::format("zero_copy_{}", toString(disk_type)); - const String * zookeeper_path_ptr = is_zero_copy_in_compatible_mode ? &zookeeper_path : nullptr; - - Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), zookeeper_path_ptr); + Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), is_zero_copy_in_compatible_mode ? zookeeper_path : ""); std::set replicas; @@ -7331,30 +7328,30 @@ String StorageReplicatedMergeTree::getSharedDataReplica( } -Strings StorageReplicatedMergeTree::getZeroCopyRootPath(const MergeTreeSettings & settings, const String * zookeeper_path_ptr) +Strings StorageReplicatedMergeTree::getZeroCopyRootPath(const MergeTreeSettings & settings, const String & zookeeper_path_old) { Strings res; res.push_back(settings.remote_fs_zero_copy_zookeeper_path); - if (zookeeper_path_ptr) /// Compatibility mode for cluster with old and new versions - res.push_back(*zookeeper_path_ptr); + if (!zookeeper_path_old.empty()) /// Compatibility mode for cluster with old and new versions + res.push_back(zookeeper_path_old); return res; } String StorageReplicatedMergeTree::findReplicaHavingPart( - const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_) + const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_ptr) { - Strings replicas = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas"); + Strings replicas = zookeeper_ptr->getChildren(fs::path(zookeeper_path_) / "replicas"); /// Select replicas in uniformly random order. std::shuffle(replicas.begin(), replicas.end(), thread_local_rng); for (const String & replica : replicas) { - if (zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica / "parts" / part_name) - && zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica / "is_active")) + if (zookeeper_ptr->exists(fs::path(zookeeper_path_) / "replicas" / replica / "parts" / part_name) + && zookeeper_ptr->exists(fs::path(zookeeper_path_) / "replicas" / replica / "is_active")) return fs::path(zookeeper_path_) / "replicas" / replica; } @@ -7594,6 +7591,8 @@ void StorageReplicatedMergeTree::convertZeroCopySchema() LOG_INFO(log, "Convert zero_copy version from {} to {} for {}", zero_copy_version, required_zero_copy_version, version_path.string()); + unsigned long converted_part_counter = 0; + for (auto const & disk_type : disk_types) { String zero_copy = fmt::format("zero_copy_{}", disk_type); @@ -7605,18 +7604,26 @@ void StorageReplicatedMergeTree::convertZeroCopySchema() auto old_shard_root = revert_to_version == 1 ? shard_root_v2 : shard_root_v1; auto new_shard_root = revert_to_version == 1 ? shard_root_v1 : shard_root_v2; - Coordination::Stat stat; - Strings parts = current_zookeeper->getChildren(old_shard_root, &stat); + Strings parts; + current_zookeeper->tryGetChildren(old_shard_root, parts); - for (const auto & part : parts) + for (const auto & part_name : parts) { - Strings ids = current_zookeeper->getChildren(old_shard_root / part, &stat); - for (const auto & id : ids) - { - if (current_zookeeper->exists(old_shard_root / part / id / replica_name)) + auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); + auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed}); + + if (part) + { /// Do not move lost locks + Strings ids; + current_zookeeper->tryGetChildren(old_shard_root / part_name, ids); + for (const auto & id : ids) { - auto zookeeper_node = new_shard_root / part / id / replica_name; - createZeroCopyLockNode(current_zookeeper, zookeeper_node.string()); + if (current_zookeeper->exists(old_shard_root / part_name / id / replica_name)) + { + auto zookeeper_node = new_shard_root / part_name / id / replica_name; + createZeroCopyLockNode(current_zookeeper, zookeeper_node.string()); + ++converted_part_counter; + } } } } @@ -7630,13 +7637,16 @@ void StorageReplicatedMergeTree::convertZeroCopySchema() current_zookeeper->createOrUpdate(version_path / "cleanup_required", std::to_string(zero_copy_version), zkutil::CreateMode::Persistent); - LOG_INFO(log, "Convert zero_copy version from {} to {} for {} complete", zero_copy_version, required_zero_copy_version, - version_path.string()); + LOG_INFO(log, "Convert zero_copy version from {} to {} for {} complete, converted {} locks", zero_copy_version, required_zero_copy_version, + version_path.string(), converted_part_counter); } void StorageReplicatedMergeTree::cleanupOldZeroCopySchema() { + if (is_zero_copy_in_compatible_mode) + return; /// Some replicas have old version + if (!current_zookeeper) return; @@ -7672,17 +7682,7 @@ void StorageReplicatedMergeTree::cleanupOldZeroCopySchema() auto old_shard_root = fs::path(zookeeper_path) / zero_copy / "shared"; - Coordination::Stat stat; - Strings parts = current_zookeeper->getChildren(old_shard_root, &stat); - - for (const auto & part : parts) - { - Strings ids = current_zookeeper->getChildren(old_shard_root / part, &stat); - for (const auto & id : ids) - { - current_zookeeper->remove(old_shard_root / part / id / replica_name); - } - } + current_zookeeper->tryRemoveRecursive(old_shard_root); } current_zookeeper->remove(old_version_path); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 7fd1febc9a6..b6b6076206e 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -237,12 +237,15 @@ public: /// Return false if data is still used by another node bool unlockSharedData(const IMergeTreeDataPart & part) const override; + /// Unlock same part with other (old) name + bool unlockSharedData(const IMergeTreeDataPart & part, const String & name) const override; + /// Unlock shared data part in zookeeper by part id /// Return true if data unlocked /// Return false if data is still used by another node static bool unlockSharedDataById(String id, const String & part_name, const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger, - const String * zookeeper_path_ptr); + const String & zookeeper_path_old); /// Fetch part only if some replica has it on shared storage like S3 bool tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override; @@ -272,6 +275,7 @@ public: bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name); virtual String getZooKeeperName() const override { return zookeeper_name; } + virtual String getZooKeeperPath() const override { return zookeeper_path; } private: std::atomic_bool are_restoring_replica {false}; @@ -737,7 +741,7 @@ private: PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions( const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const; - static Strings getZeroCopyRootPath(const MergeTreeSettings & settings, const String * zookeeper_path_ptr = nullptr); + static Strings getZeroCopyRootPath(const MergeTreeSettings & settings, const String & zookeeper_path_old); /// Upgrave zero-copy version /// version 1 - lock for shared part inside table node in ZooKeeper