switch to ephemeral nodes for block number locks [#CLICKHOUSE-3802]

This commit is contained in:
Alexey Zatelepin 2018-07-04 19:31:21 +03:00 committed by alexey-milovidov
parent 0a46d231ed
commit a41ee1f0a7
9 changed files with 56 additions and 75 deletions

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <common/logger_useful.h>
@ -11,59 +11,59 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
AbandonableLockInZooKeeper::AbandonableLockInZooKeeper(
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops)
: zookeeper(&zookeeper_), path_prefix(path_prefix_)
{
String abandonable_path = temp_path + "/abandonable_lock-";
/// The /abandonable_lock- name is for backward compatibility.
String holder_path_prefix = temp_path + "/abandonable_lock-";
/// Let's create an secondary ephemeral node.
if (!precheck_ops || precheck_ops->empty())
{
holder_path = zookeeper->create(abandonable_path, "", zkutil::CreateMode::EphemeralSequential);
holder_path = zookeeper->create(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential);
}
else
{
precheck_ops->emplace_back(zkutil::makeCreateRequest(abandonable_path, "", zkutil::CreateMode::EphemeralSequential));
precheck_ops->emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential));
zkutil::Responses op_results = zookeeper->multi(*precheck_ops);
holder_path = dynamic_cast<const zkutil::CreateResponse &>(*op_results.back()).path_created;
}
/// Write the path to the secondary node in the main node.
path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential);
path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::EphemeralSequential);
if (path.size() <= path_prefix.size())
throw Exception("Logical error: name of sequential node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
}
void AbandonableLockInZooKeeper::unlock()
void EphemeralLockInZooKeeper::unlock()
{
checkCreated();
zookeeper->remove(path);
zookeeper->remove(holder_path);
zkutil::Requests ops;
getUnlockOps(ops);
zookeeper->multi(ops);
holder_path = "";
}
void AbandonableLockInZooKeeper::getUnlockOps(zkutil::Requests & ops)
void EphemeralLockInZooKeeper::getUnlockOps(zkutil::Requests & ops)
{
checkCreated();
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1));
}
AbandonableLockInZooKeeper::~AbandonableLockInZooKeeper()
EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper()
{
if (!zookeeper || holder_path.empty())
if (!isCreated())
return;
try
{
zookeeper->tryRemove(holder_path);
zookeeper->trySet(path, ""); /// It's not strictly necessary.
unlock();
}
catch (...)
{
tryLogCurrentException("~AbandonableLockInZooKeeper");
tryLogCurrentException("~EphemeralLockInZooKeeper");
}
}

View File

@ -13,33 +13,24 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/** The synchronization is primitive. Works as follows:
* Creates a non-ephemeral incremental node and marks it as locked (LOCKED).
* `unlock()` unlocks it (UNLOCKED).
* When the destructor is called or the session ends in ZooKeeper, it goes into the ABANDONED state.
* (Including when the program is halted).
*/
class AbandonableLockInZooKeeper : public boost::noncopyable
/// A class that is used for locking a block number in a partition.
/// It creates a secondary ephemeral node in `temp_path` and a main ephemeral node with `path_prefix`
/// that references the secondary node. The reasons for this two-level scheme are historical (of course
/// it would be simpler to allocate block numbers for all partitions in one ZK directory).
class EphemeralLockInZooKeeper : public boost::noncopyable
{
public:
enum State
{
UNLOCKED,
LOCKED,
ABANDONED,
};
AbandonableLockInZooKeeper(
EphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops = nullptr);
AbandonableLockInZooKeeper() = default;
EphemeralLockInZooKeeper() = default;
AbandonableLockInZooKeeper(AbandonableLockInZooKeeper && rhs) noexcept
EphemeralLockInZooKeeper(EphemeralLockInZooKeeper && rhs) noexcept
{
*this = std::move(rhs);
}
AbandonableLockInZooKeeper & operator=(AbandonableLockInZooKeeper && rhs) noexcept
EphemeralLockInZooKeeper & operator=(EphemeralLockInZooKeeper && rhs) noexcept
{
zookeeper = rhs.zookeeper;
rhs.zookeeper = nullptr;
@ -82,10 +73,10 @@ public:
void checkCreated() const
{
if (!isCreated())
throw Exception("AbandonableLock is not created", ErrorCodes::LOGICAL_ERROR);
throw Exception("EphemeralLock is not created", ErrorCodes::LOGICAL_ERROR);
}
~AbandonableLockInZooKeeper();
~EphemeralLockInZooKeeper();
private:
zkutil::ZooKeeper * zookeeper = nullptr;
@ -95,8 +86,7 @@ private:
};
/// Acquires block number locks in all partitions. The class is called Ephemeral- instead of Abandonable-
/// because it creates ephemeral block nodes (there is no need to leave abandoned tombstones).
/// Acquires block number locks in all partitions.
class EphemeralLocksInAllPartitions
{
public:

View File

@ -935,12 +935,12 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
* When M > N parts could be replaced?
* - new block was added in ReplicatedMergeTreeBlockOutputStream;
* - it was added to working dataset in memory and renamed on filesystem;
* - but ZooKeeper transaction that add its to reference dataset in ZK and unlocks AbandonableLock is failed;
* - but ZooKeeper transaction that adds it to reference dataset in ZK failed;
* - and it is failed due to connection loss, so we don't rollback working dataset in memory,
* because we don't know if the part was added to ZK or not
* (see ReplicatedMergeTreeBlockOutputStream)
* - then method selectPartsToMerge selects a range and see, that AbandonableLock for this part is abandoned,
* and so, it is possible to merge a range skipping this part.
* - then method selectPartsToMerge selects a range and sees, that EphemeralLock for the block in this part is unlocked,
* and so it is possible to merge a range skipping this part.
* (NOTE: Merging with part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)
* - and after merge, this part will be removed in addition to parts that was merged.
*/

View File

@ -1,5 +1,5 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>

View File

@ -1267,14 +1267,14 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
}
/// Load current inserts
std::unordered_set<String> abandonable_lock_holders;
std::unordered_set<String> lock_holder_paths;
for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp"))
{
if (startsWith(entry, "abandonable_lock-"))
abandonable_lock_holders.insert(queue.zookeeper_path + "/temp/" + entry);
lock_holder_paths.insert(queue.zookeeper_path + "/temp/" + entry);
}
if (!abandonable_lock_holders.empty())
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(queue.zookeeper_path + "/block_numbers");
std::vector<std::future<zkutil::ListResponse>> lock_futures;
@ -1310,7 +1310,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
for (BlockInfo & block : block_infos)
{
zkutil::GetResponse resp = block.contents_future.get();
if (!resp.error && abandonable_lock_holders.count(resp.data))
if (!resp.error && lock_holder_paths.count(resp.data))
committing_blocks[block.partition].insert(block.number);
}
}
@ -1338,7 +1338,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
/// A sketch of a proof of why this method actually works:
///
/// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right.
/// Inserted parts get their block numbers by acquiring an abandonable lock (see AbandonableLockInZooKeeper.h).
/// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h).
/// These block numbers are monotonically increasing in a partition.
///
/// Because there is a window between the moment the inserted part gets its block number and

View File

@ -2323,16 +2323,6 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
const String & partition_id = parts[0]->info.partition_id;
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Remove the unnecessary entries about non-existent blocks.
for (Int64 number = parts[i]->info.max_block + 1; number <= parts[i + 1]->info.min_block - 1; ++number)
{
zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + partition_id + "/block-" + padIndex(number));
}
}
if (out_log_entry)
*out_log_entry = entry;
@ -3414,7 +3404,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
}
std::optional<AbandonableLockInZooKeeper>
std::optional<EphemeralLockInZooKeeper>
StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path)
{
@ -3444,11 +3434,11 @@ StorageReplicatedMergeTree::allocateBlockNumber(
zkutil::KeeperMultiException::check(code, ops, responses);
}
AbandonableLockInZooKeeper lock;
EphemeralLockInZooKeeper lock;
/// 2 RTT
try
{
lock = AbandonableLockInZooKeeper(
lock = EphemeralLockInZooKeeper(
partition_path + "/block-", zookeeper_path + "/temp", *zookeeper, &deduplication_check_ops);
}
catch (const zkutil::KeeperMultiException & e)
@ -4385,7 +4375,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
MergeTreeData::MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<AbandonableLockInZooKeeper> abandonable_locks;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
@ -4441,7 +4431,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
abandonable_locks.emplace_back(std::move(*lock));
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
@ -4482,7 +4472,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
for (size_t i = 0; i < dst_parts.size(); ++i)
{
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
abandonable_locks[i].getUnlockOps(ops);
ephemeral_locks[i].getUnlockOps(ops);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
@ -4523,7 +4513,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
String log_znode_path = dynamic_cast<const zkutil::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : abandonable_locks)
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper

View File

@ -14,7 +14,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAlterThread.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
@ -460,7 +460,8 @@ private:
void updateQuorum(const String & part_name);
/// Creates new block number if block with such block_id does not exist
std::optional<AbandonableLockInZooKeeper> allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "");
/** Wait until all replicas, including this, execute the specified action from the log.

View File

@ -4,7 +4,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>

View File

@ -4,7 +4,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>
@ -37,17 +37,17 @@ try
Stopwatch total;
Stopwatch stage;
/// Load current inserts
std::unordered_set<String> abandonable_lock_holders;
std::unordered_set<String> lock_holder_paths;
for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp"))
{
if (startsWith(entry, "abandonable_lock-"))
abandonable_lock_holders.insert(zookeeper_path + "/temp/" + entry);
lock_holder_paths.insert(zookeeper_path + "/temp/" + entry);
}
std::cerr << "Stage 1 (get lock holders): " << abandonable_lock_holders.size()
std::cerr << "Stage 1 (get lock holders): " << lock_holder_paths.size()
<< " lock holders, elapsed: " << stage.elapsedSeconds() << "s." << std::endl;
stage.restart();
if (!abandonable_lock_holders.empty())
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
std::cerr << "Stage 2 (get partitions): " << partitions.size()
@ -86,7 +86,7 @@ try
for (BlockInfo & block : block_infos)
{
zkutil::GetResponse resp = block.contents_future.get();
if (!resp.error && abandonable_lock_holders.count(resp.data))
if (!resp.error && lock_holder_paths.count(resp.data))
{
++total_count;
current_inserts[block.partition].insert(block.number);