diff --git a/dbms/src/Storages/MergeTree/AbandonableLockInZooKeeper.cpp b/dbms/src/Storages/MergeTree/EphemeralLockInZooKeeper.cpp similarity index 81% rename from dbms/src/Storages/MergeTree/AbandonableLockInZooKeeper.cpp rename to dbms/src/Storages/MergeTree/EphemeralLockInZooKeeper.cpp index a968a51603a..d0207fde5da 100644 --- a/dbms/src/Storages/MergeTree/AbandonableLockInZooKeeper.cpp +++ b/dbms/src/Storages/MergeTree/EphemeralLockInZooKeeper.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include @@ -11,59 +11,59 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -AbandonableLockInZooKeeper::AbandonableLockInZooKeeper( +EphemeralLockInZooKeeper::EphemeralLockInZooKeeper( const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops) : zookeeper(&zookeeper_), path_prefix(path_prefix_) { - String abandonable_path = temp_path + "/abandonable_lock-"; + /// The /abandonable_lock- name is for backward compatibility. + String holder_path_prefix = temp_path + "/abandonable_lock-"; /// Let's create an secondary ephemeral node. if (!precheck_ops || precheck_ops->empty()) { - holder_path = zookeeper->create(abandonable_path, "", zkutil::CreateMode::EphemeralSequential); + holder_path = zookeeper->create(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential); } else { - precheck_ops->emplace_back(zkutil::makeCreateRequest(abandonable_path, "", zkutil::CreateMode::EphemeralSequential)); + precheck_ops->emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential)); zkutil::Responses op_results = zookeeper->multi(*precheck_ops); holder_path = dynamic_cast(*op_results.back()).path_created; } /// Write the path to the secondary node in the main node. - path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential); + path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::EphemeralSequential); if (path.size() <= path_prefix.size()) - throw Exception("Logical error: name of sequential node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR); + throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR); } -void AbandonableLockInZooKeeper::unlock() +void EphemeralLockInZooKeeper::unlock() { - checkCreated(); - zookeeper->remove(path); - zookeeper->remove(holder_path); + zkutil::Requests ops; + getUnlockOps(ops); + zookeeper->multi(ops); holder_path = ""; } -void AbandonableLockInZooKeeper::getUnlockOps(zkutil::Requests & ops) +void EphemeralLockInZooKeeper::getUnlockOps(zkutil::Requests & ops) { checkCreated(); ops.emplace_back(zkutil::makeRemoveRequest(path, -1)); ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1)); } -AbandonableLockInZooKeeper::~AbandonableLockInZooKeeper() +EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper() { - if (!zookeeper || holder_path.empty()) + if (!isCreated()) return; try { - zookeeper->tryRemove(holder_path); - zookeeper->trySet(path, ""); /// It's not strictly necessary. + unlock(); } catch (...) { - tryLogCurrentException("~AbandonableLockInZooKeeper"); + tryLogCurrentException("~EphemeralLockInZooKeeper"); } } diff --git a/dbms/src/Storages/MergeTree/AbandonableLockInZooKeeper.h b/dbms/src/Storages/MergeTree/EphemeralLockInZooKeeper.h similarity index 67% rename from dbms/src/Storages/MergeTree/AbandonableLockInZooKeeper.h rename to dbms/src/Storages/MergeTree/EphemeralLockInZooKeeper.h index 87683e84596..bda4edf70ec 100644 --- a/dbms/src/Storages/MergeTree/AbandonableLockInZooKeeper.h +++ b/dbms/src/Storages/MergeTree/EphemeralLockInZooKeeper.h @@ -13,33 +13,24 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -/** The synchronization is primitive. Works as follows: - * Creates a non-ephemeral incremental node and marks it as locked (LOCKED). - * `unlock()` unlocks it (UNLOCKED). - * When the destructor is called or the session ends in ZooKeeper, it goes into the ABANDONED state. - * (Including when the program is halted). - */ -class AbandonableLockInZooKeeper : public boost::noncopyable +/// A class that is used for locking a block number in a partition. +/// It creates a secondary ephemeral node in `temp_path` and a main ephemeral node with `path_prefix` +/// that references the secondary node. The reasons for this two-level scheme are historical (of course +/// it would be simpler to allocate block numbers for all partitions in one ZK directory). +class EphemeralLockInZooKeeper : public boost::noncopyable { public: - enum State - { - UNLOCKED, - LOCKED, - ABANDONED, - }; - - AbandonableLockInZooKeeper( + EphemeralLockInZooKeeper( const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops = nullptr); - AbandonableLockInZooKeeper() = default; + EphemeralLockInZooKeeper() = default; - AbandonableLockInZooKeeper(AbandonableLockInZooKeeper && rhs) noexcept + EphemeralLockInZooKeeper(EphemeralLockInZooKeeper && rhs) noexcept { *this = std::move(rhs); } - AbandonableLockInZooKeeper & operator=(AbandonableLockInZooKeeper && rhs) noexcept + EphemeralLockInZooKeeper & operator=(EphemeralLockInZooKeeper && rhs) noexcept { zookeeper = rhs.zookeeper; rhs.zookeeper = nullptr; @@ -82,10 +73,10 @@ public: void checkCreated() const { if (!isCreated()) - throw Exception("AbandonableLock is not created", ErrorCodes::LOGICAL_ERROR); + throw Exception("EphemeralLock is not created", ErrorCodes::LOGICAL_ERROR); } - ~AbandonableLockInZooKeeper(); + ~EphemeralLockInZooKeeper(); private: zkutil::ZooKeeper * zookeeper = nullptr; @@ -95,8 +86,7 @@ private: }; -/// Acquires block number locks in all partitions. The class is called Ephemeral- instead of Abandonable- -/// because it creates ephemeral block nodes (there is no need to leave abandoned tombstones). +/// Acquires block number locks in all partitions. class EphemeralLocksInAllPartitions { public: diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 643b6370590..870d6901835 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -935,12 +935,12 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart * When M > N parts could be replaced? * - new block was added in ReplicatedMergeTreeBlockOutputStream; * - it was added to working dataset in memory and renamed on filesystem; - * - but ZooKeeper transaction that add its to reference dataset in ZK and unlocks AbandonableLock is failed; + * - but ZooKeeper transaction that adds it to reference dataset in ZK failed; * - and it is failed due to connection loss, so we don't rollback working dataset in memory, * because we don't know if the part was added to ZK or not * (see ReplicatedMergeTreeBlockOutputStream) - * - then method selectPartsToMerge selects a range and see, that AbandonableLock for this part is abandoned, - * and so, it is possible to merge a range skipping this part. + * - then method selectPartsToMerge selects a range and sees, that EphemeralLock for the block in this part is unlocked, + * and so it is possible to merge a range skipping this part. * (NOTE: Merging with part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.) * - and after merge, this part will be removed in addition to parts that was merged. */ diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 07355529d7b..d47a06432ff 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index fb73aa64bef..8b014e4373d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1267,14 +1267,14 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( } /// Load current inserts - std::unordered_set abandonable_lock_holders; + std::unordered_set lock_holder_paths; for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp")) { if (startsWith(entry, "abandonable_lock-")) - abandonable_lock_holders.insert(queue.zookeeper_path + "/temp/" + entry); + lock_holder_paths.insert(queue.zookeeper_path + "/temp/" + entry); } - if (!abandonable_lock_holders.empty()) + if (!lock_holder_paths.empty()) { Strings partitions = zookeeper->getChildren(queue.zookeeper_path + "/block_numbers"); std::vector> lock_futures; @@ -1310,7 +1310,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( for (BlockInfo & block : block_infos) { zkutil::GetResponse resp = block.contents_future.get(); - if (!resp.error && abandonable_lock_holders.count(resp.data)) + if (!resp.error && lock_holder_paths.count(resp.data)) committing_blocks[block.partition].insert(block.number); } } @@ -1338,7 +1338,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()( /// A sketch of a proof of why this method actually works: /// /// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right. - /// Inserted parts get their block numbers by acquiring an abandonable lock (see AbandonableLockInZooKeeper.h). + /// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h). /// These block numbers are monotonically increasing in a partition. /// /// Because there is a window between the moment the inserted part gets its block number and diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 87cda09f566..6ae2104a57a 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2323,16 +2323,6 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts( String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); - const String & partition_id = parts[0]->info.partition_id; - for (size_t i = 0; i + 1 < parts.size(); ++i) - { - /// Remove the unnecessary entries about non-existent blocks. - for (Int64 number = parts[i]->info.max_block + 1; number <= parts[i + 1]->info.min_block - 1; ++number) - { - zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + partition_id + "/block-" + padIndex(number)); - } - } - if (out_log_entry) *out_log_entry = entry; @@ -3414,7 +3404,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path) } -std::optional +std::optional StorageReplicatedMergeTree::allocateBlockNumber( const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path) { @@ -3444,11 +3434,11 @@ StorageReplicatedMergeTree::allocateBlockNumber( zkutil::KeeperMultiException::check(code, ops, responses); } - AbandonableLockInZooKeeper lock; + EphemeralLockInZooKeeper lock; /// 2 RTT try { - lock = AbandonableLockInZooKeeper( + lock = EphemeralLockInZooKeeper( partition_path + "/block-", zookeeper_path + "/temp", *zookeeper, &deduplication_check_ops); } catch (const zkutil::KeeperMultiException & e) @@ -4385,7 +4375,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ MergeTreeData::MutableDataPartsVector dst_parts; Strings block_id_paths; Strings part_checksums; - std::vector abandonable_locks; + std::vector ephemeral_locks; LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts"); @@ -4441,7 +4431,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ src_parts.emplace_back(src_part); dst_parts.emplace_back(dst_part); - abandonable_locks.emplace_back(std::move(*lock)); + ephemeral_locks.emplace_back(std::move(*lock)); block_id_paths.emplace_back(block_id_path); part_checksums.emplace_back(hash_hex); } @@ -4482,7 +4472,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ for (size_t i = 0; i < dst_parts.size(); ++i) { getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); - abandonable_locks[i].getUnlockOps(ops); + ephemeral_locks[i].getUnlockOps(ops); if (ops.size() > zkutil::MULTI_BATCH_SIZE) { @@ -4523,7 +4513,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_ String log_znode_path = dynamic_cast(*op_results.back()).path_created; entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - for (auto & lock : abandonable_locks) + for (auto & lock : ephemeral_locks) lock.assumeUnlocked(); /// Forcibly remove replaced parts from ZooKeeper diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index eac3e1d9c79..e512977d4b0 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include #include @@ -460,8 +460,9 @@ private: void updateQuorum(const String & part_name); /// Creates new block number if block with such block_id does not exist - std::optional allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, - const String & zookeeper_block_id_path = ""); + std::optional allocateBlockNumber( + const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, + const String & zookeeper_block_id_path = ""); /** Wait until all replicas, including this, execute the specified action from the log. * If replicas are added at the same time, it can not wait the added replica . diff --git a/dbms/src/Storages/tests/get_abandonable_lock_in_all_partitions.cpp b/dbms/src/Storages/tests/get_abandonable_lock_in_all_partitions.cpp index 0d2bf99ec88..f0c5a3d158e 100644 --- a/dbms/src/Storages/tests/get_abandonable_lock_in_all_partitions.cpp +++ b/dbms/src/Storages/tests/get_abandonable_lock_in_all_partitions.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include diff --git a/dbms/src/Storages/tests/get_current_inserts_in_replicated.cpp b/dbms/src/Storages/tests/get_current_inserts_in_replicated.cpp index a49c99b22ce..3b7e85d58dc 100644 --- a/dbms/src/Storages/tests/get_current_inserts_in_replicated.cpp +++ b/dbms/src/Storages/tests/get_current_inserts_in_replicated.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include @@ -37,17 +37,17 @@ try Stopwatch total; Stopwatch stage; /// Load current inserts - std::unordered_set abandonable_lock_holders; + std::unordered_set lock_holder_paths; for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp")) { if (startsWith(entry, "abandonable_lock-")) - abandonable_lock_holders.insert(zookeeper_path + "/temp/" + entry); + lock_holder_paths.insert(zookeeper_path + "/temp/" + entry); } - std::cerr << "Stage 1 (get lock holders): " << abandonable_lock_holders.size() + std::cerr << "Stage 1 (get lock holders): " << lock_holder_paths.size() << " lock holders, elapsed: " << stage.elapsedSeconds() << "s." << std::endl; stage.restart(); - if (!abandonable_lock_holders.empty()) + if (!lock_holder_paths.empty()) { Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers"); std::cerr << "Stage 2 (get partitions): " << partitions.size() @@ -86,7 +86,7 @@ try for (BlockInfo & block : block_infos) { zkutil::GetResponse resp = block.contents_future.get(); - if (!resp.error && abandonable_lock_holders.count(resp.data)) + if (!resp.error && lock_holder_paths.count(resp.data)) { ++total_count; current_inserts[block.partition].insert(block.number);