2018-07-04 16:31:21 +00:00
|
|
|
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
|
2018-04-04 12:39:48 +00:00
|
|
|
#include <Common/ZooKeeper/KeeperException.h>
|
2022-04-27 15:05:45 +00:00
|
|
|
#include <Common/logger_useful.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/types.h>
|
2018-04-04 12:39:48 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes referenced by the code below.
/// LOGICAL_ERROR is only declared here (`extern`); this file does not define it.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2022-11-10 11:12:24 +00:00
|
|
|
/// Stores a pointer to the ZooKeeper client together with the node prefix and the full node path.
/// The constructor itself performs no ZooKeeper requests.
///
/// Throws LOGICAL_ERROR unless `path_` is strictly longer than `path_prefix_`.
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, zkutil::ZooKeeper & zookeeper_, const String & path_)
    : zookeeper(&zookeeper_), path_prefix(path_prefix_), path(path_)
{
    const bool path_has_suffix = path.size() > path_prefix.size();
    if (!path_has_suffix)
        throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
}
|
|
|
|
|
|
|
|
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
|
2022-11-10 11:12:24 +00:00
|
|
|
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, const String & deduplication_path)
|
2018-04-04 12:39:48 +00:00
|
|
|
{
|
2022-11-01 20:35:25 +00:00
|
|
|
String path;
|
2018-04-04 12:39:48 +00:00
|
|
|
|
2022-07-06 16:59:53 +00:00
|
|
|
if (deduplication_path.empty())
|
2018-04-04 12:39:48 +00:00
|
|
|
{
|
2022-11-01 20:35:25 +00:00
|
|
|
String holder_path = temp_path + "/" + EphemeralLockInZooKeeper::LEGACY_LOCK_OTHER;
|
2022-11-10 11:12:24 +00:00
|
|
|
path = zookeeper_.create(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential);
|
2018-04-04 12:39:48 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2022-11-01 20:35:25 +00:00
|
|
|
String holder_path = temp_path + "/" + EphemeralLockInZooKeeper::LEGACY_LOCK_INSERT;
|
|
|
|
|
2022-07-06 16:59:53 +00:00
|
|
|
/// Check for duplicates in advance, to avoid superfluous block numbers allocation
|
|
|
|
Coordination::Requests ops;
|
|
|
|
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
|
|
|
|
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
|
2022-11-01 20:35:25 +00:00
|
|
|
ops.emplace_back(zkutil::makeCreateRequest(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential));
|
2022-07-06 16:59:53 +00:00
|
|
|
Coordination::Responses responses;
|
2022-11-10 11:12:24 +00:00
|
|
|
Coordination::Error e = zookeeper_.tryMulti(ops, responses);
|
2022-07-06 16:59:53 +00:00
|
|
|
if (e != Coordination::Error::ZOK)
|
|
|
|
{
|
2022-10-27 21:31:53 +00:00
|
|
|
if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
|
2022-07-06 16:59:53 +00:00
|
|
|
{
|
|
|
|
return {};
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception
|
|
|
|
throw Exception("Unable to handle error {} when acquiring ephemeral lock in ZK", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
}
|
2018-04-04 12:39:48 +00:00
|
|
|
|
2022-11-01 20:35:25 +00:00
|
|
|
path = dynamic_cast<const Coordination::CreateResponse *>(responses.back().get())->path_created;
|
2022-07-06 16:59:53 +00:00
|
|
|
}
|
2018-04-04 12:39:48 +00:00
|
|
|
|
2022-11-01 20:35:25 +00:00
|
|
|
return EphemeralLockInZooKeeper{path_prefix_, zookeeper_, path};
|
2018-04-04 12:39:48 +00:00
|
|
|
}
|
|
|
|
|
2018-07-04 16:31:21 +00:00
|
|
|
void EphemeralLockInZooKeeper::unlock()
|
2018-04-04 12:39:48 +00:00
|
|
|
{
|
2018-08-25 01:58:14 +00:00
|
|
|
Coordination::Requests ops;
|
2022-11-02 15:24:18 +00:00
|
|
|
getUnlockOp(ops);
|
2018-07-04 16:31:21 +00:00
|
|
|
zookeeper->multi(ops);
|
2022-11-01 20:35:25 +00:00
|
|
|
zookeeper = nullptr;
|
2018-04-04 12:39:48 +00:00
|
|
|
}
|
|
|
|
|
2022-11-02 15:24:18 +00:00
|
|
|
/// Appends to `ops` a request that removes the node at `path`.
/// checkCreated() is called first, before `ops` is modified.
void EphemeralLockInZooKeeper::getUnlockOp(Coordination::Requests & ops)
{
    checkCreated();

    auto remove_request = zkutil::makeRemoveRequest(path, -1);
    ops.emplace_back(remove_request);
}
|
|
|
|
|
2018-07-04 16:31:21 +00:00
|
|
|
/// Calls unlock() if isLocked() reports true.
/// Exceptions from unlock() are logged and never leave the destructor.
EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper()
{
    if (isLocked())
    {
        try
        {
            unlock();
        }
        catch (...)
        {
            tryLogCurrentException("~EphemeralLockInZooKeeper");
        }
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Creates an ephemeral sequential node `block_numbers_path/<partition>/path_prefix<seq>` for every child
/// of `block_numbers_path`, all in a single multi-request, and records each created node in `locks`.
///
/// The multi-request also contains a check of the version of `block_numbers_path` that was read together with
/// the list of children. If the request fails with ZBADVERSION, the whole procedure is retried from scratch.
///
/// Throws Coordination::Exception for any other error code, and LOGICAL_ERROR if a created path
/// is not longer than its expected prefix.
EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
    const String & block_numbers_path, const String & path_prefix, const String & temp_path,
    zkutil::ZooKeeper & zookeeper_)
    : zookeeper(&zookeeper_)
{
    String holder_path = temp_path + "/" + EphemeralLockInZooKeeper::LEGACY_LOCK_OTHER;

    for (;;)
    {
        Coordination::Stat parent_stat;
        Strings partition_ids = zookeeper->getChildren(block_numbers_path, &parent_stat);

        /// One create request per partition, in the same order as `partition_ids`.
        Coordination::Requests requests;
        for (const auto & partition_id : partition_ids)
        {
            String node_prefix = block_numbers_path + "/" + partition_id + "/" + path_prefix;
            requests.push_back(zkutil::makeCreateRequest(
                node_prefix, holder_path, zkutil::CreateMode::EphemeralSequential));
        }

        /// Tie the request to the version of `block_numbers_path` observed by getChildren().
        requests.push_back(zkutil::makeCheckRequest(block_numbers_path, parent_stat.version));

        Coordination::Responses responses;
        Coordination::Error rc = zookeeper->tryMulti(requests, responses);

        if (rc == Coordination::Error::ZBADVERSION)
        {
            LOG_TRACE(&Poco::Logger::get("EphemeralLocksInAllPartitions"), "Someone has inserted a block in a new partition while we were creating locks. Retry.");
            continue;
        }

        if (rc != Coordination::Error::ZOK)
            throw Coordination::Exception(rc);

        /// The first `partition_ids.size()` responses correspond to the create requests.
        for (size_t idx = 0; idx < partition_ids.size(); ++idx)
        {
            const String & partition_id = partition_ids[idx];

            /// Length of "block_numbers_path/partition_id/path_prefix".
            size_t prefix_size = block_numbers_path.size() + 1 + partition_id.size() + 1 + path_prefix.size();

            const String & created_path = dynamic_cast<const Coordination::CreateResponse &>(*responses[idx]).path_created;
            if (created_path.size() <= prefix_size)
                throw Exception("Logical error: name of the sequential node is shorter than prefix.",
                    ErrorCodes::LOGICAL_ERROR);

            /// Everything after the prefix is parsed as the number of the node.
            UInt64 number = parse<UInt64>(created_path.c_str() + prefix_size, created_path.size() - prefix_size);
            locks.push_back(LockInfo{created_path, partition_id, number});
        }

        break;
    }
}
|
|
|
|
|
|
|
|
void EphemeralLocksInAllPartitions::unlock()
|
|
|
|
{
|
2020-11-10 10:23:46 +00:00
|
|
|
if (!zookeeper)
|
|
|
|
return;
|
|
|
|
|
2022-11-01 20:35:25 +00:00
|
|
|
std::vector<zkutil::ZooKeeper::FutureRemove> futures;
|
2022-11-07 19:27:18 +00:00
|
|
|
futures.reserve(locks.size());
|
2018-04-04 12:39:48 +00:00
|
|
|
for (const auto & lock : locks)
|
|
|
|
{
|
2022-11-01 20:35:25 +00:00
|
|
|
futures.push_back(zookeeper->asyncRemove(lock.path));
|
2018-04-04 12:39:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
for (auto & future : futures)
|
|
|
|
future.get();
|
|
|
|
|
|
|
|
locks.clear();
|
2022-11-01 20:35:25 +00:00
|
|
|
zookeeper = nullptr;
|
2018-04-04 12:39:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Calls unlock(); exceptions thrown by it are logged and never leave the destructor.
EphemeralLocksInAllPartitions::~EphemeralLocksInAllPartitions()
{
    try
    {
        unlock();
    }
    catch (...)
    {
        /// Only log: a destructor must not throw.
        tryLogCurrentException("~EphemeralLocksInAllPartitions");
    }
}
|
|
|
|
|
|
|
|
}
|