Using different ZooKeeper library (development) [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-03-25 03:15:52 +03:00
parent 0714a217fe
commit 522bf01db9
7 changed files with 50 additions and 66 deletions

View File

@ -498,17 +498,10 @@ Responses ZooKeeper::multi(const Requests & requests)
return responses;
}
int32_t ZooKeeper::tryMulti(const Requests & requests)
int32_t ZooKeeper::tryMulti(const Requests & requests, Responses & responses)
{
Responses responses;
int32_t code = multiImpl(requests, responses);
if (!(code == ZooKeeperImpl::ZooKeeper::ZOK ||
code == ZooKeeperImpl::ZooKeeper::ZNONODE ||
code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS ||
code == ZooKeeperImpl::ZooKeeper::ZNOCHILDRENFOREPHEMERALS ||
code == ZooKeeperImpl::ZooKeeper::ZBADVERSION ||
code == ZooKeeperImpl::ZooKeeper::ZNOTEMPTY))
if (code && !isUserError(code))
throw KeeperException(code);
return code;
}
@ -558,7 +551,8 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
/// Try to remove the children with a faster method - in bulk. If this fails,
/// this means someone is concurrently removing these children and we will have
/// to remove them one by one.
if (tryMulti(ops) != ZooKeeperImpl::ZooKeeper::ZOK)
Responses responses;
if (tryMulti(ops, responses) != ZooKeeperImpl::ZooKeeper::ZOK)
for (const std::string & child : batch)
tryRemove(child);
}

View File

@ -148,7 +148,7 @@ public:
Responses multi(const Requests & requests);
/// Throws only if some operation has returned an "unexpected" error
/// - an error that would cause the corresponding try- method to throw.
int32_t tryMulti(const Requests & requests);
int32_t tryMulti(const Requests & requests, Responses & responses);
/// Throws nothing, just alias of multiImpl
int32_t tryMultiNoThrow(const Requests & requests, Responses & responses)
{

View File

@ -808,7 +808,8 @@ void DDLWorker::createStatusDirs(const std::string & node_path)
request.path = node_path + "/finished";
ops.emplace_back(std::make_shared<zkutil::CreateRequest>(std::move(request)));
}
int code = zookeeper->tryMulti(ops);
zkutil::Responses responses;
int code = zookeeper->tryMulti(ops, responses);
if (code && code != ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw zkutil::KeeperException(code);
}

View File

@ -354,32 +354,26 @@ struct TaskCluster
};
struct MultiTransactionInfo
{
int32_t code;
zkutil::Requests requests;
zkutil::Responses responses;
};
/// Atomically checks that is_dirty node is not exists, and made the remaining op
/// Returns relative number of failed operation in the second field (the passed op has 0 index)
static zkutil::MultiTransactionInfo checkNoNodeAndCommit(
static MultiTransactionInfo checkNoNodeAndCommit(
const zkutil::ZooKeeperPtr & zookeeper,
const String & checking_node_path,
zkutil::RequestPtr && op)
{
zkutil::Requests ops;
{
zkutil::CreateRequest request;
request.path = checking_node_path;
ops.emplace_back(std::make_shared<zkutil::CreateRequest>(std::move(request)));
}
{
zkutil::RemoveRequest request;
request.path = checking_node_path;
ops.emplace_back(std::make_shared<zkutil::RemoveRequest>(std::move(request)));
}
ops.emplace_back(std::move(op));
zkutil::Responses responses;
auto code = zookeeper->tryMultiNoThrow(ops, responses);
if (code && !zkutil::isUserError(code))
throw zkutil::KeeperException(code);
MultiTransactionInfo info;
info.requests.emplace_back(zkutil::makeCreateRequest(checking_node_path, "", zkutil::CreateMode::Persistent));
info.requests.emplace_back(zkutil::makeRemoveRequest(checking_node_path, -1));
info.requests.emplace_back(std::move(op));
info.code = zookeeper->tryMulti(info.requests, info.responses);
return info;
}
@ -740,7 +734,7 @@ public:
{
auto zookeeper = getZooKeeper();
task_description_watch_callback = [this] (zkutil::ZooKeeper &, int, int, const char *)
task_description_watch_callback = [this] (const ZooKeeperImpl::ZooKeeper::WatchResponse &)
{
UInt64 version = ++task_descprtion_version;
LOG_DEBUG(log, "Task description should be updated, local version " << version);
@ -1061,7 +1055,8 @@ protected:
zkutil::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(workers_version_path, description, version));
ops.emplace_back(zkutil::makeCreateRequest(current_worker_path, description, zkutil::CreateMode::Ephemeral));
auto code = zookeeper->tryMulti(ops);
zkutil::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == ZooKeeperImpl::ZooKeeper::ZOK || code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
@ -1466,7 +1461,7 @@ protected:
auto create_is_dirty_node = [&] ()
{
auto code = zookeeper->tryCreate(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
if (code && code != ZNODEEXISTS)
if (code && code != ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw zkutil::KeeperException(code, is_dirty_flag_path);
};
@ -1587,17 +1582,19 @@ protected:
{
String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
auto op_create = zkutil::makeCreateRequest(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
zkutil::MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_create));
MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_create));
if (info.code)
{
if (info.getFailedOp().getPath() == is_dirty_flag_path)
zkutil::KeeperMultiException exception(info.code, info.requests, info.responses);
if (exception.getPathForFirstFailedOp() == is_dirty_flag_path)
{
LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
return PartitionTaskStatus::Error;
}
throw zkutil::KeeperException(info.code, current_task_status_path);
throw exception;
}
}
@ -1674,7 +1671,7 @@ protected:
if (future_is_dirty_checker != nullptr)
{
zkutil::ZooKeeper::StatAndExists status;
zkutil::ExistsResponse status;
try
{
status = future_is_dirty_checker->get();
@ -1729,11 +1726,13 @@ protected:
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
auto op_set = zkutil::makeSetRequest(current_task_status_path, state_finished, 0);
zkutil::MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_set));
MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_set));
if (info.code)
{
if (info.getFailedOp().getPath() == is_dirty_flag_path)
zkutil::KeeperMultiException exception(info.code, info.requests, info.responses);
if (exception.getPathForFirstFailedOp() == is_dirty_flag_path)
LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
else
LOG_INFO(log, "Someone made the node abandoned. Will refill partition. " << zkutil::ZooKeeper::error2string(info.code));

View File

@ -174,7 +174,8 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
if (!ops.empty())
{
auto code = zookeeper->tryMulti(ops);
zkutil::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code)
LOG_ERROR(log, "Couldn't set value of nodes for insert times ("

View File

@ -261,8 +261,9 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
zkutil::Requests ops;
zkutil::Responses responses;
storage.removePartFromZooKeeper(part_name, ops);
auto code = zookeeper->tryMulti(ops);
auto code = zookeeper->tryMulti(ops, responses);
if (code == ZooKeeperImpl::ZooKeeper::ZNONODE)
LOG_WARNING(log, "Part " << part_name << " with failed quorum is not in ZooKeeper. This shouldn't happen often.");

View File

@ -499,7 +499,8 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "",
zkutil::CreateMode::Persistent));
auto code = zookeeper->tryMulti(ops);
zkutil::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code && code != ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw zkutil::KeeperException(code);
}
@ -1393,7 +1394,8 @@ bool StorageReplicatedMergeTree::executeFetch(const StorageReplicatedMergeTree::
if (!entry.block_id.empty() && zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id))
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/blocks/" + entry.block_id, -1));
auto code = zookeeper->tryMulti(ops);
zkutil::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == ZooKeeperImpl::ZooKeeper::ZOK)
{
@ -1507,8 +1509,9 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
data.renameAndDetachPart(part);
zkutil::Requests ops;
zkutil::Responses responses;
removePartFromZooKeeper(part->name, ops);
auto code = getZooKeeper()->tryMulti(ops);
auto code = getZooKeeper()->tryMulti(ops, responses);
/// If the part is already removed (for example, because it was never added to ZK due to crash,
/// see ReplicatedMergeTreeBlockOutputStream), then Ok.
@ -2170,9 +2173,10 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
/// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
zkutil::Requests ops;
zkutil::Responses responses;
ops.emplace_back(zkutil::makeRemoveRequest(quorum_status_path, stat.version));
ops.emplace_back(zkutil::makeSetRequest(quorum_last_part_path, part_name, -1));
auto code = zookeeper->tryMulti(ops);
auto code = zookeeper->tryMulti(ops, responses);
if (code == ZooKeeperImpl::ZooKeeper::ZOK)
{
@ -3632,22 +3636,6 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
}
static int32_t tryMulti(zkutil::ZooKeeperPtr & zookeeper, zkutil::Requests & ops) noexcept
{
int32_t code;
try
{
code = zookeeper->tryMulti(ops);
}
catch (const zkutil::KeeperException & e)
{
code = e.code;
}
return code;
}
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retied)
{
@ -3661,8 +3649,8 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr &
auto it_next = std::next(it);
if (ops.size() >= zkutil::MULTI_BATCH_SIZE || it_next == part_names.cend())
{
/// It is Ok to use multi with retries to delete nodes, because new nodes with the same names cannot appear here
auto code = tryMulti(zookeeper, ops);
zkutil::Responses unused_responses;
auto code = zookeeper->tryMultiNoThrow(ops, unused_responses);
ops.clear();
if (code == ZooKeeperImpl::ZooKeeper::ZNONODE)
@ -3674,7 +3662,7 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr &
{
zkutil::Requests cur_ops;
removePartFromZooKeeper(*it_in_batch, cur_ops);
auto cur_code = tryMulti(zookeeper, cur_ops);
auto cur_code = zookeeper->tryMultiNoThrow(cur_ops, unused_responses);
if (cur_code == ZooKeeperImpl::ZooKeeper::ZNONODE)
{