From 9272ed06b427f017d1b95e0d20ff6132f5cb06a2 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 9 Oct 2020 17:24:10 +0300 Subject: [PATCH] Move Zookeeper lock for S3 shared part in IMergeTreeDataPart --- src/Storages/MergeTree/DataPartsExchange.cpp | 27 +--------- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 52 +++++++++++++++++++ src/Storages/MergeTree/IMergeTreeDataPart.h | 14 +++++ src/Storages/StorageReplicatedMergeTree.cpp | 28 +--------- 4 files changed, 69 insertions(+), 52 deletions(-) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 2708373d1a4..da5acdbefcd 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include @@ -240,17 +239,7 @@ void Service::sendPartS3Metadata(const MergeTreeData::DataPartPtr & part, WriteB if (disk->getType() != "s3") throw Exception("S3 disk is not S3 anymore", ErrorCodes::LOGICAL_ERROR); - String id = disk->getUniqueId(part->getFullRelativePath() + "checksums.txt"); - - if (id.empty()) - throw Exception("Can't lock part on S3 storage", ErrorCodes::LOGICAL_ERROR); - - String zookeeper_node = zookeeper_path + "/zero_copy_s3/" + id + "/" + replica_name; - - LOG_TRACE(log, "Set zookeeper lock {}", id); - - zookeeper->createAncestors(zookeeper_node); - zookeeper->createIfNotExists(zookeeper_node, "lock"); + part->lockSharedData(zookeeper_path, replica_name, zookeeper); writeBinary(checksums.files.size(), out); for (const auto & it : checksums.files) @@ -629,19 +618,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3( new_data_part->modification_time = time(nullptr); new_data_part->loadColumnsChecksumsIndexes(true, false); - - String id = disk->getUniqueId(new_data_part->getFullRelativePath() + "checksums.txt"); - - if (id.empty()) - throw Exception("Can't lock part on S3 storage", ErrorCodes::LOGICAL_ERROR); - - String zookeeper_node = zookeeper_path + "/zero_copy_s3/" + id + "/" + replica_name; - - LOG_TRACE(log, "Set zookeeper lock {}", id); - - zookeeper->createAncestors(zookeeper_node); - zookeeper->createIfNotExists(zookeeper_node, "lock"); - + new_data_part->lockSharedData(zookeeper_path, replica_name, zookeeper); return new_data_part; } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 40a6569cd46..786bc056702 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -1079,6 +1080,56 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada return true; } +void IMergeTreeDataPart::lockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const +{ + auto disk = volume->getDisk(); + + if (disk->getType() != "s3") + return; + + String id = disk->getUniqueId(getFullRelativePath() + "checksums.txt"); + + if (id.empty()) + throw Exception("Can't lock part on S3 storage", ErrorCodes::LOGICAL_ERROR); + + String zookeeper_node = zookeeper_path + "/zero_copy_s3/" + id + "/" + replica_name; + + LOG_TRACE(storage.log, "Set zookeeper lock {}", id); + + zookeeper->createAncestors(zookeeper_node); + zookeeper->createIfNotExists(zookeeper_node, "lock"); +} + +bool IMergeTreeDataPart::unlockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const +{ + auto disk = volume->getDisk(); + + if (disk->getType() != "s3") + return true; + + String id = disk->getUniqueId(getFullRelativePath() + "checksums.txt"); + + if (id.empty()) + return true; + + String zookeeper_part_node = zookeeper_path + "/zero_copy_s3/" + id; + String zookeeper_node = zookeeper_part_node + "/" + replica_name; + + LOG_TRACE(storage.log, "Remove zookeeper lock for {}", id); + + zookeeper->remove(zookeeper_node); + + Strings children; + zookeeper->tryGetChildren(zookeeper_part_node, children); + + if (!children.empty()) + { + LOG_TRACE(storage.log, "Found zookeper locks for {}", id); + } + + return children.empty(); +} + bool isCompactPart(const MergeTreeDataPartPtr & data_part) { return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT); @@ -1095,3 +1146,4 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part) } } + diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 3e7b03b2903..d40ff40f157 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -22,6 +22,12 @@ #include +namespace zkutil +{ + class ZooKeeper; + using ZooKeeperPtr = std::shared_ptr; +} + namespace DB { @@ -349,6 +355,14 @@ public: /// part creation (using alter query with materialize_ttl setting). bool checkAllTTLCalculated(const StorageMetadataPtr & metadata_snapshot) const; + /// Lock part in zookeeper for use common S3 data in several nodes + void lockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const; + + /// Unlock common S3 data part in zookeeper + /// Return true if data unlocked + /// Return false if data is still used by another node + bool unlockSharedData(const String & zookeeper_path, const String & replica_name, zkutil::ZooKeeperPtr zookeeper) const; + protected: /// Total size of all columns, calculated once in calcuateColumnSizesOnDisk diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index b1c7c754637..6355894d59e 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5111,33 +5111,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK() { try { - bool keep_s3 = false; - - auto disk = part->volume->getDisk(); - - if (disk->getType() == "s3") - { - String id = disk->getUniqueId(part->getFullRelativePath() + "checksums.txt"); - - if (!id.empty()) - { - String zookeeper_part_node = zookeeper_path + "/zero_copy_s3/" + id; - String zookeeper_node = zookeeper_part_node + "/" + replica_name; - - LOG_TRACE(log, "Remove zookeeper lock for {}", id); - - zookeeper->remove(zookeeper_node); - - Strings children; - zookeeper->tryGetChildren(zookeeper_part_node, children); - if (!children.empty()) - { - LOG_TRACE(log, "Found zookeper locks for {}", id); - keep_s3 = true; - } - } - } - + bool keep_s3 = !part->unlockSharedData(zookeeper_path, replica_name, zookeeper); part->remove(keep_s3); } catch (...)