diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index a8da0dff0cc..d1b7dea74d9 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -342,6 +342,31 @@ void ZooKeeper::createAncestors(const std::string & path) } } +void ZooKeeper::checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests) +{ + std::vector paths_to_check; + size_t pos = 1; + while (true) + { + pos = path.find('/', pos); + if (pos == std::string::npos) + break; + paths_to_check.emplace_back(path.substr(0, pos)); + ++pos; + } + + MultiExistsResponse response = exists(paths_to_check); + + for (size_t i = 0; i < paths_to_check.size(); ++i) + { + if (response[i].error != Coordination::Error::ZOK) + { + /// Ephemeral nodes cannot have children + requests.emplace_back(makeCreateRequest(paths_to_check[i], "", CreateMode::Persistent)); + } + } +} + Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version) { auto future_result = asyncTryRemoveNoThrow(path, version); diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 8e7639b8cc1..2fd94dfbb6b 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -237,6 +237,8 @@ public: /// Does not create the node itself. void createAncestors(const std::string & path); + void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests); + /// Remove the node if the version matches. (if version == -1, remove any version). void remove(const std::string & path, int32_t version = -1); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 9b4972ade59..ffcbe801581 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1477,7 +1477,8 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd Coordination::Requests ops; NameSet absent_part_paths_on_replicas; - lockSharedData(*part, false, hardlinked_files); + getLockSharedDataOps(*part, zookeeper, false, hardlinked_files, ops); + size_t zero_copy_lock_ops_size = ops.size(); /// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`. checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas); @@ -1509,7 +1510,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd if (e == Coordination::Error::ZNODEEXISTS) { - size_t num_check_ops = 2 * absent_part_paths_on_replicas.size(); + size_t num_check_ops = 2 * absent_part_paths_on_replicas.size() + zero_copy_lock_ops_size; size_t failed_op_index = zkutil::getFailedOpIndex(e, responses); if (failed_op_index < num_check_ops) { @@ -8150,6 +8151,52 @@ void StorageReplicatedMergeTree::lockSharedData( return lockSharedData(part, std::make_shared(nullptr), replace_existing_lock, hardlinked_files); } +void StorageReplicatedMergeTree::getLockSharedDataOps( + const IMergeTreeDataPart & part, + const ZooKeeperWithFaultInjectionPtr & zookeeper, + bool replace_existing_lock, + std::optional hardlinked_files, + Coordination::Requests & requests) const +{ + if (!part.isStoredOnDisk() || !settings->allow_remote_fs_zero_copy_replication) + return; + + if (!part.getDataPartStorage().supportZeroCopyReplication()) + return; + + if (zookeeper->isNull()) + return; + + String id = part.getUniqueId(); + boost::replace_all(id, "/", "_"); + + Strings zc_zookeeper_paths = getZeroCopyPartPath( + *getSettings(), part.getDataPartStorage().getDiskType(), getTableSharedID(), + part.name, zookeeper_path); + + String path_to_set_hardlinked_files; + NameSet hardlinks; + + if (hardlinked_files.has_value() && !hardlinked_files->hardlinks_from_source_part.empty()) + { + path_to_set_hardlinked_files = getZeroCopyPartPath( + *getSettings(), part.getDataPartStorage().getDiskType(), hardlinked_files->source_table_shared_id, + hardlinked_files->source_part_name, zookeeper_path)[0]; + + hardlinks = hardlinked_files->hardlinks_from_source_part; + } + + for (const auto & zc_zookeeper_path : zc_zookeeper_paths) + { + String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name; + + getZeroCopyLockNodeCreaetOps( + zookeeper, zookeeper_node, requests, zkutil::CreateMode::Persistent, + replace_existing_lock, path_to_set_hardlinked_files, hardlinks); + } +} + + void StorageReplicatedMergeTree::lockSharedData( const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper, @@ -8190,11 +8237,13 @@ void StorageReplicatedMergeTree::lockSharedData( { String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name; - LOG_TRACE(log, "Set zookeeper persistent lock {}", zookeeper_node); + LOG_TRACE(log, "Trying to create zookeeper persistent lock {}", zookeeper_node); createZeroCopyLockNode( zookeeper, zookeeper_node, zkutil::CreateMode::Persistent, replace_existing_lock, path_to_set_hardlinked_files, hardlinks); + + LOG_TRACE(log, "Zookeeper persistent lock {} created", zookeeper_node); } } @@ -8355,6 +8404,7 @@ std::pair getParentLockedBlobs(const ZooKeeperWithFaultInjectionP return {true, files_not_to_remove}; } } + LOG_TRACE(log, "No mutation parent found for part {}", part_info_str); return {false, files_not_to_remove}; } @@ -8406,6 +8456,10 @@ std::pair StorageReplicatedMergeTree::unlockSharedDataByID( LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, refuse to remove blobs", zookeeper_part_replica_node, part_name); return {false, {}}; } + else + { + LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, but we don't have mutation parent, can remove blobs", zookeeper_part_replica_node, part_name); + } } else { @@ -8927,6 +8981,46 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP return true; } +void StorageReplicatedMergeTree::getZeroCopyLockNodeCreaetOps( + const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests, + int32_t mode, bool replace_existing_lock, + const String & path_to_set_hardlinked_files, const NameSet & hardlinked_files) +{ + + /// Ephemeral locks can be created only when we fetch shared data. + /// So it never require to create ancestors. If we create them + /// race condition with source replica drop is possible. + if (mode == zkutil::CreateMode::Persistent) + zookeeper->checkExistsAndGetCreateAncestorsOps(zookeeper_node, requests); + + if (replace_existing_lock && zookeeper->exists(zookeeper_node)) + { + requests.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1)); + requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode)); + if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty()) + { + std::string data = boost::algorithm::join(hardlinked_files, "\n"); + /// List of files used to detect hardlinks. path_to_set_hardlinked_files -- + /// is a path to source part zero copy node. During part removal hardlinked + /// files will be left for source part. + requests.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1)); + } + } + else + { + Coordination::Requests ops; + if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty()) + { + std::string data = boost::algorithm::join(hardlinked_files, "\n"); + /// List of files used to detect hardlinks. path_to_set_hardlinked_files -- + /// is a path to source part zero copy node. During part removal hardlinked + /// files will be left for source part. + requests.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1)); + } + requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode)); + } +} + void StorageReplicatedMergeTree::createZeroCopyLockNode( const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, int32_t mode, @@ -8940,64 +9034,18 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode( { try { - /// Ephemeral locks can be created only when we fetch shared data. - /// So it never require to create ancestors. If we create them - /// race condition with source replica drop is possible. - if (mode == zkutil::CreateMode::Persistent) - zookeeper->createAncestors(zookeeper_node); - - if (replace_existing_lock && zookeeper->exists(zookeeper_node)) + Coordination::Requests ops; + getZeroCopyLockNodeCreaetOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files); + auto error = zookeeper->tryMulti(ops, responses); + if (error == Coordination::Error::ZOK) { - Coordination::Requests ops; - ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1)); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode)); - if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty()) - { - std::string data = boost::algorithm::join(hardlinked_files, "\n"); - /// List of files used to detect hardlinks. path_to_set_hardlinked_files -- - /// is a path to source part zero copy node. During part removal hardlinked - /// files will be left for source part. - ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1)); - } - Coordination::Responses responses; - auto error = zookeeper->tryMulti(ops, responses); - if (error == Coordination::Error::ZOK) - { - created = true; - break; - } - else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent) - { - throw Exception(ErrorCodes::NOT_FOUND_NODE, - "Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node); - } + created = true; + break; } - else + else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent) { - Coordination::Requests ops; - if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty()) - { - std::string data = boost::algorithm::join(hardlinked_files, "\n"); - /// List of files used to detect hardlinks. path_to_set_hardlinked_files -- - /// is a path to source part zero copy node. During part removal hardlinked - /// files will be left for source part. - ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1)); - } - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode)); - - Coordination::Responses responses; - auto error = zookeeper->tryMulti(ops, responses); - if (error == Coordination::Error::ZOK || error == Coordination::Error::ZNODEEXISTS) - { - created = true; - break; - } - else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent) - { - /// Ephemeral locks used during fetches so if parent node was removed we cannot do anything - throw Exception(ErrorCodes::NOT_FOUND_NODE, - "Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node); - } + throw Exception(ErrorCodes::NOT_FOUND_NODE, + "Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node); } } catch (const zkutil::KeeperException & e) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index ade4e4f0b4b..894cf6d12ce 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -252,6 +252,13 @@ public: bool replace_existing_lock, std::optional hardlinked_files) const; + void getLockSharedDataOps( + const IMergeTreeDataPart & part, + const ZooKeeperWithFaultInjectionPtr & zookeeper, + bool replace_existing_lock, + std::optional hardlinked_files, + Coordination::Requests & requests) const; + void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const; /// Unlock shared data part in zookeeper @@ -861,6 +868,12 @@ private: int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {}); + static void getZeroCopyLockNodeCreaetOps( + const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests, + int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, + const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {}); + + bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override; /// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.