From a34e465abf0e17298c8e4bbaa084726dfc91fc0f Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 26 Apr 2023 19:57:18 +0200 Subject: [PATCH 1/2] Fxi --- src/Storages/MergeTree/DataPartsExchange.cpp | 1 - src/Storages/StorageReplicatedMergeTree.cpp | 6 +++--- src/Storages/StorageReplicatedMergeTree.h | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 46c6d09eca4..c6804d260e2 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -938,7 +938,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( if (to_remote_disk) { - data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true, {}); LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.", part_name, part_id, disk->getName()); } else diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a2f16da4f4d..c66c4ef4dfe 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1468,7 +1468,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil: } MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAndCommit(Transaction & transaction, - const MutableDataPartPtr & part, std::optional hardlinked_files) + const MutableDataPartPtr & part, std::optional hardlinked_files, bool replace_zero_copy_lock) { auto zookeeper = getZooKeeper(); @@ -1477,7 +1477,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd Coordination::Requests ops; NameSet absent_part_paths_on_replicas; - getLockSharedDataOps(*part, std::make_shared(zookeeper), false, hardlinked_files, ops); + getLockSharedDataOps(*part, std::make_shared(zookeeper), replace_zero_copy_lock, hardlinked_files, ops); size_t zero_copy_lock_ops_size = ops.size(); /// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`. @@ -4162,7 +4162,7 @@ bool StorageReplicatedMergeTree::fetchPart( Transaction transaction(*this, NO_TRANSACTION_RAW); renameTempPartAndReplace(part, transaction); - replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files); + replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files, !part_to_clone); /** If a quorum is tracked for this part, you must update it. * If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method. diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 894cf6d12ce..64511a4a927 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -549,7 +549,7 @@ private: String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const; /// Accepts a PreActive part, atomically checks its checksums with ones on other replicas and commit the part - DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional hardlinked_files = {}); + DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional hardlinked_files = {}, bool replace_zero_copy_lock=false); bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; From 70ee02a3eb55a292f0c3150365fba9a3764a3537 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 28 Apr 2023 17:39:32 +0200 Subject: [PATCH 2/2] Add node holder --- src/Common/ZooKeeper/ZooKeeper.h | 2 -- src/Storages/MergeTree/DataPartsExchange.cpp | 5 ++++- src/Storages/StorageReplicatedMergeTree.cpp | 23 ++++++++++---------- src/Storages/StorageReplicatedMergeTree.h | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 2fd94dfbb6b..42ffccc678e 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -524,8 +524,6 @@ public: void setServerCompletelyStarted(); private: - friend class EphemeralNodeHolder; - void init(ZooKeeperArgs args_); /// The following methods don't any throw exceptions but return error codes. diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index c6804d260e2..f0dadcbd89f 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -821,6 +821,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( const auto data_settings = data.getSettings(); MergeTreeData::DataPart::Checksums data_checksums; + zkutil::EphemeralNodeHolderPtr zero_copy_temporary_lock_holder; if (to_remote_disk) { readStringBinary(part_id, in); @@ -829,7 +830,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type)); LOG_DEBUG(log, "Downloading part {} unique id {} metadata onto disk {}.", part_name, part_id, disk->getName()); - data.lockSharedDataTemporary(part_name, part_id, disk); + zero_copy_temporary_lock_holder = data.lockSharedDataTemporary(part_name, part_id, disk); } else { @@ -946,6 +947,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk( new_data_part->checksums.checkEqual(data_checksums, false); LOG_DEBUG(log, "Download of part {} onto disk {} finished.", part_name, disk->getName()); } + if (zero_copy_temporary_lock_holder) + zero_copy_temporary_lock_holder->setAlreadyRemoved(); return new_data_part; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 88fa602f59b..49a5b2ea881 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8113,31 +8113,30 @@ std::optional StorageReplicatedMergeTree::tryGetTableSharedIDFromCreateQ } -void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const +zkutil::EphemeralNodeHolderPtr StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const { auto settings = getSettings(); if (!disk || !disk->supportZeroCopyReplication() || !settings->allow_remote_fs_zero_copy_replication) - return; + return {}; zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper(); if (!zookeeper) - return; + return {}; String id = part_id; boost::replace_all(id, "/", "_"); - Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), - part_name, zookeeper_path); + String zc_zookeeper_path = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), + part_name, zookeeper_path)[0]; - for (const auto & zc_zookeeper_path : zc_zookeeper_paths) - { - String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name; + String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name; - LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node); - createZeroCopyLockNode( - std::make_shared(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false); - } + LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node); + createZeroCopyLockNode( + std::make_shared(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false); + + return zkutil::EphemeralNodeHolder::existing(zookeeper_node, *zookeeper); } void StorageReplicatedMergeTree::lockSharedData( diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 64511a4a927..4e4f383894a 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -259,7 +259,7 @@ public: std::optional hardlinked_files, Coordination::Requests & requests) const; - void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const; + zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const; /// Unlock shared data part in zookeeper /// Return true if data unlocked