diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp new file mode 100644 index 00000000000..eb04536ae00 --- /dev/null +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -0,0 +1,623 @@ +#include +#include +#include +#include +#include +#include + + +namespace Coordination +{ + +using namespace DB; + +/// ZooKeeper has 1 MB node size and serialization limit by default, +/// but it can be raised up, so we have a slightly larger limit on our side. +#define MAX_STRING_OR_ARRAY_SIZE (1 << 28) /// 256 MiB + +/// Assuming we are at little endian. + +static void write(int64_t x, WriteBuffer & out) +{ + x = __builtin_bswap64(x); + writeBinary(x, out); +} + +static void write(int32_t x, WriteBuffer & out) +{ + x = __builtin_bswap32(x); + writeBinary(x, out); +} + +static void write(bool x, WriteBuffer & out) +{ + writeBinary(x, out); +} + +static void write(const String & s, WriteBuffer & out) +{ + write(int32_t(s.size()), out); + out.write(s.data(), s.size()); +} + +template void write(std::array s, WriteBuffer & out) +{ + write(int32_t(N), out); + out.write(s.data(), N); +} + +template void write(const std::vector & arr, WriteBuffer & out) +{ + write(int32_t(arr.size()), out); + for (const auto & elem : arr) + write(elem, out); +} + +static void write(const ACL & acl, WriteBuffer & out) +{ + write(acl.permissions, out); + write(acl.scheme, out); + write(acl.id, out); +} + +static void write(const Stat & stat, WriteBuffer & out) +{ + write(stat.czxid, out); + write(stat.mzxid, out); + write(stat.ctime, out); + write(stat.mtime, out); + write(stat.version, out); + write(stat.cversion, out); + write(stat.aversion, out); + write(stat.ephemeralOwner, out); + write(stat.dataLength, out); + write(stat.numChildren, out); + write(stat.pzxid, out); +} + +static void write(const Error & x, WriteBuffer & out) +{ + write(static_cast(x), out); +} + +static void read(int64_t & x, ReadBuffer & in) +{ + readBinary(x, in); + x = __builtin_bswap64(x); +} + +static void read(int32_t & x, ReadBuffer & in) +{ + readBinary(x, in); + x = __builtin_bswap32(x); +} + +static void read(Error & x, ReadBuffer & in) +{ + int32_t code; + read(code, in); + x = Error(code); +} + +static void read(bool & x, ReadBuffer & in) +{ + readBinary(x, in); +} + +static void read(String & s, ReadBuffer & in) +{ + int32_t size = 0; + read(size, in); + + if (size == -1) + { + /// It means that zookeeper node has NULL value. We will treat it like empty string. + s.clear(); + return; + } + + if (size < 0) + throw Exception("Negative size while reading string from ZooKeeper", Error::ZMARSHALLINGERROR); + + if (size > MAX_STRING_OR_ARRAY_SIZE) + throw Exception("Too large string size while reading from ZooKeeper", Error::ZMARSHALLINGERROR); + + s.resize(size); + in.read(s.data(), size); +} + +template void read(std::array & s, ReadBuffer & in) +{ + int32_t size = 0; + read(size, in); + if (size != N) + throw Exception("Unexpected array size while reading from ZooKeeper", Error::ZMARSHALLINGERROR); + in.read(s.data(), N); +} + +static void read(Stat & stat, ReadBuffer & in) +{ + read(stat.czxid, in); + read(stat.mzxid, in); + read(stat.ctime, in); + read(stat.mtime, in); + read(stat.version, in); + read(stat.cversion, in); + read(stat.aversion, in); + read(stat.ephemeralOwner, in); + read(stat.dataLength, in); + read(stat.numChildren, in); + read(stat.pzxid, in); +} + +template void read(std::vector & arr, ReadBuffer & in) +{ + int32_t size = 0; + read(size, in); + if (size < 0) + throw Exception("Negative size while reading array from ZooKeeper", Error::ZMARSHALLINGERROR); + if (size > MAX_STRING_OR_ARRAY_SIZE) + throw Exception("Too large array size while reading from ZooKeeper", Error::ZMARSHALLINGERROR); + arr.resize(size); + for (auto & elem : arr) + read(elem, in); +} + +static void read(ACL & acl, ReadBuffer & in) +{ + read(acl.permissions, in); + read(acl.scheme, in); + read(acl.id, in); +} + +void ZooKeeperRequest::write(WriteBuffer & out) const +{ + /// Excessive copy to calculate length. + WriteBufferFromOwnString buf; + Coordination::write(xid, buf); + Coordination::write(getOpNum(), buf); + writeImpl(buf); + Coordination::write(buf.str(), out); + out.next(); +} + +void ZooKeeperWatchResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(type, in); + Coordination::read(state, in); + Coordination::read(path, in); +} + +void ZooKeeperWatchResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(type, out); + Coordination::write(state, out); + Coordination::write(path, out); +} + +void ZooKeeperAuthRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(type, out); + Coordination::write(scheme, out); + Coordination::write(data, out); +} + +void ZooKeeperAuthRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(type, in); + Coordination::read(scheme, in); + Coordination::read(data, in); +} + +void ZooKeeperCreateRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(data, out); + Coordination::write(acls, out); + + int32_t flags = 0; + + if (is_ephemeral) + flags |= 1; + if (is_sequential) + flags |= 2; + + Coordination::write(flags, out); +} + +void ZooKeeperCreateRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(data, in); + Coordination::read(acls, in); + + int32_t flags = 0; + Coordination::read(flags, in); + + if (flags & 1) + is_ephemeral = true; + if (flags & 2) + is_sequential = true; +} + +void ZooKeeperCreateResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(path_created, in); +} + +void ZooKeeperCreateResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path_created, out); +} + +void ZooKeeperRemoveRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(version, out); +} + +void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(version, in); +} + +void ZooKeeperExistsRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(has_watch, out); +} + +void ZooKeeperExistsRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(has_watch, in); +} + +void ZooKeeperExistsResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(stat, in); +} + +void ZooKeeperExistsResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(stat, out); +} + +void ZooKeeperGetRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(has_watch, out); +} + +void ZooKeeperGetRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(has_watch, in); +} + +void ZooKeeperGetResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(data, in); + Coordination::read(stat, in); +} + +void ZooKeeperGetResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(data, out); + Coordination::write(stat, out); +} + +void ZooKeeperSetRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(data, out); + Coordination::write(version, out); +} + +void ZooKeeperSetRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(data, in); + Coordination::read(version, in); +} + +void ZooKeeperSetResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(stat, in); +} + +void ZooKeeperSetResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(stat, out); +} + +void ZooKeeperListRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(has_watch, out); +} + +void ZooKeeperListRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(has_watch, in); +} + +void ZooKeeperListResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(names, in); + Coordination::read(stat, in); +} + +void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(names, out); + Coordination::write(stat, out); +} + +void ZooKeeperCheckRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(version, out); +} + +void ZooKeeperCheckRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(version, in); +} + +void ZooKeeperErrorResponse::readImpl(ReadBuffer & in) +{ + Coordination::Error read_error; + Coordination::read(read_error, in); + + if (read_error != error) + throw Exception(fmt::format("Error code in ErrorResponse ({}) doesn't match error code in header ({})", read_error, error), + Error::ZMARSHALLINGERROR); +} + +void ZooKeeperErrorResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(error, out); +} + +ZooKeeperMultiRequest::ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls) +{ + /// Convert nested Requests to ZooKeeperRequests. + /// Note that deep copy is required to avoid modifying path in presence of chroot prefix. + requests.reserve(generic_requests.size()); + + for (const auto & generic_request : generic_requests) + { + if (const auto * concrete_request_create = dynamic_cast(generic_request.get())) + { + auto create = std::make_shared(*concrete_request_create); + if (create->acls.empty()) + create->acls = default_acls; + requests.push_back(create); + } + else if (const auto * concrete_request_remove = dynamic_cast(generic_request.get())) + { + requests.push_back(std::make_shared(*concrete_request_remove)); + } + else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) + { + requests.push_back(std::make_shared(*concrete_request_set)); + } + else if (const auto * concrete_request_check = dynamic_cast(generic_request.get())) + { + requests.push_back(std::make_shared(*concrete_request_check)); + } + else + throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS); + } +} + +void ZooKeeperMultiRequest::writeImpl(WriteBuffer & out) const +{ + for (const auto & request : requests) + { + const auto & zk_request = dynamic_cast(*request); + + bool done = false; + int32_t error = -1; + + Coordination::write(zk_request.getOpNum(), out); + Coordination::write(done, out); + Coordination::write(error, out); + + zk_request.writeImpl(out); + } + + OpNum op_num = -1; + bool done = true; + int32_t error = -1; + + Coordination::write(op_num, out); + Coordination::write(done, out); + Coordination::write(error, out); +} + +void ZooKeeperMultiRequest::readImpl(ReadBuffer & in) +{ + + while (true) + { + OpNum op_num; + bool done; + int32_t error; + Coordination::read(op_num, in); + Coordination::read(done, in); + Coordination::read(error, in); + + if (done) + { + if (op_num != -1) + throw Exception("Unexpected op_num received at the end of results for multi transaction", Error::ZMARSHALLINGERROR); + if (error != -1) + throw Exception("Unexpected error value received at the end of results for multi transaction", Error::ZMARSHALLINGERROR); + break; + } + + ZooKeeperRequestPtr request = ZooKeeperRequestFactory::instance().get(op_num); + request->readImpl(in); + requests.push_back(request); + + if (in.eof()) + throw Exception("Not enough results received for multi transaction", Error::ZMARSHALLINGERROR); + } +} + +void ZooKeeperMultiResponse::readImpl(ReadBuffer & in) +{ + for (auto & response : responses) + { + OpNum op_num; + bool done; + Error op_error; + + Coordination::read(op_num, in); + Coordination::read(done, in); + Coordination::read(op_error, in); + + if (done) + throw Exception("Not enough results received for multi transaction", Error::ZMARSHALLINGERROR); + + /// op_num == -1 is special for multi transaction. + /// For unknown reason, error code is duplicated in header and in response body. + + if (op_num == -1) + response = std::make_shared(); + + if (op_error != Error::ZOK) + { + response->error = op_error; + + /// Set error for whole transaction. + /// If some operations fail, ZK send global error as zero and then send details about each operation. + /// It will set error code for first failed operation and it will set special "runtime inconsistency" code for other operations. + if (error == Error::ZOK && op_error != Error::ZRUNTIMEINCONSISTENCY) + error = op_error; + } + + if (op_error == Error::ZOK || op_num == -1) + dynamic_cast(*response).readImpl(in); + } + + /// Footer. + { + OpNum op_num; + bool done; + int32_t error_read; + + Coordination::read(op_num, in); + Coordination::read(done, in); + Coordination::read(error_read, in); + + if (!done) + throw Exception("Too many results received for multi transaction", Error::ZMARSHALLINGERROR); + if (op_num != -1) + throw Exception("Unexpected op_num received at the end of results for multi transaction", Error::ZMARSHALLINGERROR); + if (error_read != -1) + throw Exception("Unexpected error value received at the end of results for multi transaction", Error::ZMARSHALLINGERROR); + } +} + +void ZooKeeperMultiResponse::writeImpl(WriteBuffer & out) const +{ + for (auto & response : responses) + { + const ZooKeeperResponse & zk_response = dynamic_cast(*response); + OpNum op_num = zk_response.getOpNum(); + bool done = false; + Error op_error = zk_response.error; + + Coordination::write(op_num, out); + Coordination::write(done, out); + Coordination::write(op_error, out); + zk_response.writeImpl(out); + } + + /// Footer. + { + OpNum op_num = -1; + bool done = true; + int32_t error_read = - 1; + + Coordination::write(op_num, out); + Coordination::write(done, out); + Coordination::write(error_read, out); + } +} + +ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared(requests); } +ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared(); } + +void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator) +{ + if (!op_num_to_request.try_emplace(op_num, creator).second) + throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Request with op num {} already registered", op_num); +} + +std::shared_ptr ZooKeeperRequest::read(ReadBuffer & in) +{ + XID xid; + OpNum op_num; + + Coordination::read(xid, in); + Coordination::read(op_num, in); + + auto request = ZooKeeperRequestFactory::instance().get(op_num); + request->xid = xid; + request->readImpl(in); + return request; +} + +ZooKeeperRequestPtr ZooKeeperRequestFactory::get(OpNum op_num) const +{ + auto it = op_num_to_request.find(op_num); + if (it == op_num_to_request.end()) + throw Exception("Unknown operation type " + std::to_string(op_num), Error::ZBADARGUMENTS); + + return it->second(); +} + +ZooKeeperRequestFactory & ZooKeeperRequestFactory::instance() +{ + static ZooKeeperRequestFactory factory; + return factory; +} + +template +void registerZooKeeperRequest(ZooKeeperRequestFactory & factory) +{ + factory.registerRequest(num, [] { return std::make_shared(); }); +} + +ZooKeeperRequestFactory::ZooKeeperRequestFactory() +{ + registerZooKeeperRequest<11, ZooKeeperHeartbeatRequest>(*this); + registerZooKeeperRequest<100, ZooKeeperAuthRequest>(*this); + registerZooKeeperRequest<-11, ZooKeeperCloseRequest>(*this); + registerZooKeeperRequest<1, ZooKeeperCreateRequest>(*this); + registerZooKeeperRequest<2, ZooKeeperRemoveRequest>(*this); + registerZooKeeperRequest<3, ZooKeeperExistsRequest>(*this); + registerZooKeeperRequest<4, ZooKeeperGetRequest>(*this); + registerZooKeeperRequest<5, ZooKeeperSetRequest>(*this); + registerZooKeeperRequest<12, ZooKeeperListRequest>(*this); + registerZooKeeperRequest<13, ZooKeeperCheckRequest>(*this); + registerZooKeeperRequest<14, ZooKeeperMultiRequest>(*this); +} + +} diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h new file mode 100644 index 00000000000..0b19869dd5a --- /dev/null +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -0,0 +1,326 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +namespace Coordination +{ + +using XID = int32_t; +using OpNum = int32_t; + + +struct ZooKeeperResponse : virtual Response +{ + virtual ~ZooKeeperResponse() override = default; + virtual void readImpl(ReadBuffer &) = 0; + virtual void writeImpl(WriteBuffer &) const = 0; + virtual OpNum getOpNum() const = 0; +}; + +using ZooKeeperResponsePtr = std::shared_ptr; + +/// Exposed in header file for Yandex.Metrica code. +struct ZooKeeperRequest : virtual Request +{ + XID xid = 0; + bool has_watch = false; + /// If the request was not send and the error happens, we definitely sure, that it has not been processed by the server. + /// If the request was sent and we didn't get the response and the error happens, then we cannot be sure was it processed or not. + bool probably_sent = false; + + ZooKeeperRequest() = default; + ZooKeeperRequest(const ZooKeeperRequest &) = default; + virtual ~ZooKeeperRequest() override = default; + + virtual OpNum getOpNum() const = 0; + + /// Writes length, xid, op_num, then the rest. + void write(WriteBuffer & out) const; + + virtual void writeImpl(WriteBuffer &) const = 0; + virtual void readImpl(ReadBuffer &) = 0; + + static std::shared_ptr read(ReadBuffer & in); + + virtual ZooKeeperResponsePtr makeResponse() const = 0; +}; + +using ZooKeeperRequestPtr = std::shared_ptr; + +struct ZooKeeperHeartbeatRequest final : ZooKeeperRequest +{ + String getPath() const override { return {}; } + OpNum getOpNum() const override { return 11; } + void writeImpl(WriteBuffer &) const override {} + void readImpl(ReadBuffer &) override {} + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse +{ + void readImpl(ReadBuffer &) override {} + void writeImpl(WriteBuffer &) const override {} + OpNum getOpNum() const override { return 11; } +}; + +struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + + void writeImpl(WriteBuffer & out) const override; + + /// TODO FIXME alesap + OpNum getOpNum() const override { return 0; } +}; + +struct ZooKeeperAuthRequest final : ZooKeeperRequest +{ + int32_t type = 0; /// ignored by the server + String scheme; + String data; + + String getPath() const override { return {}; } + OpNum getOpNum() const override { return 100; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperAuthResponse final : ZooKeeperResponse +{ + void readImpl(ReadBuffer &) override {} + void writeImpl(WriteBuffer &) const override {} + + OpNum getOpNum() const override { return 100; } +}; + +struct ZooKeeperCloseRequest final : ZooKeeperRequest +{ + String getPath() const override { return {}; } + OpNum getOpNum() const override { return -11; } + void writeImpl(WriteBuffer &) const override {} + void readImpl(ReadBuffer &) override {} + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperCloseResponse final : ZooKeeperResponse +{ + void readImpl(ReadBuffer &) override + { + throw Exception("Received response for close request", Error::ZRUNTIMEINCONSISTENCY); + } + + void writeImpl(WriteBuffer &) const override {} + + OpNum getOpNum() const override { return -11; } +}; + +struct ZooKeeperCreateRequest final : CreateRequest, ZooKeeperRequest +{ + ZooKeeperCreateRequest() = default; + explicit ZooKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {} + + OpNum getOpNum() const override { return 1; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + + void writeImpl(WriteBuffer & out) const override; + + OpNum getOpNum() const override { return 1; } +}; + +struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest +{ + ZooKeeperRemoveRequest() = default; + explicit ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {} + + OpNum getOpNum() const override { return 2; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer &) override {} + void writeImpl(WriteBuffer &) const override {} + OpNum getOpNum() const override { return 2; } +}; + +struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest +{ + OpNum getOpNum() const override { return 3; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + void writeImpl(WriteBuffer & out) const override; + OpNum getOpNum() const override { return 3; } +}; + +struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest +{ + OpNum getOpNum() const override { return 4; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + void writeImpl(WriteBuffer & out) const override; + OpNum getOpNum() const override { return 4; } +}; + +struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest +{ + ZooKeeperSetRequest() = default; + explicit ZooKeeperSetRequest(const SetRequest & base) : SetRequest(base) {} + + OpNum getOpNum() const override { return 5; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + void writeImpl(WriteBuffer & out) const override; + OpNum getOpNum() const override { return 5; } +}; + +struct ZooKeeperListRequest final : ListRequest, ZooKeeperRequest +{ + OpNum getOpNum() const override { return 12; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperListResponse final : ListResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + void writeImpl(WriteBuffer & out) const override; + OpNum getOpNum() const override { return 12; } +}; + +struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest +{ + ZooKeeperCheckRequest() = default; + explicit ZooKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {} + + OpNum getOpNum() const override { return 13; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer &) override {} + void writeImpl(WriteBuffer &) const override {} + OpNum getOpNum() const override { return 13; } +}; + +/// This response may be received only as an element of responses in MultiResponse. +struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + void writeImpl(WriteBuffer & out) const override; + + OpNum getOpNum() const override { return -1; } +}; + +struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest +{ + OpNum getOpNum() const override { return 14; } + ZooKeeperMultiRequest() = default; + + ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls); + + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse +{ + OpNum getOpNum() const override { return 14; } + + explicit ZooKeeperMultiResponse(const Requests & requests) + { + responses.reserve(requests.size()); + + for (const auto & request : requests) + responses.emplace_back(dynamic_cast(*request).makeResponse()); + } + + explicit ZooKeeperMultiResponse(const Responses & responses_) + { + responses = responses_; + } + + void readImpl(ReadBuffer & in) override; + + void writeImpl(WriteBuffer & out) const override; + +}; + +class ZooKeeperRequestFactory final : private boost::noncopyable +{ + +public: + using Creator = std::function; + using OpNumToRequest = std::unordered_map; + + static ZooKeeperRequestFactory & instance(); + + ZooKeeperRequestPtr get(OpNum op_num) const; + + void registerRequest(OpNum op_num, Creator creator); + +private: + OpNumToRequest op_num_to_request; + +private: + ZooKeeperRequestFactory(); +}; + +} diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index f5c57781eef..f3c8b537cf2 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -281,17 +281,6 @@ static void write(int32_t x, WriteBuffer & out) writeBinary(x, out); } -static void write(bool x, WriteBuffer & out) -{ - writeBinary(x, out); -} - -static void write(const String & s, WriteBuffer & out) -{ - write(int32_t(s.size()), out); - out.write(s.data(), s.size()); -} - template void write(std::array s, WriteBuffer & out) { write(int32_t(N), out); @@ -305,14 +294,6 @@ template void write(const std::vector & arr, WriteBuffer & out) write(elem, out); } -static void write(const ACL & acl, WriteBuffer & out) -{ - write(acl.permissions, out); - write(acl.scheme, out); - write(acl.id, out); -} - - static void read(int64_t & x, ReadBuffer & in) { readBinary(x, in); @@ -332,33 +313,6 @@ static void read(Error & x, ReadBuffer & in) x = Error(code); } -static void read(bool & x, ReadBuffer & in) -{ - readBinary(x, in); -} - -static void read(String & s, ReadBuffer & in) -{ - int32_t size = 0; - read(size, in); - - if (size == -1) - { - /// It means that zookeeper node has NULL value. We will treat it like empty string. - s.clear(); - return; - } - - if (size < 0) - throw Exception("Negative size while reading string from ZooKeeper", Error::ZMARSHALLINGERROR); - - if (size > MAX_STRING_OR_ARRAY_SIZE) - throw Exception("Too large string size while reading from ZooKeeper", Error::ZMARSHALLINGERROR); - - s.resize(size); - in.read(s.data(), size); -} - template void read(std::array & s, ReadBuffer & in) { int32_t size = 0; @@ -368,21 +322,6 @@ template void read(std::array & s, ReadBuffer & in) in.read(s.data(), N); } -static void read(Stat & stat, ReadBuffer & in) -{ - read(stat.czxid, in); - read(stat.mzxid, in); - read(stat.ctime, in); - read(stat.mtime, in); - read(stat.version, in); - read(stat.cversion, in); - read(stat.aversion, in); - read(stat.ephemeralOwner, in); - read(stat.dataLength, in); - read(stat.numChildren, in); - read(stat.pzxid, in); -} - template void read(std::vector & arr, ReadBuffer & in) { int32_t size = 0; @@ -396,7 +335,6 @@ template void read(std::vector & arr, ReadBuffer & in) read(elem, in); } - template void ZooKeeper::write(const T & x) { @@ -409,19 +347,6 @@ void ZooKeeper::read(T & x) Coordination::read(x, *in); } - -void ZooKeeperRequest::write(WriteBuffer & out) const -{ - /// Excessive copy to calculate length. - WriteBufferFromOwnString buf; - Coordination::write(xid, buf); - Coordination::write(getOpNum(), buf); - writeImpl(buf); - Coordination::write(buf.str(), out); - out.next(); -} - - static void removeRootPath(String & path, const String & root_path) { if (root_path.empty()) @@ -433,385 +358,6 @@ static void removeRootPath(String & path, const String & root_path) path = path.substr(root_path.size()); } - -struct ZooKeeperResponse : virtual Response -{ - virtual ~ZooKeeperResponse() override = default; - virtual void readImpl(ReadBuffer &) = 0; -}; - - -struct ZooKeeperHeartbeatRequest final : ZooKeeperRequest -{ - String getPath() const override { return {}; } - ZooKeeper::OpNum getOpNum() const override { return 11; } - void writeImpl(WriteBuffer &) const override {} - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse -{ - void readImpl(ReadBuffer &) override {} -}; - -struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer & in) override - { - Coordination::read(type, in); - Coordination::read(state, in); - Coordination::read(path, in); - } -}; - -struct ZooKeeperAuthRequest final : ZooKeeperRequest -{ - int32_t type = 0; /// ignored by the server - String scheme; - String data; - - String getPath() const override { return {}; } - ZooKeeper::OpNum getOpNum() const override { return 100; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(type, out); - Coordination::write(scheme, out); - Coordination::write(data, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperAuthResponse final : ZooKeeperResponse -{ - void readImpl(ReadBuffer &) override {} -}; - -struct ZooKeeperCloseRequest final : ZooKeeperRequest -{ - String getPath() const override { return {}; } - ZooKeeper::OpNum getOpNum() const override { return -11; } - void writeImpl(WriteBuffer &) const override {} - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperCloseResponse final : ZooKeeperResponse -{ - void readImpl(ReadBuffer &) override - { - throw Exception("Received response for close request", Error::ZRUNTIMEINCONSISTENCY); - } -}; - -struct ZooKeeperCreateRequest final : CreateRequest, ZooKeeperRequest -{ - ZooKeeperCreateRequest() = default; - explicit ZooKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {} - - ZooKeeper::OpNum getOpNum() const override { return 1; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(path, out); - Coordination::write(data, out); - Coordination::write(acls, out); - - int32_t flags = 0; - - if (is_ephemeral) - flags |= 1; - if (is_sequential) - flags |= 2; - - Coordination::write(flags, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer & in) override - { - Coordination::read(path_created, in); - } -}; - -struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest -{ - ZooKeeperRemoveRequest() = default; - explicit ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {} - - ZooKeeper::OpNum getOpNum() const override { return 2; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(path, out); - Coordination::write(version, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer &) override {} -}; - -struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest -{ - ZooKeeper::OpNum getOpNum() const override { return 3; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(path, out); - Coordination::write(has_watch, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer & in) override - { - Coordination::read(stat, in); - } -}; - -struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest -{ - ZooKeeper::OpNum getOpNum() const override { return 4; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(path, out); - Coordination::write(has_watch, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer & in) override - { - Coordination::read(data, in); - Coordination::read(stat, in); - } -}; - -struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest -{ - ZooKeeperSetRequest() = default; - explicit ZooKeeperSetRequest(const SetRequest & base) : SetRequest(base) {} - - ZooKeeper::OpNum getOpNum() const override { return 5; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(path, out); - Coordination::write(data, out); - Coordination::write(version, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer & in) override - { - Coordination::read(stat, in); - } -}; - -struct ZooKeeperListRequest final : ListRequest, ZooKeeperRequest -{ - ZooKeeper::OpNum getOpNum() const override { return 12; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(path, out); - Coordination::write(has_watch, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperListResponse final : ListResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer & in) override - { - Coordination::read(names, in); - Coordination::read(stat, in); - } -}; - -struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest -{ - ZooKeeperCheckRequest() = default; - explicit ZooKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {} - - ZooKeeper::OpNum getOpNum() const override { return 13; } - void writeImpl(WriteBuffer & out) const override - { - Coordination::write(path, out); - Coordination::write(version, out); - } - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer &) override {} -}; - -/// This response may be received only as an element of responses in MultiResponse. -struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse -{ - void readImpl(ReadBuffer & in) override - { - Coordination::Error read_error; - Coordination::read(read_error, in); - - if (read_error != error) - throw Exception(fmt::format("Error code in ErrorResponse ({}) doesn't match error code in header ({})", read_error, error), - Error::ZMARSHALLINGERROR); - } -}; - -struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest -{ - ZooKeeper::OpNum getOpNum() const override { return 14; } - - ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls) - { - /// Convert nested Requests to ZooKeeperRequests. - /// Note that deep copy is required to avoid modifying path in presence of chroot prefix. - requests.reserve(generic_requests.size()); - - for (const auto & generic_request : generic_requests) - { - if (const auto * concrete_request_create = dynamic_cast(generic_request.get())) - { - auto create = std::make_shared(*concrete_request_create); - if (create->acls.empty()) - create->acls = default_acls; - requests.push_back(create); - } - else if (const auto * concrete_request_remove = dynamic_cast(generic_request.get())) - { - requests.push_back(std::make_shared(*concrete_request_remove)); - } - else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) - { - requests.push_back(std::make_shared(*concrete_request_set)); - } - else if (const auto * concrete_request_check = dynamic_cast(generic_request.get())) - { - requests.push_back(std::make_shared(*concrete_request_check)); - } - else - throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS); - } - } - - void writeImpl(WriteBuffer & out) const override - { - for (const auto & request : requests) - { - const auto & zk_request = dynamic_cast(*request); - - bool done = false; - int32_t error = -1; - - Coordination::write(zk_request.getOpNum(), out); - Coordination::write(done, out); - Coordination::write(error, out); - - zk_request.writeImpl(out); - } - - ZooKeeper::OpNum op_num = -1; - bool done = true; - int32_t error = -1; - - Coordination::write(op_num, out); - Coordination::write(done, out); - Coordination::write(error, out); - } - - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse -{ - explicit ZooKeeperMultiResponse(const Requests & requests) - { - responses.reserve(requests.size()); - - for (const auto & request : requests) - responses.emplace_back(dynamic_cast(*request).makeResponse()); - } - - void readImpl(ReadBuffer & in) override - { - for (auto & response : responses) - { - ZooKeeper::OpNum op_num; - bool done; - Error op_error; - - Coordination::read(op_num, in); - Coordination::read(done, in); - Coordination::read(op_error, in); - - if (done) - throw Exception("Not enough results received for multi transaction", Error::ZMARSHALLINGERROR); - - /// op_num == -1 is special for multi transaction. - /// For unknown reason, error code is duplicated in header and in response body. - - if (op_num == -1) - response = std::make_shared(); - - if (op_error != Error::ZOK) - { - response->error = op_error; - - /// Set error for whole transaction. - /// If some operations fail, ZK send global error as zero and then send details about each operation. - /// It will set error code for first failed operation and it will set special "runtime inconsistency" code for other operations. - if (error == Error::ZOK && op_error != Error::ZRUNTIMEINCONSISTENCY) - error = op_error; - } - - if (op_error == Error::ZOK || op_num == -1) - dynamic_cast(*response).readImpl(in); - } - - /// Footer. - { - ZooKeeper::OpNum op_num; - bool done; - int32_t error_read; - - Coordination::read(op_num, in); - Coordination::read(done, in); - Coordination::read(error_read, in); - - if (!done) - throw Exception("Too many results received for multi transaction", Error::ZMARSHALLINGERROR); - if (op_num != -1) - throw Exception("Unexpected op_num received at the end of results for multi transaction", Error::ZMARSHALLINGERROR); - if (error_read != -1) - throw Exception("Unexpected error value received at the end of results for multi transaction", Error::ZMARSHALLINGERROR); - } - } -}; - - -ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperCreateRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::make_shared(); } -ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared(requests); } -ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared(); } - - static constexpr int32_t protocol_version = 0; static constexpr ZooKeeper::XID watch_xid = -1; @@ -1688,5 +1234,4 @@ void ZooKeeper::close() ProfileEvents::increment(ProfileEvents::ZooKeeperClose); } - } diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 085b0e9856a..c96d7d2f0cb 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -85,9 +86,6 @@ namespace Coordination using namespace DB; -struct ZooKeeperRequest; - - /** Usage scenario: look at the documentation for IKeeper class. */ class ZooKeeper : public IKeeper @@ -193,7 +191,7 @@ private: struct RequestInfo { - std::shared_ptr request; + ZooKeeperRequestPtr request; ResponseCallback callback; WatchCallback watch; clock::time_point time; @@ -246,31 +244,4 @@ private: CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession}; }; -struct ZooKeeperResponse; -using ZooKeeperResponsePtr = std::shared_ptr; - -/// Exposed in header file for Yandex.Metrica code. -struct ZooKeeperRequest : virtual Request -{ - ZooKeeper::XID xid = 0; - bool has_watch = false; - /// If the request was not send and the error happens, we definitely sure, that is has not been processed by the server. - /// If the request was sent and we didn't get the response and the error happens, then we cannot be sure was it processed or not. - bool probably_sent = false; - - ZooKeeperRequest() = default; - ZooKeeperRequest(const ZooKeeperRequest &) = default; - virtual ~ZooKeeperRequest() override = default; - - virtual ZooKeeper::OpNum getOpNum() const = 0; - - /// Writes length, xid, op_num, then the rest. - void write(WriteBuffer & out) const; - - virtual void writeImpl(WriteBuffer &) const = 0; - - virtual ZooKeeperResponsePtr makeResponse() const = 0; -}; - - }