This commit is contained in:
Alexander Tokmakov 2022-11-07 20:27:18 +01:00
parent b8174a63a8
commit 9210e586d2
6 changed files with 36 additions and 21 deletions

View File

@ -76,7 +76,7 @@ using GetPriorityForLoadBalancing = DB::GetPriorityForLoadBalancing;
/// Satisfied by any type T for which std::derived_from<T, Coordination::Response> holds.
/// Used as the constraint on the ResponseType parameter of MultiReadResponses.
template <typename T>
concept ZooKeeperResponse = std::derived_from<T, Coordination::Response>;
template <ZooKeeperResponse ResponseType>
template <ZooKeeperResponse ResponseType, bool try_multi>
struct MultiReadResponses
{
template <typename TResponses>
@ -96,7 +96,17 @@ struct MultiReadResponses
if constexpr (std::same_as<TResponses, RegularResponses>)
return dynamic_cast<ResponseType &>(*resp[index]);
else
{
if constexpr (try_multi)
{
/// We should not ignore errors except ZNONODE
/// for consistency with exists, tryGet and tryGetChildren
const auto & error = resp[index].error;
if (error != Coordination::Error::ZOK && error != Coordination::Error::ZNONODE)
throw KeeperException(error);
}
return resp[index];
}
},
responses);
}
@ -144,6 +154,7 @@ class ZooKeeper
public:
using Ptr = std::shared_ptr<ZooKeeper>;
using ErrorsList = std::initializer_list<Coordination::Error>;
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
@ -217,7 +228,7 @@ public:
bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
using MultiExistsResponse = MultiReadResponses<Coordination::ExistsResponse>;
using MultiExistsResponse = MultiReadResponses<Coordination::ExistsResponse, true>;
template <typename TIter>
MultiExistsResponse exists(TIter start, TIter end)
{
@ -233,7 +244,8 @@ public:
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
using MultiGetResponse = MultiReadResponses<Coordination::GetResponse>;
using MultiGetResponse = MultiReadResponses<Coordination::GetResponse, false>;
using MultiTryGetResponse = MultiReadResponses<Coordination::GetResponse, true>;
template <typename TIter>
MultiGetResponse get(TIter start, TIter end)
@ -264,13 +276,13 @@ public:
Coordination::Error * code = nullptr);
/// Batched tryGet for the paths in the iterator range [start, end).
/// Forwards to multiRead<Coordination::GetResponse, true>, passing zkutil::makeGetRequest
/// as the request factory and asyncTryGet as the per-path asynchronous call.
template <typename TIter>
MultiGetResponse tryGet(TIter start, TIter end)
MultiTryGetResponse tryGet(TIter start, TIter end)
{
return multiRead<Coordination::GetResponse, true>(
start, end, zkutil::makeGetRequest, [&](const auto & path) { return asyncTryGet(path); });
}
/// Convenience overload: runs the iterator-range tryGet over the whole `paths` vector.
MultiGetResponse tryGet(const std::vector<std::string> & paths)
MultiTryGetResponse tryGet(const std::vector<std::string> & paths)
{
return tryGet(paths.begin(), paths.end());
}
@ -297,7 +309,8 @@ public:
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse>;
using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, false>;
using MultiTryGetChildrenResponse = MultiReadResponses<Coordination::ListResponse, true>;
template <typename TIter>
MultiGetChildrenResponse
@ -333,7 +346,7 @@ public:
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
template <typename TIter>
MultiGetChildrenResponse
MultiTryGetChildrenResponse
tryGetChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return multiRead<Coordination::ListResponse, true>(
@ -343,7 +356,7 @@ public:
[&](const auto & path) { return asyncTryGetChildren(path, list_request_type); });
}
MultiGetChildrenResponse
MultiTryGetChildrenResponse
tryGetChildren(const std::vector<std::string> & paths, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return tryGetChildren(paths.begin(), paths.end(), list_request_type);
@ -511,7 +524,7 @@ private:
using AsyncFunction = std::function<std::future<TResponse>(const std::string &)>;
template <typename TResponse, bool try_multi, typename TIter>
MultiReadResponses<TResponse> multiRead(TIter start, TIter end, RequestFactory request_factory, AsyncFunction<TResponse> async_fun)
MultiReadResponses<TResponse, try_multi> multiRead(TIter start, TIter end, RequestFactory request_factory, AsyncFunction<TResponse> async_fun)
{
if (getApiVersion() >= DB::KeeperApiVersion::WITH_MULTI_READ)
{
@ -523,12 +536,12 @@ private:
{
Coordination::Responses responses;
tryMulti(requests, responses);
return MultiReadResponses<TResponse>{std::move(responses)};
return MultiReadResponses<TResponse, try_multi>{std::move(responses)};
}
else
{
auto responses = multi(requests);
return MultiReadResponses<TResponse>{std::move(responses)};
return MultiReadResponses<TResponse, try_multi>{std::move(responses)};
}
}
@ -536,14 +549,14 @@ private:
std::vector<std::future<TResponse>> future_responses;
if (responses_size == 0)
return MultiReadResponses<TResponse>(std::move(future_responses));
return MultiReadResponses<TResponse, try_multi>(std::move(future_responses));
future_responses.reserve(responses_size);
for (auto it = start; it != end; ++it)
future_responses.push_back(async_fun(*it));
return MultiReadResponses<TResponse>{std::move(future_responses)};
return MultiReadResponses<TResponse, try_multi>{std::move(future_responses)};
}
std::unique_ptr<Coordination::IKeeper> impl;

View File

@ -141,6 +141,7 @@ void EphemeralLocksInAllPartitions::unlock()
return;
std::vector<zkutil::ZooKeeper::FutureRemove> futures;
futures.reserve(locks.size());
for (const auto & lock : locks)
{
futures.push_back(zookeeper->asyncRemove(lock.path));

View File

@ -469,7 +469,7 @@ void ReplicatedMergeTreeSink::commitPart(
else
quorum_path = storage.zookeeper_path + "/quorum/status";
waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_value, replicas_num);
waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_version, replicas_num);
}
else
{
@ -636,7 +636,7 @@ void ReplicatedMergeTreeSink::commitPart(
storage.updateQuorum(part->name, false);
}
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value, replicas_num);
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
}
}
@ -658,7 +658,7 @@ void ReplicatedMergeTreeSink::waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper,
const std::string & part_name,
const std::string & quorum_path,
const std::string & is_active_node_value,
Int32 is_active_node_version,
size_t replicas_num) const
{
/// We are waiting for quorum to be satisfied.
@ -691,9 +691,10 @@ void ReplicatedMergeTreeSink::waitForQuorum(
/// What if the current replica has ceased to be active in the meantime
/// and the quorum has been marked as failed and deleted?
Coordination::Stat stat;
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
|| value != is_active_node_value)
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, &stat)
|| stat.version != is_active_node_version)
throw Exception("Replica become inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
}
catch (...)

View File

@ -85,7 +85,7 @@ private:
/// Also checks that replica still alive.
void waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name,
const std::string & quorum_path, const std::string & is_active_node_value, size_t replicas_num) const;
const std::string & quorum_path, int is_active_node_version, size_t replicas_num) const;
StorageReplicatedMergeTree & storage;
StorageMetadataPtr metadata_snapshot;

View File

@ -6484,7 +6484,7 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOps(
if (startsWith(block_id, partition_prefix))
paths_to_get.push_back(fs::path(zookeeper_path) / "blocks" / block_id);
auto results = zookeeper.get(paths_to_get);
auto results = zookeeper.tryGet(paths_to_get);
for (size_t i = 0; i < paths_to_get.size(); ++i)
{

View File

@ -27,7 +27,7 @@ select 'blocks';
-- Lists system.zookeeper_log rows for every (session_id, xid) pair that has at least one entry
-- whose path matches this test's rmt/blocks pattern and whose op_num is not 1, 12 or 500.
-- Rows are ordered by xid, type, request_idx.
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
from system.zookeeper_log
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/blocks%' and op_num not in (1, 12, 500))
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/blocks/%' and op_num not in (1, 12, 500))
order by xid, type, request_idx;
drop table rmt;