Merge pull request #41410 from ClickHouse/keeper-multiread

Add MultiRead support in Keeper and internal ZK client
This commit is contained in:
Alexander Tokmakov 2022-10-03 14:17:57 +03:00 committed by GitHub
commit 00914a174d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 436 additions and 68 deletions

View File

@ -1,11 +1,11 @@
#pragma once
#include <base/types.h>
#include <future>
#include <memory>
#include <vector>
#include <Common/ZooKeeper/IKeeper.h>
#include <base/types.h>
#include <Poco/Event.h>
#include <Common/ZooKeeper/IKeeper.h>
namespace zkutil
@ -34,4 +34,9 @@ Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version
Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version);
Coordination::RequestPtr makeCheckRequest(const std::string & path, int version);
Coordination::RequestPtr makeGetRequest(const std::string & path);
Coordination::RequestPtr
makeListRequest(const std::string & path, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::RequestPtr makeSimpleListRequest(const std::string & path);
Coordination::RequestPtr makeExistsRequest(const std::string & path);
}

View File

@ -6,6 +6,8 @@
#include <functional>
#include <filesystem>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/randomSeed.h>
#include <base/find_symbols.h>
#include <base/sort.h>
@ -989,6 +991,24 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(
return future;
}
std::future<Coordination::ListResponse>
ZooKeeper::asyncTryGetChildren(const std::string & path, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
auto callback = [promise, path](const Coordination::ListResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK && response.error != Coordination::Error::ZNONODE)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else
promise->set_value(response);
};
impl->list(path, list_request_type, std::move(callback), {});
return future;
}
std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::string & path, int32_t version)
{
auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>();
@ -1207,6 +1227,37 @@ Coordination::RequestPtr makeCheckRequest(const std::string & path, int version)
return request;
}
Coordination::RequestPtr makeGetRequest(const std::string & path)
{
auto request = std::make_shared<Coordination::GetRequest>();
request->path = path;
return request;
}
Coordination::RequestPtr makeListRequest(const std::string & path, Coordination::ListRequestType list_request_type)
{
// Keeper server that support MultiRead also support FilteredList
auto request = std::make_shared<Coordination::ZooKeeperFilteredListRequest>();
request->path = path;
request->list_request_type = list_request_type;
return request;
}
Coordination::RequestPtr makeSimpleListRequest(const std::string & path)
{
auto request = std::make_shared<Coordination::ZooKeeperSimpleListRequest>();
request->path = path;
return request;
}
Coordination::RequestPtr makeExistsRequest(const std::string & path)
{
auto request = std::make_shared<Coordination::ZooKeeperExistsRequest>();
request->path = path;
return request;
}
std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts_with_slash, Poco::Logger * log)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')

View File

@ -12,6 +12,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <Common/thread_local_rng.h>
@ -72,6 +73,63 @@ struct RemoveException
using GetPriorityForLoadBalancing = DB::GetPriorityForLoadBalancing;
template <typename T>
concept ZooKeeperResponse = std::derived_from<T, Coordination::Response>;
template <ZooKeeperResponse ResponseType>
struct MultiReadResponses
{
template <typename TResponses>
explicit MultiReadResponses(TResponses responses_) : responses(std::move(responses_))
{}
size_t size() const
{
return std::visit([](auto && resp) { return resp.size(); }, responses);
}
ResponseType & operator[](size_t index)
{
return std::visit(
[&]<typename TResponses>(TResponses & resp) -> ResponseType &
{
if constexpr (std::same_as<TResponses, RegularResponses>)
return dynamic_cast<ResponseType &>(*resp[index]);
else
return resp[index];
},
responses);
}
private:
using RegularResponses = std::vector<Coordination::ResponsePtr>;
using FutureResponses = std::vector<std::future<ResponseType>>;
struct ResponsesWithFutures
{
ResponsesWithFutures(FutureResponses future_responses_) : future_responses(std::move(future_responses_))
{
cached_responses.resize(future_responses.size());
}
FutureResponses future_responses;
std::vector<std::optional<ResponseType>> cached_responses;
ResponseType & operator[](size_t index)
{
if (cached_responses[index].has_value())
return *cached_responses[index];
cached_responses[index] = future_responses[index].get();
return *cached_responses[index];
}
size_t size() const { return future_responses.size(); }
};
std::variant<RegularResponses, ResponsesWithFutures> responses;
};
/// ZooKeeper session. The interface is substantially different from the usual libzookeeper API.
///
/// Poco::Event objects are used for watches. The event is set only once on the first
@ -89,7 +147,6 @@ public:
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/** Config of the form:
<zookeeper>
<node>
@ -160,16 +217,63 @@ public:
bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
using MultiExistsResponse = MultiReadResponses<Coordination::ExistsResponse>;
template <typename TIter>
MultiExistsResponse exists(TIter start, TIter end)
{
return multiRead<Coordination::ExistsResponse, true>(
start, end, zkutil::makeExistsRequest, [&](const auto & path) { return asyncExists(path); });
}
MultiExistsResponse exists(const std::vector<std::string> & paths)
{
return exists(paths.begin(), paths.end());
}
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
using MultiGetResponse = MultiReadResponses<Coordination::GetResponse>;
template <typename TIter>
MultiGetResponse get(TIter start, TIter end)
{
return multiRead<Coordination::GetResponse, false>(
start, end, zkutil::makeGetRequest, [&](const auto & path) { return asyncGet(path); });
}
MultiGetResponse get(const std::vector<std::string> & paths)
{
return get(paths.begin(), paths.end());
}
/// Doesn't not throw in the following cases:
/// * The node doesn't exist. Returns false in this case.
bool tryGet(const std::string & path, std::string & res, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr,
Coordination::Error * code = nullptr);
bool tryGet(
const std::string & path,
std::string & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr,
Coordination::Error * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback,
Coordination::Error * code = nullptr);
bool tryGetWatch(
const std::string & path,
std::string & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::Error * code = nullptr);
template <typename TIter>
MultiGetResponse tryGet(TIter start, TIter end)
{
return multiRead<Coordination::GetResponse, true>(
start, end, zkutil::makeGetRequest, [&](const auto & path) { return asyncTryGet(path); });
}
MultiGetResponse tryGet(const std::vector<std::string> & paths)
{
return tryGet(paths.begin(), paths.end());
}
void set(const std::string & path, const std::string & data,
int32_t version = -1, Coordination::Stat * stat = nullptr);
@ -193,17 +297,57 @@ public:
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using MultiGetChildrenResponse = MultiReadResponses<Coordination::ListResponse>;
template <typename TIter>
MultiGetChildrenResponse
getChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return multiRead<Coordination::ListResponse, false>(
start,
end,
[list_request_type](const auto & path) { return zkutil::makeListRequest(path, list_request_type); },
[&](const auto & path) { return asyncGetChildren(path, {}, list_request_type); });
}
MultiGetChildrenResponse
getChildren(const std::vector<std::string> & paths, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return getChildren(paths.begin(), paths.end(), list_request_type);
}
/// Doesn't not throw in the following cases:
/// * The node doesn't exist.
Coordination::Error tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildren(
const std::string & path,
Strings & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
template <typename TIter>
MultiGetChildrenResponse
tryGetChildren(TIter start, TIter end, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return multiRead<Coordination::ListResponse, true>(
start,
end,
[list_request_type](const auto & path) { return zkutil::makeListRequest(path, list_request_type); },
[&](const auto & path) { return asyncTryGetChildren(path, list_request_type); });
}
MultiGetChildrenResponse
tryGetChildren(const std::vector<std::string> & paths, Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL)
{
return tryGetChildren(paths.begin(), paths.end(), list_request_type);
}
/// Performs several operations in a transaction.
/// Throws on every error.
@ -327,6 +471,12 @@ public:
/// * The node doesn't exist
FutureGet asyncTryGet(const std::string & path);
/// Doesn't throw in the following cases:
/// * The node doesn't exist
FutureGetChildren asyncTryGetChildren(
const std::string & path,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
void finalize(const String & reason);
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
@ -354,6 +504,46 @@ private:
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
Coordination::Error syncImpl(const std::string & path, std::string & returned_path);
using RequestFactory = std::function<Coordination::RequestPtr(const std::string &)>;
template <typename TResponse>
using AsyncFunction = std::function<std::future<TResponse>(const std::string &)>;
template <typename TResponse, bool try_multi, typename TIter>
MultiReadResponses<TResponse> multiRead(TIter start, TIter end, RequestFactory request_factory, AsyncFunction<TResponse> async_fun)
{
if (getApiVersion() >= DB::KeeperApiVersion::WITH_MULTI_READ)
{
Coordination::Requests requests;
for (auto it = start; it != end; ++it)
requests.push_back(request_factory(*it));
if constexpr (try_multi)
{
Coordination::Responses responses;
tryMulti(requests, responses);
return MultiReadResponses<TResponse>{std::move(responses)};
}
else
{
auto responses = multi(requests);
return MultiReadResponses<TResponse>{std::move(responses)};
}
}
auto responses_size = std::distance(start, end);
std::vector<std::future<TResponse>> future_responses;
if (responses_size == 0)
return MultiReadResponses<TResponse>(std::move(future_responses));
future_responses.reserve(responses_size);
for (auto it = start; it != end; ++it)
future_responses.push_back(async_fun(*it));
return MultiReadResponses<TResponse>{std::move(future_responses)};
}
std::unique_ptr<Coordination::IKeeper> impl;
ZooKeeperArgs args;

View File

@ -337,6 +337,15 @@ void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
void ZooKeeperSimpleListResponse::readImpl(ReadBuffer & in)
{
Coordination::read(names, in);
}
void ZooKeeperSimpleListResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(names, out);
}
void ZooKeeperSetACLRequest::writeImpl(WriteBuffer & out) const
{
@ -426,16 +435,29 @@ void ZooKeeperErrorResponse::writeImpl(WriteBuffer & out) const
Coordination::write(error, out);
}
void ZooKeeperMultiRequest::checkOperationType(OperationType type)
{
chassert(!operation_type.has_value() || *operation_type == type);
operation_type = type;
}
OpNum ZooKeeperMultiRequest::getOpNum() const
{
return !operation_type.has_value() || *operation_type == OperationType::Write ? OpNum::Multi : OpNum::MultiRead;
}
ZooKeeperMultiRequest::ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls)
{
/// Convert nested Requests to ZooKeeperRequests.
/// Note that deep copy is required to avoid modifying path in presence of chroot prefix.
requests.reserve(generic_requests.size());
using enum OperationType;
for (const auto & generic_request : generic_requests)
{
if (const auto * concrete_request_create = dynamic_cast<const CreateRequest *>(generic_request.get()))
{
checkOperationType(Write);
auto create = std::make_shared<ZooKeeperCreateRequest>(*concrete_request_create);
if (create->acls.empty())
create->acls = default_acls;
@ -443,16 +465,39 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(const Requests & generic_requests,
}
else if (const auto * concrete_request_remove = dynamic_cast<const RemoveRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperRemoveRequest>(*concrete_request_remove));
}
else if (const auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperSetRequest>(*concrete_request_set));
}
else if (const auto * concrete_request_check = dynamic_cast<const CheckRequest *>(generic_request.get()))
{
checkOperationType(Write);
requests.push_back(std::make_shared<ZooKeeperCheckRequest>(*concrete_request_check));
}
else if (const auto * concrete_request_get = dynamic_cast<const GetRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperGetRequest>(*concrete_request_get));
}
else if (const auto * concrete_request_exists = dynamic_cast<const ExistsRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperExistsRequest>(*concrete_request_exists));
}
else if (const auto * concrete_request_simple_list = dynamic_cast<const ZooKeeperSimpleListRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperSimpleListRequest>(*concrete_request_simple_list));
}
else if (const auto * concrete_request_list = dynamic_cast<const ZooKeeperFilteredListRequest *>(generic_request.get()))
{
checkOperationType(Read);
requests.push_back(std::make_shared<ZooKeeperFilteredListRequest>(*concrete_request_list));
}
else
throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS);
}
@ -526,8 +571,7 @@ std::string ZooKeeperMultiRequest::toStringImpl() const
bool ZooKeeperMultiRequest::isReadRequest() const
{
/// Possibly we can do better
return false;
return getOpNum() == OpNum::MultiRead;
}
void ZooKeeperMultiResponse::readImpl(ReadBuffer & in)
@ -622,8 +666,18 @@ ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTi
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCheckResponse>()); }
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperMultiResponse>(requests)); }
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
{
std::shared_ptr<ZooKeeperMultiResponse> response;
if (getOpNum() == OpNum::Multi)
response = std::make_shared<ZooKeeperMultiWriteResponse>(requests);
else
response = std::make_shared<ZooKeeperMultiReadResponse>(requests);
return setTime(std::move(response));
}
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCloseResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetACLResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetACLResponse>()); }
@ -873,6 +927,12 @@ void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
{
auto res = std::make_shared<RequestT>();
res->request_created_time_ns = clock_gettime_ns();
if constexpr (num == OpNum::MultiRead)
res->operation_type = ZooKeeperMultiRequest::OperationType::Read;
else if constexpr (num == OpNum::Multi)
res->operation_type = ZooKeeperMultiRequest::OperationType::Write;
return res;
});
}
@ -892,6 +952,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::List, ZooKeeperListRequest>(*this);
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::MultiRead, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);
registerZooKeeperRequest<OpNum::GetACL, ZooKeeperGetACLRequest>(*this);
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);

View File

@ -257,6 +257,9 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
{
ZooKeeperExistsRequest() = default;
explicit ZooKeeperExistsRequest(const ExistsRequest & base) : ExistsRequest(base) {}
OpNum getOpNum() const override { return OpNum::Exists; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
@ -281,6 +284,9 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
{
ZooKeeperGetRequest() = default;
explicit ZooKeeperGetRequest(const GetRequest & base) : GetRequest(base) {}
OpNum getOpNum() const override { return OpNum::Get; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
@ -333,6 +339,9 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
{
ZooKeeperListRequest() = default;
explicit ZooKeeperListRequest(const ListRequest & base) : ListRequest(base) {}
OpNum getOpNum() const override { return OpNum::List; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
@ -346,6 +355,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest
{
OpNum getOpNum() const override { return OpNum::SimpleList; }
ZooKeeperResponsePtr makeResponse() const override;
};
struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
@ -373,7 +383,11 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::SimpleList; }
size_t bytesSize() const override { return ZooKeeperListResponse::bytesSize() - sizeof(stat); }
};
struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
@ -458,7 +472,7 @@ struct ZooKeeperGetACLResponse final : GetACLResponse, ZooKeeperResponse
struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
{
OpNum getOpNum() const override { return OpNum::Multi; }
OpNum getOpNum() const override;
ZooKeeperMultiRequest() = default;
ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls);
@ -473,12 +487,20 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
enum class OperationType : UInt8
{
Read,
Write
};
std::optional<OperationType> operation_type;
private:
void checkOperationType(OperationType type);
};
struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
struct ZooKeeperMultiResponse : MultiResponse, ZooKeeperResponse
{
OpNum getOpNum() const override { return OpNum::Multi; }
explicit ZooKeeperMultiResponse(const Requests & requests)
{
responses.reserve(requests.size());
@ -501,6 +523,18 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperMultiWriteResponse final : public ZooKeeperMultiResponse
{
OpNum getOpNum() const override { return OpNum::Multi; }
using ZooKeeperMultiResponse::ZooKeeperMultiResponse;
};
struct ZooKeeperMultiReadResponse final : public ZooKeeperMultiResponse
{
OpNum getOpNum() const override { return OpNum::MultiRead; }
using ZooKeeperMultiResponse::ZooKeeperMultiResponse;
};
/// Fake internal coordination (keeper) response. Never received from client
/// and never send to client.
struct ZooKeeperSessionIDRequest final : ZooKeeperRequest

View File

@ -20,6 +20,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::List),
static_cast<int32_t>(OpNum::Check),
static_cast<int32_t>(OpNum::Multi),
static_cast<int32_t>(OpNum::MultiRead),
static_cast<int32_t>(OpNum::Auth),
static_cast<int32_t>(OpNum::SessionID),
static_cast<int32_t>(OpNum::SetACL),
@ -53,6 +54,8 @@ std::string toString(OpNum op_num)
return "Check";
case OpNum::Multi:
return "Multi";
case OpNum::MultiRead:
return "MultiRead";
case OpNum::Sync:
return "Sync";
case OpNum::Heartbeat:

View File

@ -31,6 +31,7 @@ enum class OpNum : int32_t
List = 12,
Check = 13,
Multi = 14,
MultiRead = 22,
Auth = 100,
// CH Keeper specific operations

View File

@ -1,5 +1,7 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <Common/EventNotifier.h>
@ -14,6 +16,7 @@
#include <base/getThreadId.h>
#include <Common/config.h>
#include "Coordination/KeeperConstants.h"
#if USE_SSL
# include <Poco/Net/SecureStreamSocket.h>
@ -1298,6 +1301,9 @@ void ZooKeeper::multi(
{
ZooKeeperMultiRequest request(requests, default_acls);
if (request.getOpNum() == OpNum::MultiRead && keeper_api_version < Coordination::KeeperApiVersion::WITH_MULTI_READ)
throw Exception(Error::ZBADARGUMENTS, "MultiRead request type cannot be used because it's not supported by the server");
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperMultiRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const MultiResponse &>(response)); };

View File

@ -8,10 +8,11 @@ namespace DB
enum class KeeperApiVersion : uint8_t
{
ZOOKEEPER_COMPATIBLE = 0,
WITH_FILTERED_LIST
WITH_FILTERED_LIST,
WITH_MULTI_READ
};
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_FILTERED_LIST;
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_MULTI_READ;
const std::string keeper_system_path = "/keeper";
const std::string keeper_api_version_path = keeper_system_path + "/api_version";

View File

@ -1601,6 +1601,9 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor
{
using OperationType = Coordination::ZooKeeperMultiRequest::OperationType;
std::optional<OperationType> operation_type;
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
{
for (const auto & concrete_request : concrete_requests)
@ -1616,28 +1619,55 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
concrete_requests.reserve(request.requests.size());
const auto check_operation_type = [&](OperationType type)
{
if (operation_type.has_value() && *operation_type != type)
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal mixing of read and write operations in multi request");
operation_type = type;
};
for (const auto & sub_request : request.requests)
{
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
switch (sub_zk_request->getOpNum())
{
case Coordination::OpNum::Create:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Remove:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Set:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Check:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Get:
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<KeeperStorageGetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Exists:
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<KeeperStorageExistsRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::List:
case Coordination::OpNum::FilteredList:
case Coordination::OpNum::SimpleList:
check_operation_type(OperationType::Read);
concrete_requests.push_back(std::make_shared<KeeperStorageListRequestProcessor>(sub_zk_request));
break;
default:
throw DB::Exception(
ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
}
}
assert(request.requests.empty() || operation_type.has_value());
}
std::vector<KeeperStorage::Delta>
@ -1652,7 +1682,8 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
if (!new_deltas.empty())
{
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_deltas.back().operation))
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_deltas.back().operation);
error && *operation_type == OperationType::Write)
{
storage.uncommitted_state.rollback(zxid);
response_errors.push_back(error->error);
@ -1699,8 +1730,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto cur_response = concrete_requests[i]->process(storage, zxid);
response.responses[i] = cur_response;
response.responses[i] = concrete_requests[i]->process(storage, zxid);
storage.uncommitted_state.commit(zxid);
}
@ -1715,26 +1745,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto cur_response = concrete_requests[i]->processLocal(storage, zxid);
response.responses[i] = cur_response;
if (cur_response->error != Coordination::Error::ZOK)
{
for (size_t j = 0; j <= i; ++j)
{
auto response_error = response.responses[j]->error;
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = response_error;
}
for (size_t j = i + 1; j < response.responses.size(); ++j)
{
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY;
}
return response_ptr;
}
response.responses[i] = concrete_requests[i]->processLocal(storage, zxid);
}
response.error = Coordination::Error::ZOK;
@ -1881,6 +1892,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
registerKeeperRequestProcessor<Coordination::OpNum::FilteredList, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Check, KeeperStorageCheckRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Multi, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::MultiRead, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor>(*this);
}

View File

@ -116,29 +116,29 @@ String TransactionLog::serializeTID(const TransactionID & tid)
void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_iterator end)
{
std::vector<std::future<Coordination::GetResponse>> futures;
size_t entries_count = std::distance(beg, end);
if (!entries_count)
return;
String last_entry = *std::prev(end);
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
futures.reserve(entries_count);
std::vector<std::string> entry_paths;
entry_paths.reserve(entries_count);
for (auto it = beg; it != end; ++it)
futures.emplace_back(TSA_READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
entry_paths.emplace_back(fs::path(zookeeper_path_log) / *it);
auto entries = TSA_READ_ONE_THREAD(zookeeper)->get(entry_paths);
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
auto it = beg;
for (size_t i = 0; i < entries_count; ++i, ++it)
{
auto res = futures[i].get();
auto res = entries[i];
CSN csn = deserializeCSN(*it);
TransactionID tid = deserializeTID(res.data);
loaded.emplace_back(tid.getHash(), CSNEntry{csn, tid});
LOG_TEST(log, "Got entry {} -> {}", tid, csn);
}
futures.clear();
NOEXCEPT_SCOPE_STRICT({
std::lock_guard lock{mutex};

View File

@ -83,6 +83,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"List", static_cast<Int16>(Coordination::OpNum::List)},
{"Check", static_cast<Int16>(Coordination::OpNum::Check)},
{"Multi", static_cast<Int16>(Coordination::OpNum::Multi)},
{"MultiRead", static_cast<Int16>(Coordination::OpNum::MultiRead)},
{"Auth", static_cast<Int16>(Coordination::OpNum::Auth)},
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
{"FilteredList", static_cast<Int16>(Coordination::OpNum::FilteredList)},

View File

@ -1977,9 +1977,12 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
std::vector<std::future<Coordination::ListResponse>> lock_futures;
std::vector<std::string> paths;
paths.reserve(partitions.size());
for (const String & partition : partitions)
lock_futures.push_back(zookeeper->asyncGetChildren(fs::path(queue.zookeeper_path) / "block_numbers" / partition));
paths.push_back(fs::path(queue.zookeeper_path) / "block_numbers" / partition);
auto locks_children = zookeeper->getChildren(paths);
struct BlockInfoInZooKeeper
{
@ -1992,7 +1995,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
std::vector<BlockInfoInZooKeeper> block_infos;
for (size_t i = 0; i < partitions.size(); ++i)
{
Strings partition_block_numbers = lock_futures[i].get().names;
Strings partition_block_numbers = locks_children[i].names;
for (const String & entry : partition_block_numbers)
{
/// TODO: cache block numbers that are abandoned.

View File

@ -124,8 +124,6 @@ public:
{
auto zookeeper = storage.getClient();
Coordination::Requests requests;
auto keys_limit = storage.keysLimit();
size_t current_keys_num = 0;
@ -140,23 +138,25 @@ public:
current_keys_num = data_stat.numChildren;
}
std::vector<std::pair<const std::string *, std::future<Coordination::ExistsResponse>>> exist_responses;
for (const auto & [key, value] : new_values)
{
auto path = storage.fullPathForKey(key);
std::vector<std::string> key_paths;
key_paths.reserve(new_values.size());
for (const auto & [key, _] : new_values)
key_paths.push_back(storage.fullPathForKey(key));
exist_responses.push_back({&key, zookeeper->asyncExists(path)});
}
auto results = zookeeper->exists(key_paths);
for (auto & [key, response] : exist_responses)
Coordination::Requests requests;
requests.reserve(key_paths.size());
for (size_t i = 0; i < key_paths.size(); ++i)
{
if (response.get().error == Coordination::Error::ZOK)
auto key = fs::path(key_paths[i]).filename();
if (results[i].error == Coordination::Error::ZOK)
{
requests.push_back(zkutil::makeSetRequest(storage.fullPathForKey(*key), new_values[*key], -1));
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
}
else
{
requests.push_back(zkutil::makeCreateRequest(storage.fullPathForKey(*key), new_values[*key], zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
}
}