From 08170d0d779b12eff05777e12f3d6b086e54ae02 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 3 Apr 2018 21:24:18 +0300 Subject: [PATCH] Modifications after removing libzookeeper [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/KeeperException.h | 54 +------- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 130 +++++++++++++----- dbms/src/Common/ZooKeeper/ZooKeeperImpl.h | 20 +++ dbms/src/Interpreters/DDLWorker.cpp | 4 +- .../ReplicatedMergeTreeBlockOutputStream.cpp | 1 + 5 files changed, 121 insertions(+), 88 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/KeeperException.h b/dbms/src/Common/ZooKeeper/KeeperException.h index d310cb8dc1d..9afe8d46873 100644 --- a/dbms/src/Common/ZooKeeper/KeeperException.h +++ b/dbms/src/Common/ZooKeeper/KeeperException.h @@ -1,21 +1,6 @@ #pragma once -#include + #include "Types.h" -#include - - -namespace DB -{ - namespace ErrorCodes - { - extern const int KEEPER_EXCEPTION; - } -} - -namespace ProfileEvents -{ - extern const Event ZooKeeperExceptions; -} namespace zkutil @@ -43,42 +28,7 @@ inline bool isUserError(int32_t zk_return_code) } -class KeeperException : public DB::Exception -{ -private: - /// delegate constructor, used to minimize repetition; last parameter used for overload resolution - KeeperException(const std::string & msg, const int32_t code, int) - : DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code) { incrementEventCounter(); } - -public: - KeeperException(const std::string & msg, const int32_t code) - : KeeperException(msg + " (" + ZooKeeperImpl::ZooKeeper::errorMessage(code) + ")", code, 0) {} - explicit KeeperException(const int32_t code) : KeeperException(ZooKeeperImpl::ZooKeeper::errorMessage(code), code, 0) {} - KeeperException(const int32_t code, const std::string & path) - : KeeperException(std::string{ZooKeeperImpl::ZooKeeper::errorMessage(code)} + ", path: " + path, code, 0) {} - - KeeperException(const KeeperException & exc) : DB::Exception(exc), code(exc.code) { incrementEventCounter(); } - - const char * name() const throw() override { return "zkutil::KeeperException"; } - const char * className() const throw() override { return "zkutil::KeeperException"; } - KeeperException * clone() const override { return new KeeperException(*this); } - - /// Any error related with network or master election - /// In case of these errors you should reinitialize ZooKeeper session. - bool isHardwareError() const - { - return zkutil::isHardwareError(code); - } - - const int32_t code; - -private: - static void incrementEventCounter() - { - ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions); - } - -}; +using KeeperException = ZooKeeperImpl::Exception; class KeeperMultiException : public KeeperException diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 3f999a85384..a3dd6bec291 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1,9 +1,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -11,7 +13,19 @@ #include -//#include + +namespace DB +{ + namespace ErrorCodes + { + extern const int KEEPER_EXCEPTION; + } +} + +namespace ProfileEvents +{ + extern const Event ZooKeeperExceptions; +} /** ZooKeeper wire protocol. @@ -228,6 +242,33 @@ after: namespace ZooKeeperImpl { +Exception::Exception(const std::string & msg, const int32_t code, int) + : DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code) +{ + ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions); +} + +Exception::Exception(const std::string & msg, const int32_t code) + : Exception(msg + " (" + ZooKeeperImpl::ZooKeeper::errorMessage(code) + ")", code, 0) +{ +} + +Exception::Exception(const int32_t code) + : Exception(ZooKeeperImpl::ZooKeeper::errorMessage(code), code, 0) +{ +} + +Exception::Exception(const int32_t code, const std::string & path) + : Exception(std::string{ZooKeeperImpl::ZooKeeper::errorMessage(code)} + ", path: " + path, code, 0) +{ +} + +Exception::Exception(const Exception & exc) + : DB::Exception(exc), code(exc.code) +{ +} + + using namespace DB; @@ -304,10 +345,10 @@ void read(String & s, ReadBuffer & in) static constexpr int32_t max_string_size = 1 << 20; int32_t size = 0; read(size, in); - if (size < 0) - throw Exception("Negative size"); /// TODO Actually it means that zookeeper node have NULL value. Maybe better to treat it like empty string. + if (size < 0) /// TODO Actually it means that zookeeper node have NULL value. Maybe better to treat it like empty string. + throw Exception("Negative size while reading string from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR); if (size > max_string_size) - throw Exception("Too large string size"); /// TODO error code + throw Exception("Too large string size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR); s.resize(size); in.read(&s[0], size); } @@ -317,7 +358,7 @@ template void read(std::array & s, ReadBuffer & in) int32_t size = 0; read(size, in); if (size != N) - throw Exception("Unexpected array size"); /// TODO error code + throw Exception("Unexpected array size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR); in.read(&s[0], N); } @@ -347,9 +388,9 @@ template void read(std::vector & arr, ReadBuffer & in) int32_t size = 0; read(size, in); if (size < 0) - throw Exception("Negative size"); + throw Exception("Negative size while reading array from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR); if (size > max_array_size) - throw Exception("Too large array size"); /// TODO error code + throw Exception("Too large array size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR); arr.resize(size); for (auto & elem : arr) read(elem, in); @@ -488,6 +529,7 @@ void ZooKeeper::connect( static constexpr size_t num_tries = 3; bool connected = false; + WriteBufferFromOwnString fail_reasons; for (size_t try_no = 0; try_no < num_tries; ++try_no) { for (const auto & address : addresses) @@ -500,10 +542,11 @@ void ZooKeeper::connect( } catch (const Poco::Net::NetException & e) { - /// TODO log exception + fail_reasons << "\n" << getCurrentExceptionMessage(false); } catch (const Poco::TimeoutException & e) { + fail_reasons << "\n" << getCurrentExceptionMessage(false); } } @@ -512,7 +555,22 @@ void ZooKeeper::connect( } if (!connected) - throw Exception("All connection tries failed"); /// TODO more info; error code + { + WriteBufferFromOwnString out; + out << "All connection tries failed while connecting to ZooKeeper. Addresses: "; + bool first = true; + for (const auto & address : addresses) + { + if (first) + first = false; + else + out << ", "; + out << address.toString(); + } + + out << fail_reasons.str(); + throw Exception(out.str(), ZCONNECTIONLOSS); + } socket.setReceiveTimeout(operation_timeout); socket.setSendTimeout(operation_timeout); @@ -553,15 +611,15 @@ void ZooKeeper::receiveHandshake() read(handshake_length); if (handshake_length != 36) - throw Exception("Unexpected handshake length received: " + toString(handshake_length)); + throw Exception("Unexpected handshake length received: " + toString(handshake_length), ZMARSHALLINGERROR); read(protocol_version_read); if (protocol_version_read != protocol_version) - throw Exception("Unexpected protocol version: " + toString(protocol_version_read)); + throw Exception("Unexpected protocol version: " + toString(protocol_version_read), ZMARSHALLINGERROR); read(timeout); if (timeout != session_timeout.totalMilliseconds()) - throw Exception("Received different session timeout from server: " + toString(timeout)); + throw Exception("Received different session timeout from server: " + toString(timeout), ZMARSHALLINGERROR); read(session_id); read(passwd); @@ -588,14 +646,17 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data) read(err); if (xid != auth_xid) - throw Exception("Unexpected event recievent in reply to auth request: " + toString(xid)); + throw Exception("Unexpected event recieved in reply to auth request: " + toString(xid), + ZMARSHALLINGERROR); int32_t actual_length = in->count() - count_before_event; if (length != actual_length) - throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length)); + throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), + ZMARSHALLINGERROR); if (err) - throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err))); + throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err)), + ZMARSHALLINGERROR); } @@ -669,7 +730,7 @@ void ZooKeeper::receiveThread() has_operations = true; auto earliest_operation_deadline = operations.begin()->second.time + std::chrono::microseconds(operation_timeout.totalMicroseconds()); if (now > earliest_operation_deadline) - throw Exception("Operation timeout"); + throw Exception("Operation timeout", ZOPERATIONTIMEOUT); max_wait = std::chrono::duration_cast(earliest_operation_deadline - now).count(); } } @@ -685,10 +746,10 @@ void ZooKeeper::receiveThread() else { if (has_operations) - throw Exception("Operation timeout"); + throw Exception("Operation timeout", ZOPERATIONTIMEOUT); waited += max_wait; if (waited > session_timeout.totalMicroseconds()) - throw Exception("Nothing is received in session timeout"); + throw Exception("Nothing is received in session timeout", ZOPERATIONTIMEOUT); } } @@ -729,10 +790,10 @@ ZooKeeper::ResponsePtr ZooKeeper::CloseRequest::makeResponse() const { return st void addRootPath(String & path, const String & root_path) { if (path.empty()) - throw Exception("Path cannot be empty"); + throw Exception("Path cannot be empty", ZooKeeper::ZBADARGUMENTS); if (path[0] != '/') - throw Exception("Path must begin with /"); + throw Exception("Path must begin with /", ZooKeeper::ZBADARGUMENTS); if (root_path.empty()) return; @@ -749,7 +810,7 @@ void removeRootPath(String & path, const String & root_path) return; if (path.size() <= root_path.size()) - throw Exception("Received path is not longer than root_path"); + throw Exception("Received path is not longer than root_path", ZooKeeper::ZDATAINCONSISTENCY); path = path.substr(root_path.size()); } @@ -798,7 +859,7 @@ void ZooKeeper::receiveEvent() if (xid == ping_xid) { if (err) - throw Exception("Received error in heartbeat response: " + String(errorMessage(err))); + throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), ZRUNTIMEINCONSISTENCY); response = std::make_shared(); @@ -845,7 +906,7 @@ void ZooKeeper::receiveEvent() auto it = operations.find(xid); if (it == operations.end()) - throw Exception("Received response for unknown xid"); + throw Exception("Received response for unknown xid", ZRUNTIMEINCONSISTENCY); request_info = std::move(it->second); operations.erase(it); @@ -866,7 +927,7 @@ void ZooKeeper::receiveEvent() int32_t actual_length = in->count() - count_before_event; if (length != actual_length) - throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length)); + throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), ZMARSHALLINGERROR); if (request_info.callback) request_info.callback(*response); @@ -1058,12 +1119,13 @@ void ZooKeeper::ErrorResponse::readImpl(ReadBuffer & in) ZooKeeperImpl::read(read_error, in); if (read_error != error) - throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")"); + throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")", + ZMARSHALLINGERROR); } void ZooKeeper::CloseResponse::readImpl(ReadBuffer &) { - throw Exception("Received response for close request"); + throw Exception("Received response for close request", ZRUNTIMEINCONSISTENCY); } ZooKeeper::MultiResponse::MultiResponse(const Requests & requests) @@ -1089,7 +1151,7 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in) // std::cerr << "Received result for multi: " << op_num << "\n"; if (done) - throw Exception("Not enough results received for multi transaction"); + throw Exception("Not enough results received for multi transaction", ZMARSHALLINGERROR); /// op_num == -1 is special for multi transaction. /// For unknown reason, error code is duplicated in header and in response body. @@ -1123,11 +1185,11 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in) ZooKeeperImpl::read(error, in); if (!done) - throw Exception("Too many results received for multi transaction"); + throw Exception("Too many results received for multi transaction", ZMARSHALLINGERROR); if (op_num != -1) - throw Exception("Unexpected op_num received at the end of results for multi transaction"); + throw Exception("Unexpected op_num received at the end of results for multi transaction", ZMARSHALLINGERROR); if (error != -1) - throw Exception("Unexpected error value received at the end of results for multi transaction"); + throw Exception("Unexpected error value received at the end of results for multi transaction", ZMARSHALLINGERROR); } } @@ -1136,7 +1198,7 @@ void ZooKeeper::pushRequest(RequestInfo && info) { /// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop. if (expired && info.request->xid != close_xid) - throw Exception("Session expired"); + throw Exception("Session expired", ZSESSIONEXPIRED); info.request->addRootPath(root_path); @@ -1146,7 +1208,7 @@ void ZooKeeper::pushRequest(RequestInfo && info) { info.request->xid = xid.fetch_add(1); if (info.request->xid < 0) - throw Exception("XID overflow"); + throw Exception("XID overflow", ZSESSIONEXPIRED); } { @@ -1161,8 +1223,8 @@ void ZooKeeper::pushRequest(RequestInfo && info) watches[info.request->getPath()].emplace_back(std::move(info.watch)); } - if (!requests.tryPush(info.request, session_timeout.totalMilliseconds())) - throw Exception("Cannot push request to queue within session timeout"); + if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds())) + throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT); } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index 512e6a9a421..3303abf9595 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -29,6 +29,26 @@ namespace ZooKeeperImpl using namespace DB; +class Exception : public DB::Exception +{ +private: + /// Delegate constructor, used to minimize repetition; last parameter used for overload resolution. + Exception(const std::string & msg, const int32_t code, int); + +public: + explicit Exception(const int32_t code); + Exception(const std::string & msg, const int32_t code); + Exception(const int32_t code, const std::string & path); + Exception(const Exception & exc); + + const char * name() const throw() override { return "ZooKeeperImpl::Exception"; } + const char * className() const throw() override { return "ZooKeeperImpl::Exception"; } + Exception * clone() const override { return new Exception(*this); } + + const int32_t code; +}; + + /** Usage scenario: * - create an object and issue commands; * - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap: diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 72c08e32ba0..5a820ff7334 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -859,7 +859,7 @@ void DDLWorker::run() } catch (const zkutil::KeeperException & e) { - if (!e.isHardwareError()) + if (!zkutil::isHardwareError(e.code)) throw; } } @@ -887,7 +887,7 @@ void DDLWorker::run() } catch (zkutil::KeeperException & e) { - if (e.isHardwareError()) + if (zkutil::isHardwareError(e.code)) { LOG_DEBUG(log, "Recovering ZooKeeper session after: " << getCurrentExceptionMessage(false)); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 4a839d9470f..8aca9fe4f2e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -27,6 +27,7 @@ namespace ErrorCodes extern const int READONLY; extern const int UNKNOWN_STATUS_OF_INSERT; extern const int INSERT_WAS_DEDUPLICATED; + extern const int KEEPER_EXCEPTION; }