Avoid errors due to implicit int<->bool conversions when using ZK API

This commit is contained in:
Alexey Milovidov 2020-06-12 18:09:12 +03:00
parent 13ceaa4779
commit 72257061d5
28 changed files with 462 additions and 442 deletions

View File

@ -25,7 +25,7 @@ void ClusterCopier::init()
task_description_watch_callback = [this] (const Coordination::WatchResponse & response) task_description_watch_callback = [this] (const Coordination::WatchResponse & response)
{ {
if (response.error != Coordination::ZOK) if (response.error != Coordination::Error::ZOK)
return; return;
UInt64 version = ++task_description_version; UInt64 version = ++task_description_version;
LOG_DEBUG(log, "Task description should be updated, local version {}", version); LOG_DEBUG(log, "Task description should be updated, local version {}", version);
@ -206,11 +206,11 @@ void ClusterCopier::uploadTaskDescription(const std::string & task_path, const s
zookeeper->createAncestors(local_task_description_path); zookeeper->createAncestors(local_task_description_path);
auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent); auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
if (code && force) if (code != Coordination::Error::ZOK && force)
zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent); zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
LOG_DEBUG(log, "Task description {} uploaded to {} with result {} ({})", LOG_DEBUG(log, "Task description {} uploaded to {} with result {} ({})",
((code && !force) ? "not " : ""), local_task_description_path, code, zookeeper->error2string(code)); ((code != Coordination::Error::ZOK && !force) ? "not " : ""), local_task_description_path, code, Coordination::errorMessage(code));
} }
void ClusterCopier::reloadTaskDescription() void ClusterCopier::reloadTaskDescription()
@ -220,10 +220,10 @@ void ClusterCopier::reloadTaskDescription()
String task_config_str; String task_config_str;
Coordination::Stat stat{}; Coordination::Stat stat{};
int code; Coordination::Error code;
zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code); zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
if (code) if (code != Coordination::Error::ZOK)
throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS); throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
LOG_DEBUG(log, "Loading description, zxid={}", task_description_current_stat.czxid); LOG_DEBUG(log, "Loading description, zxid={}", task_description_current_stat.czxid);
@ -376,10 +376,10 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
Coordination::Responses responses; Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses); auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS) if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description); return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
if (code == Coordination::ZBADVERSION) if (code == Coordination::Error::ZBADVERSION)
{ {
++num_bad_version_errors; ++num_bad_version_errors;
@ -545,7 +545,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code == Coordination::ZNODEEXISTS) if (e.code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_DEBUG(log, "Someone is already moving pieces {}", current_partition_attach_is_active); LOG_DEBUG(log, "Someone is already moving pieces {}", current_partition_attach_is_active);
return TaskStatus::Active; return TaskStatus::Active;
@ -745,7 +745,7 @@ bool ClusterCopier::tryDropPartitionPiece(
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code == Coordination::ZNODEEXISTS) if (e.code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_DEBUG(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number)); LOG_DEBUG(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
std::this_thread::sleep_for(default_sleep_time); std::this_thread::sleep_for(default_sleep_time);
@ -778,7 +778,7 @@ bool ClusterCopier::tryDropPartitionPiece(
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code == Coordination::ZNODEEXISTS) if (e.code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_DEBUG(log, "Partition {} is being filled now by somebody, sleep", task_partition.name); LOG_DEBUG(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
return false; return false;
@ -795,7 +795,7 @@ bool ClusterCopier::tryDropPartitionPiece(
/// Remove all status nodes /// Remove all status nodes
{ {
Strings children; Strings children;
if (zookeeper->tryGetChildren(current_shards_path, children) == Coordination::ZOK) if (zookeeper->tryGetChildren(current_shards_path, children) == Coordination::Error::ZOK)
for (const auto & child : children) for (const auto & child : children)
{ {
zookeeper->removeRecursive(current_shards_path + "/" + child); zookeeper->removeRecursive(current_shards_path + "/" + child);
@ -845,7 +845,7 @@ bool ClusterCopier::tryDropPartitionPiece(
} }
LOG_INFO(log, "Partition {} piece {} was dropped on cluster {}", task_partition.name, toString(current_piece_number), task_table.cluster_push_name); LOG_INFO(log, "Partition {} piece {} was dropped on cluster {}", task_partition.name, toString(current_piece_number), task_table.cluster_push_name);
if (zookeeper->tryCreate(current_shards_path, host_id, zkutil::CreateMode::Persistent) == Coordination::ZNODEEXISTS) if (zookeeper->tryCreate(current_shards_path, host_id, zkutil::CreateMode::Persistent) == Coordination::Error::ZNODEEXISTS)
zookeeper->set(current_shards_path, host_id); zookeeper->set(current_shards_path, host_id);
} }
@ -1233,7 +1233,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code == Coordination::ZNODEEXISTS) if (e.code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_DEBUG(log, "Someone is already processing {}", current_task_piece_is_active_path); LOG_DEBUG(log, "Someone is already processing {}", current_task_piece_is_active_path);
return TaskStatus::Active; return TaskStatus::Active;
@ -1271,9 +1271,9 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
{ {
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id); String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent); auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent);
if (res == Coordination::ZNODEEXISTS) if (res == Coordination::Error::ZNODEEXISTS)
LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number); LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number);
if (res == Coordination::ZOK) if (res == Coordination::Error::ZOK)
LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number); LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number);
return TaskStatus::Finished; return TaskStatus::Finished;
} }
@ -1429,7 +1429,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
{ {
Coordination::ExistsResponse status = future_is_dirty_checker.get(); Coordination::ExistsResponse status = future_is_dirty_checker.get();
if (status.error != Coordination::ZNONODE) if (status.error != Coordination::Error::ZNONODE)
{ {
LogicalClock dirt_discovery_epoch (status.stat.mzxid); LogicalClock dirt_discovery_epoch (status.stat.mzxid);
if (dirt_discovery_epoch == clean_state_clock.discovery_zxid) if (dirt_discovery_epoch == clean_state_clock.discovery_zxid)

View File

@ -178,7 +178,7 @@ public:
[stale = stale] (const Coordination::WatchResponse & rsp) [stale = stale] (const Coordination::WatchResponse & rsp)
{ {
auto logger = &Poco::Logger::get("ClusterCopier"); auto logger = &Poco::Logger::get("ClusterCopier");
if (rsp.error == Coordination::ZOK) if (rsp.error == Coordination::Error::ZOK)
{ {
switch (rsp.type) switch (rsp.type)
{ {

View File

@ -23,7 +23,7 @@ namespace ProfileEvents
namespace Coordination namespace Coordination
{ {
Exception::Exception(const std::string & msg, const int32_t code_, int) Exception::Exception(const std::string & msg, const Error code_, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code_) : DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code_)
{ {
if (Coordination::isUserError(code)) if (Coordination::isUserError(code))
@ -34,17 +34,17 @@ Exception::Exception(const std::string & msg, const int32_t code_, int)
ProfileEvents::increment(ProfileEvents::ZooKeeperOtherExceptions); ProfileEvents::increment(ProfileEvents::ZooKeeperOtherExceptions);
} }
Exception::Exception(const std::string & msg, const int32_t code_) Exception::Exception(const std::string & msg, const Error code_)
: Exception(msg + " (" + errorMessage(code_) + ")", code_, 0) : Exception(msg + " (" + errorMessage(code_) + ")", code_, 0)
{ {
} }
Exception::Exception(const int32_t code_) Exception::Exception(const Error code_)
: Exception(errorMessage(code_), code_, 0) : Exception(errorMessage(code_), code_, 0)
{ {
} }
Exception::Exception(const int32_t code_, const std::string & path) Exception::Exception(const Error code_, const std::string & path)
: Exception(std::string{errorMessage(code_)} + ", path: " + path, code_, 0) : Exception(std::string{errorMessage(code_)} + ", path: " + path, code_, 0)
{ {
} }
@ -58,10 +58,10 @@ using namespace DB;
static void addRootPath(String & path, const String & root_path) static void addRootPath(String & path, const String & root_path)
{ {
if (path.empty()) if (path.empty())
throw Exception("Path cannot be empty", ZBADARGUMENTS); throw Exception("Path cannot be empty", Error::ZBADARGUMENTS);
if (path[0] != '/') if (path[0] != '/')
throw Exception("Path must begin with /", ZBADARGUMENTS); throw Exception("Path must begin with /", Error::ZBADARGUMENTS);
if (root_path.empty()) if (root_path.empty())
return; return;
@ -78,64 +78,62 @@ static void removeRootPath(String & path, const String & root_path)
return; return;
if (path.size() <= root_path.size()) if (path.size() <= root_path.size())
throw Exception("Received path is not longer than root_path", ZDATAINCONSISTENCY); throw Exception("Received path is not longer than root_path", Error::ZDATAINCONSISTENCY);
path = path.substr(root_path.size()); path = path.substr(root_path.size());
} }
const char * errorMessage(int32_t code) const char * errorMessage(Error code)
{ {
switch (code) switch (code)
{ {
case ZOK: return "Ok"; case Error::ZOK: return "Ok";
case ZSYSTEMERROR: return "System error"; case Error::ZSYSTEMERROR: return "System error";
case ZRUNTIMEINCONSISTENCY: return "Run time inconsistency"; case Error::ZRUNTIMEINCONSISTENCY: return "Run time inconsistency";
case ZDATAINCONSISTENCY: return "Data inconsistency"; case Error::ZDATAINCONSISTENCY: return "Data inconsistency";
case ZCONNECTIONLOSS: return "Connection loss"; case Error::ZCONNECTIONLOSS: return "Connection loss";
case ZMARSHALLINGERROR: return "Marshalling error"; case Error::ZMARSHALLINGERROR: return "Marshalling error";
case ZUNIMPLEMENTED: return "Unimplemented"; case Error::ZUNIMPLEMENTED: return "Unimplemented";
case ZOPERATIONTIMEOUT: return "Operation timeout"; case Error::ZOPERATIONTIMEOUT: return "Operation timeout";
case ZBADARGUMENTS: return "Bad arguments"; case Error::ZBADARGUMENTS: return "Bad arguments";
case ZINVALIDSTATE: return "Invalid zhandle state"; case Error::ZINVALIDSTATE: return "Invalid zhandle state";
case ZAPIERROR: return "API error"; case Error::ZAPIERROR: return "API error";
case ZNONODE: return "No node"; case Error::ZNONODE: return "No node";
case ZNOAUTH: return "Not authenticated"; case Error::ZNOAUTH: return "Not authenticated";
case ZBADVERSION: return "Bad version"; case Error::ZBADVERSION: return "Bad version";
case ZNOCHILDRENFOREPHEMERALS: return "No children for ephemerals"; case Error::ZNOCHILDRENFOREPHEMERALS: return "No children for ephemerals";
case ZNODEEXISTS: return "Node exists"; case Error::ZNODEEXISTS: return "Node exists";
case ZNOTEMPTY: return "Not empty"; case Error::ZNOTEMPTY: return "Not empty";
case ZSESSIONEXPIRED: return "Session expired"; case Error::ZSESSIONEXPIRED: return "Session expired";
case ZINVALIDCALLBACK: return "Invalid callback"; case Error::ZINVALIDCALLBACK: return "Invalid callback";
case ZINVALIDACL: return "Invalid ACL"; case Error::ZINVALIDACL: return "Invalid ACL";
case ZAUTHFAILED: return "Authentication failed"; case Error::ZAUTHFAILED: return "Authentication failed";
case ZCLOSING: return "ZooKeeper is closing"; case Error::ZCLOSING: return "ZooKeeper is closing";
case ZNOTHING: return "(not error) no server responses to process"; case Error::ZNOTHING: return "(not error) no server responses to process";
case ZSESSIONMOVED: return "Session moved to another server, so operation is ignored"; case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
}
if (code > 0)
return strerror(code);
return "unknown error";
} }
bool isHardwareError(int32_t zk_return_code) __builtin_unreachable();
}
bool isHardwareError(Error zk_return_code)
{ {
return zk_return_code == ZINVALIDSTATE return zk_return_code == Error::ZINVALIDSTATE
|| zk_return_code == ZSESSIONEXPIRED || zk_return_code == Error::ZSESSIONEXPIRED
|| zk_return_code == ZSESSIONMOVED || zk_return_code == Error::ZSESSIONMOVED
|| zk_return_code == ZCONNECTIONLOSS || zk_return_code == Error::ZCONNECTIONLOSS
|| zk_return_code == ZMARSHALLINGERROR || zk_return_code == Error::ZMARSHALLINGERROR
|| zk_return_code == ZOPERATIONTIMEOUT; || zk_return_code == Error::ZOPERATIONTIMEOUT;
} }
bool isUserError(int32_t zk_return_code) bool isUserError(Error zk_return_code)
{ {
return zk_return_code == ZNONODE return zk_return_code == Error::ZNONODE
|| zk_return_code == ZBADVERSION || zk_return_code == Error::ZBADVERSION
|| zk_return_code == ZNOCHILDRENFOREPHEMERALS || zk_return_code == Error::ZNOCHILDRENFOREPHEMERALS
|| zk_return_code == ZNODEEXISTS || zk_return_code == Error::ZNODEEXISTS
|| zk_return_code == ZNOTEMPTY; || zk_return_code == Error::ZNOTEMPTY;
} }

View File

@ -53,6 +53,57 @@ struct Stat
int64_t pzxid; int64_t pzxid;
}; };
enum class Error : int32_t
{
ZOK = 0,
/** System and server-side errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value, but lesser than ZAPIERROR, are system errors.
*/
ZSYSTEMERROR = -1,
ZRUNTIMEINCONSISTENCY = -2, /// A runtime inconsistency was found
ZDATAINCONSISTENCY = -3, /// A data inconsistency was found
ZCONNECTIONLOSS = -4, /// Connection to the server has been lost
ZMARSHALLINGERROR = -5, /// Error while marshalling or unmarshalling data
ZUNIMPLEMENTED = -6, /// Operation is unimplemented
ZOPERATIONTIMEOUT = -7, /// Operation timeout
ZBADARGUMENTS = -8, /// Invalid arguments
ZINVALIDSTATE = -9, /// Invliad zhandle state
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value are API errors.
*/
ZAPIERROR = -100,
ZNONODE = -101, /// Node does not exist
ZNOAUTH = -102, /// Not authenticated
ZBADVERSION = -103, /// Version conflict
ZNOCHILDRENFOREPHEMERALS = -108, /// Ephemeral nodes may not have children
ZNODEEXISTS = -110, /// The node already exists
ZNOTEMPTY = -111, /// The node has children
ZSESSIONEXPIRED = -112, /// The session has been expired by the server
ZINVALIDCALLBACK = -113, /// Invalid callback specified
ZINVALIDACL = -114, /// Invalid ACL specified
ZAUTHFAILED = -115, /// Client authentication failed
ZCLOSING = -116, /// ZooKeeper is closing
ZNOTHING = -117, /// (not error) no server responses to process
ZSESSIONMOVED = -118 /// Session moved to another server, so operation is ignored
};
/// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors
bool isHardwareError(Error code);
/// Valid errors sent from the server about database state (like "no node"). Logical and authentication errors (like "bad arguments") are not here.
bool isUserError(Error code);
const char * errorMessage(Error code);
struct Request; struct Request;
using RequestPtr = std::shared_ptr<Request>; using RequestPtr = std::shared_ptr<Request>;
using Requests = std::vector<RequestPtr>; using Requests = std::vector<RequestPtr>;
@ -74,7 +125,7 @@ using ResponseCallback = std::function<void(const Response &)>;
struct Response struct Response
{ {
int32_t error = 0; Error error = Error::ZOK;
Response() = default; Response() = default;
Response(const Response &) = default; Response(const Response &) = default;
Response & operator=(const Response &) = default; Response & operator=(const Response &) = default;
@ -225,56 +276,6 @@ using CheckCallback = std::function<void(const CheckResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>; using MultiCallback = std::function<void(const MultiResponse &)>;
enum Error
{
ZOK = 0,
/** System and server-side errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value, but lesser than ZAPIERROR, are system errors.
*/
ZSYSTEMERROR = -1,
ZRUNTIMEINCONSISTENCY = -2, /// A runtime inconsistency was found
ZDATAINCONSISTENCY = -3, /// A data inconsistency was found
ZCONNECTIONLOSS = -4, /// Connection to the server has been lost
ZMARSHALLINGERROR = -5, /// Error while marshalling or unmarshalling data
ZUNIMPLEMENTED = -6, /// Operation is unimplemented
ZOPERATIONTIMEOUT = -7, /// Operation timeout
ZBADARGUMENTS = -8, /// Invalid arguments
ZINVALIDSTATE = -9, /// Invliad zhandle state
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value are API errors.
*/
ZAPIERROR = -100,
ZNONODE = -101, /// Node does not exist
ZNOAUTH = -102, /// Not authenticated
ZBADVERSION = -103, /// Version conflict
ZNOCHILDRENFOREPHEMERALS = -108, /// Ephemeral nodes may not have children
ZNODEEXISTS = -110, /// The node already exists
ZNOTEMPTY = -111, /// The node has children
ZSESSIONEXPIRED = -112, /// The session has been expired by the server
ZINVALIDCALLBACK = -113, /// Invalid callback specified
ZINVALIDACL = -114, /// Invalid ACL specified
ZAUTHFAILED = -115, /// Client authentication failed
ZCLOSING = -116, /// ZooKeeper is closing
ZNOTHING = -117, /// (not error) no server responses to process
ZSESSIONMOVED = -118 /// Session moved to another server, so operation is ignored
};
/// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors
bool isHardwareError(int32_t code);
/// Valid errors sent from the server about database state (like "no node"). Logical and authentication errors (like "bad arguments") are not here.
bool isUserError(int32_t code);
const char * errorMessage(int32_t code);
/// For watches. /// For watches.
enum State enum State
{ {
@ -301,19 +302,19 @@ class Exception : public DB::Exception
{ {
private: private:
/// Delegate constructor, used to minimize repetition; last parameter used for overload resolution. /// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
Exception(const std::string & msg, const int32_t code_, int); Exception(const std::string & msg, const Error code_, int);
public: public:
explicit Exception(const int32_t code_); explicit Exception(const Error code_);
Exception(const std::string & msg, const int32_t code_); Exception(const std::string & msg, const Error code_);
Exception(const int32_t code_, const std::string & path); Exception(const Error code_, const std::string & path);
Exception(const Exception & exc); Exception(const Exception & exc);
const char * name() const throw() override { return "Coordination::Exception"; } const char * name() const throw() override { return "Coordination::Exception"; }
const char * className() const throw() override { return "Coordination::Exception"; } const char * className() const throw() override { return "Coordination::Exception"; }
Exception * clone() const override { return new Exception(*this); } Exception * clone() const override { return new Exception(*this); }
const int32_t code; const Error code;
}; };

View File

@ -29,11 +29,11 @@ public:
if (zookeeper->tryGet(path, result_str, &stat)) if (zookeeper->tryGet(path, result_str, &stat))
{ {
result = std::stol(result_str) + 1; result = std::stol(result_str) + 1;
success = zookeeper->trySet(path, std::to_string(result), stat.version) == Coordination::ZOK; success = zookeeper->trySet(path, std::to_string(result), stat.version) == Coordination::Error::ZOK;
} }
else else
{ {
success = zookeeper->tryCreate(path, std::to_string(result), zkutil::CreateMode::Persistent) == Coordination::ZOK; success = zookeeper->tryCreate(path, std::to_string(result), zkutil::CreateMode::Persistent) == Coordination::Error::ZOK;
} }
} }
while (!success); while (!success);

View File

@ -21,12 +21,12 @@ public:
/// If it is user error throws KeeperMultiException else throws ordinary KeeperException /// If it is user error throws KeeperMultiException else throws ordinary KeeperException
/// If it is ZOK does nothing /// If it is ZOK does nothing
static void check(int32_t code, const Coordination::Requests & requests, const Coordination::Responses & responses); static void check(Coordination::Error code, const Coordination::Requests & requests, const Coordination::Responses & responses);
KeeperMultiException(int32_t code, const Coordination::Requests & requests, const Coordination::Responses & responses); KeeperMultiException(Coordination::Error code, const Coordination::Requests & requests, const Coordination::Responses & responses);
private: private:
static size_t getFailedOpIndex(int32_t code, const Coordination::Responses & responses); static size_t getFailedOpIndex(Coordination::Error code, const Coordination::Responses & responses);
}; };
} }

View File

@ -121,7 +121,7 @@ private:
{ {
DB::tryLogCurrentException(log); DB::tryLogCurrentException(log);
if (e.code == Coordination::ZSESSIONEXPIRED) if (e.code == Coordination::Error::ZSESSIONEXPIRED)
return; return;
} }
catch (...) catch (...)

View File

@ -16,13 +16,13 @@ bool Lock::tryLock()
else else
{ {
std::string dummy; std::string dummy;
int32_t code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy); Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy);
if (code == Coordination::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)
{ {
locked.reset(); locked.reset();
} }
else if (code == Coordination::ZOK) else if (code == Coordination::Error::ZOK)
{ {
locked = std::make_unique<ZooKeeperHandler>(zookeeper); locked = std::make_unique<ZooKeeperHandler>(zookeeper);
} }

View File

@ -158,7 +158,7 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check)); requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
} }
else else
throw Exception("Illegal command as part of multi ZooKeeper request", ZBADARGUMENTS); throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS);
} }
} }
@ -338,7 +338,7 @@ ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, in
{ {
auto path_prefix = path; auto path_prefix = path;
if (path_prefix.empty()) if (path_prefix.empty())
throw Exception("Logical error: path cannot be empty", ZSESSIONEXPIRED); throw Exception("Logical error: path cannot be empty", Error::ZSESSIONEXPIRED);
if (path_prefix.back() != '/') if (path_prefix.back() != '/')
path_prefix += '/'; path_prefix += '/';
@ -514,7 +514,7 @@ void TestKeeper::finalize()
WatchResponse response; WatchResponse response;
response.type = SESSION; response.type = SESSION;
response.state = EXPIRED_SESSION; response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED; response.error = Error::ZSESSIONEXPIRED;
for (auto & callback : path_watch.second) for (auto & callback : path_watch.second)
{ {
@ -541,7 +541,7 @@ void TestKeeper::finalize()
if (info.callback) if (info.callback)
{ {
ResponsePtr response = info.request->createResponse(); ResponsePtr response = info.request->createResponse();
response->error = ZSESSIONEXPIRED; response->error = Error::ZSESSIONEXPIRED;
try try
{ {
info.callback(*response); info.callback(*response);
@ -556,7 +556,7 @@ void TestKeeper::finalize()
WatchResponse response; WatchResponse response;
response.type = SESSION; response.type = SESSION;
response.state = EXPIRED_SESSION; response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED; response.error = Error::ZSESSIONEXPIRED;
try try
{ {
info.watch(response); info.watch(response);
@ -587,10 +587,10 @@ void TestKeeper::pushRequest(RequestInfo && request)
std::lock_guard lock(push_request_mutex); std::lock_guard lock(push_request_mutex);
if (expired) if (expired)
throw Exception("Session expired", ZSESSIONEXPIRED); throw Exception("Session expired", Error::ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(request), operation_timeout.totalMilliseconds())) if (!requests_queue.tryPush(std::move(request), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT); throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
} }
catch (...) catch (...)
{ {

View File

@ -38,9 +38,9 @@ const int CreateMode::PersistentSequential = 2;
const int CreateMode::EphemeralSequential = 3; const int CreateMode::EphemeralSequential = 3;
static void check(int32_t code, const std::string & path) static void check(Coordination::Error code, const std::string & path)
{ {
if (code) if (code != Coordination::Error::ZOK)
throw KeeperException(code, path); throw KeeperException(code, path);
} }
@ -59,7 +59,7 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
if (implementation == "zookeeper") if (implementation == "zookeeper")
{ {
if (hosts.empty()) if (hosts.empty())
throw KeeperException("No hosts passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS); throw KeeperException("No hosts passed to ZooKeeper constructor.", Coordination::Error::ZBADARGUMENTS);
std::vector<std::string> hosts_strings; std::vector<std::string> hosts_strings;
splitInto<','>(hosts_strings, hosts); splitInto<','>(hosts_strings, hosts);
@ -84,7 +84,7 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
} }
if (nodes.empty()) if (nodes.empty())
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::ZBADARGUMENTS); throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZBADARGUMENTS);
impl = std::make_unique<Coordination::ZooKeeper>( impl = std::make_unique<Coordination::ZooKeeper>(
nodes, nodes,
@ -112,7 +112,7 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
} }
if (!chroot.empty() && !exists("/")) if (!chroot.empty() && !exists("/"))
throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::ZNONODE); throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::Error::ZNONODE);
} }
ZooKeeper::ZooKeeper(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_, ZooKeeper::ZooKeeper(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
@ -164,7 +164,7 @@ struct ZooKeeperArgs
implementation = config.getString(config_name + "." + key); implementation = config.getString(config_name + "." + key);
} }
else else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::ZBADARGUMENTS); throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
} }
/// Shuffle the hosts to distribute the load among ZooKeeper nodes. /// Shuffle the hosts to distribute the load among ZooKeeper nodes.
@ -182,7 +182,7 @@ struct ZooKeeperArgs
if (!chroot.empty()) if (!chroot.empty())
{ {
if (chroot.front() != '/') if (chroot.front() != '/')
throw KeeperException(std::string("Root path in config file should start with '/', but got ") + chroot, Coordination::ZBADARGUMENTS); throw KeeperException(std::string("Root path in config file should start with '/', but got ") + chroot, Coordination::Error::ZBADARGUMENTS);
if (chroot.back() == '/') if (chroot.back() == '/')
chroot.pop_back(); chroot.pop_back();
} }
@ -211,17 +211,17 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
} }
int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res, Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Coordination::Stat * stat, Coordination::Stat * stat,
Coordination::WatchCallback watch_callback) Coordination::WatchCallback watch_callback)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
Poco::Event event; Poco::Event event;
auto callback = [&](const Coordination::ListResponse & response) auto callback = [&](const Coordination::ListResponse & response)
{ {
code = response.error; code = response.error;
if (!code) if (code == Coordination::Error::ZOK)
{ {
res = response.names; res = response.names;
if (stat) if (stat)
@ -251,37 +251,37 @@ Strings ZooKeeper::getChildrenWatch(
return res; return res;
} }
int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res, Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat, const EventPtr & watch) Coordination::Stat * stat, const EventPtr & watch)
{ {
int32_t code = getChildrenImpl(path, res, stat, callbackForEvent(watch)); Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch));
if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path); throw KeeperException(code, path);
return code; return code;
} }
int32_t ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res, Coordination::Error ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat, Coordination::WatchCallback watch_callback) Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{ {
int32_t code = getChildrenImpl(path, res, stat, watch_callback); Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback);
if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path); throw KeeperException(code, path);
return code; return code;
} }
int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) Coordination::Error ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
Poco::Event event; Poco::Event event;
auto callback = [&](const Coordination::CreateResponse & response) auto callback = [&](const Coordination::CreateResponse & response)
{ {
code = response.error; code = response.error;
if (!code) if (code == Coordination::Error::ZOK)
path_created = response.path_created; path_created = response.path_created;
event.set(); event.set();
}; };
@ -298,20 +298,20 @@ std::string ZooKeeper::create(const std::string & path, const std::string & data
return path_created; return path_created;
} }
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) Coordination::Error ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
{ {
int32_t code = createImpl(path, data, mode, path_created); Coordination::Error code = createImpl(path, data, mode, path_created);
if (!(code == Coordination::ZOK || if (!(code == Coordination::Error::ZOK ||
code == Coordination::ZNONODE || code == Coordination::Error::ZNONODE ||
code == Coordination::ZNODEEXISTS || code == Coordination::Error::ZNODEEXISTS ||
code == Coordination::ZNOCHILDRENFOREPHEMERALS)) code == Coordination::Error::ZNOCHILDRENFOREPHEMERALS))
throw KeeperException(code, path); throw KeeperException(code, path);
return code; return code;
} }
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode) Coordination::Error ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode)
{ {
std::string path_created; std::string path_created;
return tryCreate(path, data, mode, path_created); return tryCreate(path, data, mode, path_created);
@ -320,9 +320,9 @@ int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data,
void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data) void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data)
{ {
std::string path_created; std::string path_created;
int32_t code = createImpl(path, data, CreateMode::Persistent, path_created); Coordination::Error code = createImpl(path, data, CreateMode::Persistent, path_created);
if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS) if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
return; return;
else else
throw KeeperException(code, path); throw KeeperException(code, path);
@ -341,14 +341,14 @@ void ZooKeeper::createAncestors(const std::string & path)
} }
} }
int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version) Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
Poco::Event event; Poco::Event event;
auto callback = [&](const Coordination::RemoveResponse & response) auto callback = [&](const Coordination::RemoveResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
code = response.error; code = response.error;
event.set(); event.set();
}; };
@ -363,26 +363,26 @@ void ZooKeeper::remove(const std::string & path, int32_t version)
check(tryRemove(path, version), path); check(tryRemove(path, version), path);
} }
int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version) Coordination::Error ZooKeeper::tryRemove(const std::string & path, int32_t version)
{ {
int32_t code = removeImpl(path, version); Coordination::Error code = removeImpl(path, version);
if (!(code == Coordination::ZOK || if (!(code == Coordination::Error::ZOK ||
code == Coordination::ZNONODE || code == Coordination::Error::ZNONODE ||
code == Coordination::ZBADVERSION || code == Coordination::Error::ZBADVERSION ||
code == Coordination::ZNOTEMPTY)) code == Coordination::Error::ZNOTEMPTY))
throw KeeperException(code, path); throw KeeperException(code, path);
return code; return code;
} }
int32_t ZooKeeper::existsImpl(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) Coordination::Error ZooKeeper::existsImpl(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
Poco::Event event; Poco::Event event;
auto callback = [&](const Coordination::ExistsResponse & response) auto callback = [&](const Coordination::ExistsResponse & response)
{ {
code = response.error; code = response.error;
if (!code && stat) if (code == Coordination::Error::ZOK && stat)
*stat = response.stat; *stat = response.stat;
event.set(); event.set();
}; };
@ -399,22 +399,22 @@ bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, cons
bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{ {
int32_t code = existsImpl(path, stat, watch_callback); Coordination::Error code = existsImpl(path, stat, watch_callback);
if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path); throw KeeperException(code, path);
return code != Coordination::ZNONODE; return code != Coordination::Error::ZNONODE;
} }
int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
Poco::Event event; Poco::Event event;
auto callback = [&](const Coordination::GetResponse & response) auto callback = [&](const Coordination::GetResponse & response)
{ {
code = response.error; code = response.error;
if (!code) if (code == Coordination::Error::ZOK)
{ {
res = response.data; res = response.data;
if (stat) if (stat)
@ -431,7 +431,7 @@ int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Coordina
std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat, const EventPtr & watch) std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
std::string res; std::string res;
if (tryGet(path, res, stat, watch, &code)) if (tryGet(path, res, stat, watch, &code))
return res; return res;
@ -441,7 +441,7 @@ std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat,
std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
std::string res; std::string res;
if (tryGetWatch(path, res, stat, watch_callback, &code)) if (tryGetWatch(path, res, stat, watch_callback, &code))
return res; return res;
@ -449,34 +449,44 @@ std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * s
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code); throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
} }
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Coordination::Stat * stat, const EventPtr & watch, int * return_code) bool ZooKeeper::tryGet(
const std::string & path,
std::string & res,
Coordination::Stat * stat,
const EventPtr & watch,
Coordination::Error * return_code)
{ {
return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code); return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
} }
bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback, int * return_code) bool ZooKeeper::tryGetWatch(
const std::string & path,
std::string & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::Error * return_code)
{ {
int32_t code = getImpl(path, res, stat, watch_callback); Coordination::Error code = getImpl(path, res, stat, watch_callback);
if (!(code == Coordination::ZOK || code == Coordination::ZNONODE)) if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path); throw KeeperException(code, path);
if (return_code) if (return_code)
*return_code = code; *return_code = code;
return code == Coordination::ZOK; return code == Coordination::Error::ZOK;
} }
int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data, Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::string & data,
int32_t version, Coordination::Stat * stat) int32_t version, Coordination::Stat * stat)
{ {
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
Poco::Event event; Poco::Event event;
auto callback = [&](const Coordination::SetResponse & response) auto callback = [&](const Coordination::SetResponse & response)
{ {
code = response.error; code = response.error;
if (!code && stat) if (code == Coordination::Error::ZOK && stat)
*stat = response.stat; *stat = response.stat;
event.set(); event.set();
}; };
@ -493,34 +503,34 @@ void ZooKeeper::set(const std::string & path, const std::string & data, int32_t
void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode) void ZooKeeper::createOrUpdate(const std::string & path, const std::string & data, int32_t mode)
{ {
int32_t code = trySet(path, data, -1); Coordination::Error code = trySet(path, data, -1);
if (code == Coordination::ZNONODE) if (code == Coordination::Error::ZNONODE)
{ {
create(path, data, mode); create(path, data, mode);
} }
else if (code != Coordination::ZOK) else if (code != Coordination::Error::ZOK)
throw KeeperException(code, path); throw KeeperException(code, path);
} }
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data, Coordination::Error ZooKeeper::trySet(const std::string & path, const std::string & data,
int32_t version, Coordination::Stat * stat) int32_t version, Coordination::Stat * stat)
{ {
int32_t code = setImpl(path, data, version, stat); Coordination::Error code = setImpl(path, data, version, stat);
if (!(code == Coordination::ZOK || if (!(code == Coordination::Error::ZOK ||
code == Coordination::ZNONODE || code == Coordination::Error::ZNONODE ||
code == Coordination::ZBADVERSION)) code == Coordination::Error::ZBADVERSION))
throw KeeperException(code, path); throw KeeperException(code, path);
return code; return code;
} }
int32_t ZooKeeper::multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses) Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses)
{ {
if (requests.empty()) if (requests.empty())
return Coordination::ZOK; return Coordination::Error::ZOK;
int32_t code = 0; Coordination::Error code = Coordination::Error::ZOK;
Poco::Event event; Poco::Event event;
auto callback = [&](const Coordination::MultiResponse & response) auto callback = [&](const Coordination::MultiResponse & response)
@ -538,15 +548,15 @@ int32_t ZooKeeper::multiImpl(const Coordination::Requests & requests, Coordinati
Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests) Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests)
{ {
Coordination::Responses responses; Coordination::Responses responses;
int32_t code = multiImpl(requests, responses); Coordination::Error code = multiImpl(requests, responses);
KeeperMultiException::check(code, requests, responses); KeeperMultiException::check(code, requests, responses);
return responses; return responses;
} }
int32_t ZooKeeper::tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses) Coordination::Error ZooKeeper::tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses)
{ {
int32_t code = multiImpl(requests, responses); Coordination::Error code = multiImpl(requests, responses);
if (code && !Coordination::isUserError(code)) if (code != Coordination::Error::ZOK && !Coordination::isUserError(code))
throw KeeperException(code); throw KeeperException(code);
return code; return code;
} }
@ -587,7 +597,7 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path)
void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path) void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
{ {
Strings children; Strings children;
if (tryGetChildren(path, children) != Coordination::ZOK) if (tryGetChildren(path, children) != Coordination::Error::ZOK)
return; return;
while (!children.empty()) while (!children.empty())
{ {
@ -609,7 +619,7 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
/// this means someone is concurrently removing these children and we will have /// this means someone is concurrently removing these children and we will have
/// to remove them one by one. /// to remove them one by one.
Coordination::Responses responses; Coordination::Responses responses;
if (tryMulti(ops, responses) != Coordination::ZOK) if (tryMulti(ops, responses) != Coordination::Error::ZOK)
for (const std::string & child : batch) for (const std::string & child : batch)
tryRemove(child); tryRemove(child);
} }
@ -645,7 +655,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
auto callback = [state](const Coordination::ExistsResponse & response) auto callback = [state](const Coordination::ExistsResponse & response)
{ {
state->code = response.error; state->code = int32_t(response.error);
if (state->code) if (state->code)
state->event.set(); state->event.set();
}; };
@ -654,7 +664,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
{ {
if (!state->code) if (!state->code)
{ {
state->code = response.error; state->code = int32_t(response.error);
if (!state->code) if (!state->code)
state->event_type = response.type; state->event_type = response.type;
state->event.set(); state->event.set();
@ -670,11 +680,11 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
else if (!state->event.tryWait(1000)) else if (!state->event.tryWait(1000))
continue; continue;
if (state->code == Coordination::ZNONODE) if (state->code == int32_t(Coordination::Error::ZNONODE))
return true; return true;
if (state->code) if (state->code)
throw KeeperException(state->code, path); throw KeeperException(static_cast<Coordination::Error>(state->code.load(std::memory_order_seq_cst)), path);
if (state->event_type == Coordination::DELETED) if (state->event_type == Coordination::DELETED)
return true; return true;
@ -688,11 +698,6 @@ ZooKeeperPtr ZooKeeper::startNewSession() const
} }
std::string ZooKeeper::error2string(int32_t code)
{
return Coordination::errorMessage(code);
}
bool ZooKeeper::expired() bool ZooKeeper::expired()
{ {
return impl->isExpired(); return impl->isExpired();
@ -712,7 +717,7 @@ std::future<Coordination::CreateResponse> ZooKeeper::asyncCreate(const std::stri
auto callback = [promise, path](const Coordination::CreateResponse & response) mutable auto callback = [promise, path](const Coordination::CreateResponse & response) mutable
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -730,7 +735,7 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & p
auto callback = [promise, path](const Coordination::GetResponse & response) mutable auto callback = [promise, path](const Coordination::GetResponse & response) mutable
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -748,7 +753,7 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncTryGet(const std::string
auto callback = [promise, path](const Coordination::GetResponse & response) mutable auto callback = [promise, path](const Coordination::GetResponse & response) mutable
{ {
if (response.error && response.error != Coordination::ZNONODE) if (response.error != Coordination::Error::ZOK && response.error != Coordination::Error::ZNONODE)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -765,7 +770,7 @@ std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::stri
auto callback = [promise, path](const Coordination::ExistsResponse & response) mutable auto callback = [promise, path](const Coordination::ExistsResponse & response) mutable
{ {
if (response.error && response.error != Coordination::ZNONODE) if (response.error != Coordination::Error::ZOK && response.error != Coordination::Error::ZNONODE)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -782,7 +787,7 @@ std::future<Coordination::SetResponse> ZooKeeper::asyncSet(const std::string & p
auto callback = [promise, path](const Coordination::SetResponse & response) mutable auto callback = [promise, path](const Coordination::SetResponse & response) mutable
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -799,7 +804,7 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::s
auto callback = [promise, path](const Coordination::ListResponse & response) mutable auto callback = [promise, path](const Coordination::ListResponse & response) mutable
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -816,7 +821,7 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncRemove(const std::stri
auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -833,8 +838,13 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemove(const std::s
auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable auto callback = [promise, path](const Coordination::RemoveResponse & response) mutable
{ {
if (response.error && response.error != Coordination::ZNONODE && response.error != Coordination::ZBADVERSION && response.error != Coordination::ZNOTEMPTY) if (response.error != Coordination::Error::ZOK
&& response.error != Coordination::Error::ZNONODE
&& response.error != Coordination::Error::ZBADVERSION
&& response.error != Coordination::Error::ZNOTEMPTY)
{
promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(path, response.error)));
}
else else
promise->set_value(response); promise->set_value(response);
}; };
@ -864,7 +874,7 @@ std::future<Coordination::MultiResponse> ZooKeeper::asyncMulti(const Coordinatio
auto callback = [promise](const Coordination::MultiResponse & response) mutable auto callback = [promise](const Coordination::MultiResponse & response) mutable
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(response.error))); promise->set_exception(std::make_exception_ptr(KeeperException(response.error)));
else else
promise->set_value(response); promise->set_value(response);
@ -874,7 +884,7 @@ std::future<Coordination::MultiResponse> ZooKeeper::asyncMulti(const Coordinatio
return future; return future;
} }
int32_t ZooKeeper::tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses) Coordination::Error ZooKeeper::tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses)
{ {
try try
{ {
@ -887,24 +897,24 @@ int32_t ZooKeeper::tryMultiNoThrow(const Coordination::Requests & requests, Coor
} }
size_t KeeperMultiException::getFailedOpIndex(int32_t exception_code, const Coordination::Responses & responses) size_t KeeperMultiException::getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
{ {
if (responses.empty()) if (responses.empty())
throw DB::Exception("Responses for multi transaction is empty", DB::ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Responses for multi transaction is empty", DB::ErrorCodes::LOGICAL_ERROR);
for (size_t index = 0, size = responses.size(); index < size; ++index) for (size_t index = 0, size = responses.size(); index < size; ++index)
if (responses[index]->error) if (responses[index]->error != Coordination::Error::ZOK)
return index; return index;
if (!Coordination::isUserError(exception_code)) if (!Coordination::isUserError(exception_code))
throw DB::Exception("There are no failed OPs because '" + ZooKeeper::error2string(exception_code) + "' is not valid response code for that", throw DB::Exception("There are no failed OPs because '" + std::string(Coordination::errorMessage(exception_code)) + "' is not valid response code for that",
DB::ErrorCodes::LOGICAL_ERROR); DB::ErrorCodes::LOGICAL_ERROR);
throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR); throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR);
} }
KeeperMultiException::KeeperMultiException(int32_t exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_) KeeperMultiException::KeeperMultiException(Coordination::Error exception_code, const Coordination::Requests & requests_, const Coordination::Responses & responses_)
: KeeperException("Transaction failed", exception_code), : KeeperException("Transaction failed", exception_code),
requests(requests_), responses(responses_), failed_op_index(getFailedOpIndex(exception_code, responses)) requests(requests_), responses(responses_), failed_op_index(getFailedOpIndex(exception_code, responses))
{ {
@ -917,9 +927,10 @@ std::string KeeperMultiException::getPathForFirstFailedOp() const
return requests[failed_op_index]->getPath(); return requests[failed_op_index]->getPath();
} }
void KeeperMultiException::check(int32_t exception_code, const Coordination::Requests & requests, const Coordination::Responses & responses) void KeeperMultiException::check(
Coordination::Error exception_code, const Coordination::Requests & requests, const Coordination::Responses & responses)
{ {
if (!exception_code) if (exception_code == Coordination::Error::ZOK)
return; return;
if (Coordination::isUserError(exception_code)) if (Coordination::isUserError(exception_code))

View File

@ -99,8 +99,8 @@ public:
/// * The parent is ephemeral. /// * The parent is ephemeral.
/// * The node already exists. /// * The node already exists.
/// In case of other errors throws an exception. /// In case of other errors throws an exception.
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created); Coordination::Error tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode); Coordination::Error tryCreate(const std::string & path, const std::string & data, int32_t mode);
/// Create a Persistent node. /// Create a Persistent node.
/// Does nothing if the node already exists. /// Does nothing if the node already exists.
@ -117,7 +117,7 @@ public:
/// * The node doesn't exist /// * The node doesn't exist
/// * Versions don't match /// * Versions don't match
/// * The node has children. /// * The node has children.
int32_t tryRemove(const std::string & path, int32_t version = -1); Coordination::Error tryRemove(const std::string & path, int32_t version = -1);
bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr); bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback); bool existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
@ -127,9 +127,11 @@ public:
/// Doesn't not throw in the following cases: /// Doesn't not throw in the following cases:
/// * The node doesn't exist. Returns false in this case. /// * The node doesn't exist. Returns false in this case.
bool tryGet(const std::string & path, std::string & res, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr); bool tryGet(const std::string & path, std::string & res, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr,
Coordination::Error * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback, int * code = nullptr); bool tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback,
Coordination::Error * code = nullptr);
void set(const std::string & path, const std::string & data, void set(const std::string & path, const std::string & data,
int32_t version = -1, Coordination::Stat * stat = nullptr); int32_t version = -1, Coordination::Stat * stat = nullptr);
@ -140,7 +142,7 @@ public:
/// Doesn't not throw in the following cases: /// Doesn't not throw in the following cases:
/// * The node doesn't exist. /// * The node doesn't exist.
/// * Versions do not match. /// * Versions do not match.
int32_t trySet(const std::string & path, const std::string & data, Coordination::Error trySet(const std::string & path, const std::string & data,
int32_t version = -1, Coordination::Stat * stat = nullptr); int32_t version = -1, Coordination::Stat * stat = nullptr);
Strings getChildren(const std::string & path, Strings getChildren(const std::string & path,
@ -153,11 +155,11 @@ public:
/// Doesn't not throw in the following cases: /// Doesn't not throw in the following cases:
/// * The node doesn't exist. /// * The node doesn't exist.
int32_t tryGetChildren(const std::string & path, Strings & res, Coordination::Error tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat = nullptr, Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr); const EventPtr & watch = nullptr);
int32_t tryGetChildrenWatch(const std::string & path, Strings & res, Coordination::Error tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat, Coordination::Stat * stat,
Coordination::WatchCallback watch_callback); Coordination::WatchCallback watch_callback);
@ -166,9 +168,9 @@ public:
Coordination::Responses multi(const Coordination::Requests & requests); Coordination::Responses multi(const Coordination::Requests & requests);
/// Throws only if some operation has returned an "unexpected" error /// Throws only if some operation has returned an "unexpected" error
/// - an error that would cause the corresponding try- method to throw. /// - an error that would cause the corresponding try- method to throw.
int32_t tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses); Coordination::Error tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses);
/// Throws nothing (even session expired errors) /// Throws nothing (even session expired errors)
int32_t tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses); Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses);
Int64 getClientID(); Int64 getClientID();
@ -238,8 +240,6 @@ public:
/// Like the previous one but don't throw any exceptions on future.get() /// Like the previous one but don't throw any exceptions on future.get()
FutureMulti tryAsyncMulti(const Coordination::Requests & ops); FutureMulti tryAsyncMulti(const Coordination::Requests & ops);
static std::string error2string(int32_t code);
private: private:
friend class EphemeralNodeHolder; friend class EphemeralNodeHolder;
@ -250,13 +250,15 @@ private:
void tryRemoveChildrenRecursive(const std::string & path); void tryRemoveChildrenRecursive(const std::string & path);
/// The following methods don't throw exceptions but return error codes. /// The following methods don't throw exceptions but return error codes.
int32_t createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created); Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
int32_t removeImpl(const std::string & path, int32_t version); Coordination::Error removeImpl(const std::string & path, int32_t version);
int32_t getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback); Coordination::Error getImpl(
int32_t setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat); const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
int32_t getChildrenImpl(const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback); Coordination::Error setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat);
int32_t multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses); Coordination::Error getChildrenImpl(
int32_t existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback); const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
std::unique_ptr<Coordination::IKeeper> impl; std::unique_ptr<Coordination::IKeeper> impl;

View File

@ -335,6 +335,13 @@ static void read(int32_t & x, ReadBuffer & in)
x = __builtin_bswap32(x); x = __builtin_bswap32(x);
} }
static void read(Error & x, ReadBuffer & in)
{
int32_t code;
readBinary(code, in);
x = Error(code);
}
static void read(bool & x, ReadBuffer & in) static void read(bool & x, ReadBuffer & in)
{ {
readBinary(x, in); readBinary(x, in);
@ -353,10 +360,10 @@ static void read(String & s, ReadBuffer & in)
} }
if (size < 0) if (size < 0)
throw Exception("Negative size while reading string from ZooKeeper", ZMARSHALLINGERROR); throw Exception("Negative size while reading string from ZooKeeper", Error::ZMARSHALLINGERROR);
if (size > MAX_STRING_OR_ARRAY_SIZE) if (size > MAX_STRING_OR_ARRAY_SIZE)
throw Exception("Too large string size while reading from ZooKeeper", ZMARSHALLINGERROR); throw Exception("Too large string size while reading from ZooKeeper", Error::ZMARSHALLINGERROR);
s.resize(size); s.resize(size);
in.read(s.data(), size); in.read(s.data(), size);
@ -367,7 +374,7 @@ template <size_t N> void read(std::array<char, N> & s, ReadBuffer & in)
int32_t size = 0; int32_t size = 0;
read(size, in); read(size, in);
if (size != N) if (size != N)
throw Exception("Unexpected array size while reading from ZooKeeper", ZMARSHALLINGERROR); throw Exception("Unexpected array size while reading from ZooKeeper", Error::ZMARSHALLINGERROR);
in.read(s.data(), N); in.read(s.data(), N);
} }
@ -391,9 +398,9 @@ template <typename T> void read(std::vector<T> & arr, ReadBuffer & in)
int32_t size = 0; int32_t size = 0;
read(size, in); read(size, in);
if (size < 0) if (size < 0)
throw Exception("Negative size while reading array from ZooKeeper", ZMARSHALLINGERROR); throw Exception("Negative size while reading array from ZooKeeper", Error::ZMARSHALLINGERROR);
if (size > MAX_STRING_OR_ARRAY_SIZE) if (size > MAX_STRING_OR_ARRAY_SIZE)
throw Exception("Too large array size while reading from ZooKeeper", ZMARSHALLINGERROR); throw Exception("Too large array size while reading from ZooKeeper", Error::ZMARSHALLINGERROR);
arr.resize(size); arr.resize(size);
for (auto & elem : arr) for (auto & elem : arr)
read(elem, in); read(elem, in);
@ -489,7 +496,7 @@ struct ZooKeeperCloseResponse final : ZooKeeperResponse
{ {
void readImpl(ReadBuffer &) override void readImpl(ReadBuffer &) override
{ {
throw Exception("Received response for close request", ZRUNTIMEINCONSISTENCY); throw Exception("Received response for close request", Error::ZRUNTIMEINCONSISTENCY);
} }
}; };
@ -650,12 +657,12 @@ struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
{ {
void readImpl(ReadBuffer & in) override void readImpl(ReadBuffer & in) override
{ {
int32_t read_error; Coordination::Error read_error;
Coordination::read(read_error, in); Coordination::read(read_error, in);
if (read_error != error) if (read_error != error)
throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")", throw Exception(fmt::format("Error code in ErrorResponse ({}) doesn't match error code in header ({})", read_error, error),
ZMARSHALLINGERROR); Error::ZMARSHALLINGERROR);
} }
}; };
@ -691,7 +698,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
requests.push_back(std::make_shared<ZooKeeperCheckRequest>(*concrete_request_check)); requests.push_back(std::make_shared<ZooKeeperCheckRequest>(*concrete_request_check));
} }
else else
throw Exception("Illegal command as part of multi ZooKeeper request", ZBADARGUMENTS); throw Exception("Illegal command as part of multi ZooKeeper request", Error::ZBADARGUMENTS);
} }
} }
@ -739,14 +746,14 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
{ {
ZooKeeper::OpNum op_num; ZooKeeper::OpNum op_num;
bool done; bool done;
int32_t op_error; Error op_error;
Coordination::read(op_num, in); Coordination::read(op_num, in);
Coordination::read(done, in); Coordination::read(done, in);
Coordination::read(op_error, in); Coordination::read(op_error, in);
if (done) if (done)
throw Exception("Not enough results received for multi transaction", ZMARSHALLINGERROR); throw Exception("Not enough results received for multi transaction", Error::ZMARSHALLINGERROR);
/// op_num == -1 is special for multi transaction. /// op_num == -1 is special for multi transaction.
/// For unknown reason, error code is duplicated in header and in response body. /// For unknown reason, error code is duplicated in header and in response body.
@ -754,18 +761,18 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
if (op_num == -1) if (op_num == -1)
response = std::make_shared<ZooKeeperErrorResponse>(); response = std::make_shared<ZooKeeperErrorResponse>();
if (op_error) if (op_error != Error::ZOK)
{ {
response->error = op_error; response->error = op_error;
/// Set error for whole transaction. /// Set error for whole transaction.
/// If some operations fail, ZK send global error as zero and then send details about each operation. /// If some operations fail, ZK send global error as zero and then send details about each operation.
/// It will set error code for first failed operation and it will set special "runtime inconsistency" code for other operations. /// It will set error code for first failed operation and it will set special "runtime inconsistency" code for other operations.
if (!error && op_error != ZRUNTIMEINCONSISTENCY) if (error == Error::ZOK && op_error != Error::ZRUNTIMEINCONSISTENCY)
error = op_error; error = op_error;
} }
if (!op_error || op_num == -1) if (op_error == Error::ZOK || op_num == -1)
dynamic_cast<ZooKeeperResponse &>(*response).readImpl(in); dynamic_cast<ZooKeeperResponse &>(*response).readImpl(in);
} }
@ -780,11 +787,11 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
Coordination::read(error_read, in); Coordination::read(error_read, in);
if (!done) if (!done)
throw Exception("Too many results received for multi transaction", ZMARSHALLINGERROR); throw Exception("Too many results received for multi transaction", Error::ZMARSHALLINGERROR);
if (op_num != -1) if (op_num != -1)
throw Exception("Unexpected op_num received at the end of results for multi transaction", ZMARSHALLINGERROR); throw Exception("Unexpected op_num received at the end of results for multi transaction", Error::ZMARSHALLINGERROR);
if (error_read != -1) if (error_read != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction", ZMARSHALLINGERROR); throw Exception("Unexpected error value received at the end of results for multi transaction", Error::ZMARSHALLINGERROR);
} }
} }
}; };
@ -883,7 +890,7 @@ void ZooKeeper::connect(
Poco::Timespan connection_timeout) Poco::Timespan connection_timeout)
{ {
if (nodes.empty()) if (nodes.empty())
throw Exception("No nodes passed to ZooKeeper constructor", ZBADARGUMENTS); throw Exception("No nodes passed to ZooKeeper constructor", Error::ZBADARGUMENTS);
static constexpr size_t num_tries = 3; static constexpr size_t num_tries = 3;
bool connected = false; bool connected = false;
@ -970,7 +977,7 @@ void ZooKeeper::connect(
} }
message << fail_reasons.str() << "\n"; message << fail_reasons.str() << "\n";
throw Exception(message.str(), ZCONNECTIONLOSS); throw Exception(message.str(), Error::ZCONNECTIONLOSS);
} }
} }
@ -1005,11 +1012,11 @@ void ZooKeeper::receiveHandshake()
read(handshake_length); read(handshake_length);
if (handshake_length != 36) if (handshake_length != 36)
throw Exception("Unexpected handshake length received: " + toString(handshake_length), ZMARSHALLINGERROR); throw Exception("Unexpected handshake length received: " + toString(handshake_length), Error::ZMARSHALLINGERROR);
read(protocol_version_read); read(protocol_version_read);
if (protocol_version_read != protocol_version) if (protocol_version_read != protocol_version)
throw Exception("Unexpected protocol version: " + toString(protocol_version_read), ZMARSHALLINGERROR); throw Exception("Unexpected protocol version: " + toString(protocol_version_read), Error::ZMARSHALLINGERROR);
read(timeout); read(timeout);
if (timeout != session_timeout.totalMilliseconds()) if (timeout != session_timeout.totalMilliseconds())
@ -1032,7 +1039,7 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
int32_t length; int32_t length;
XID read_xid; XID read_xid;
int64_t zxid; int64_t zxid;
int32_t err; Error err;
read(length); read(length);
size_t count_before_event = in->count(); size_t count_before_event = in->count();
@ -1042,16 +1049,16 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
if (read_xid != auth_xid) if (read_xid != auth_xid)
throw Exception("Unexpected event received in reply to auth request: " + toString(read_xid), throw Exception("Unexpected event received in reply to auth request: " + toString(read_xid),
ZMARSHALLINGERROR); Error::ZMARSHALLINGERROR);
int32_t actual_length = in->count() - count_before_event; int32_t actual_length = in->count() - count_before_event;
if (length != actual_length) if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length),
ZMARSHALLINGERROR); Error::ZMARSHALLINGERROR);
if (err) if (err != Error::ZOK)
throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err)), throw Exception("Error received in reply to auth request. Code: " + toString(int32_t(err)) + ". Message: " + String(errorMessage(err)),
ZMARSHALLINGERROR); Error::ZMARSHALLINGERROR);
} }
@ -1154,7 +1161,7 @@ void ZooKeeper::receiveThread()
earliest_operation = operations.begin()->second; earliest_operation = operations.begin()->second;
auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(operation_timeout.totalMicroseconds()); auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
if (now > earliest_operation_deadline) if (now > earliest_operation_deadline)
throw Exception("Operation timeout (deadline already expired) for path: " + earliest_operation->request->getPath(), ZOPERATIONTIMEOUT); throw Exception("Operation timeout (deadline already expired) for path: " + earliest_operation->request->getPath(), Error::ZOPERATIONTIMEOUT);
max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count(); max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
} }
} }
@ -1170,10 +1177,10 @@ void ZooKeeper::receiveThread()
else else
{ {
if (earliest_operation) if (earliest_operation)
throw Exception("Operation timeout (no response) for path: " + earliest_operation->request->getPath(), ZOPERATIONTIMEOUT); throw Exception("Operation timeout (no response) for path: " + earliest_operation->request->getPath(), Error::ZOPERATIONTIMEOUT);
waited += max_wait; waited += max_wait;
if (waited >= session_timeout.totalMicroseconds()) if (waited >= session_timeout.totalMicroseconds())
throw Exception("Nothing is received in session timeout", ZOPERATIONTIMEOUT); throw Exception("Nothing is received in session timeout", Error::ZOPERATIONTIMEOUT);
} }
@ -1193,7 +1200,7 @@ void ZooKeeper::receiveEvent()
int32_t length; int32_t length;
XID xid; XID xid;
int64_t zxid; int64_t zxid;
int32_t err; Error err;
read(length); read(length);
size_t count_before_event = in->count(); size_t count_before_event = in->count();
@ -1206,8 +1213,8 @@ void ZooKeeper::receiveEvent()
if (xid == ping_xid) if (xid == ping_xid)
{ {
if (err) if (err != Error::ZOK)
throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), ZRUNTIMEINCONSISTENCY); throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), Error::ZRUNTIMEINCONSISTENCY);
response = std::make_shared<ZooKeeperHeartbeatResponse>(); response = std::make_shared<ZooKeeperHeartbeatResponse>();
} }
@ -1252,7 +1259,7 @@ void ZooKeeper::receiveEvent()
auto it = operations.find(xid); auto it = operations.find(xid);
if (it == operations.end()) if (it == operations.end())
throw Exception("Received response for unknown xid", ZRUNTIMEINCONSISTENCY); throw Exception("Received response for unknown xid", Error::ZRUNTIMEINCONSISTENCY);
/// After this point, we must invoke callback, that we've grabbed from 'operations'. /// After this point, we must invoke callback, that we've grabbed from 'operations'.
/// Invariant: all callbacks are invoked either in case of success or in case of error. /// Invariant: all callbacks are invoked either in case of success or in case of error.
@ -1272,7 +1279,7 @@ void ZooKeeper::receiveEvent()
if (!response) if (!response)
response = request_info.request->makeResponse(); response = request_info.request->makeResponse();
if (err) if (err != Error::ZOK)
response->error = err; response->error = err;
else else
{ {
@ -1282,7 +1289,7 @@ void ZooKeeper::receiveEvent()
int32_t actual_length = in->count() - count_before_event; int32_t actual_length = in->count() - count_before_event;
if (length != actual_length) if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), ZMARSHALLINGERROR); throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), Error::ZMARSHALLINGERROR);
} }
catch (...) catch (...)
{ {
@ -1294,7 +1301,7 @@ void ZooKeeper::receiveEvent()
/// In case we cannot read the response, we should indicate it as the error of that type /// In case we cannot read the response, we should indicate it as the error of that type
/// when the user cannot assume whether the request was processed or not. /// when the user cannot assume whether the request was processed or not.
response->error = ZCONNECTIONLOSS; response->error = Error::ZCONNECTIONLOSS;
if (request_info.callback) if (request_info.callback)
request_info.callback(*response); request_info.callback(*response);
@ -1361,8 +1368,8 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
ResponsePtr response = request_info.request->makeResponse(); ResponsePtr response = request_info.request->makeResponse();
response->error = request_info.request->probably_sent response->error = request_info.request->probably_sent
? ZCONNECTIONLOSS ? Error::ZCONNECTIONLOSS
: ZSESSIONEXPIRED; : Error::ZSESSIONEXPIRED;
if (request_info.callback) if (request_info.callback)
{ {
@ -1390,7 +1397,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
WatchResponse response; WatchResponse response;
response.type = SESSION; response.type = SESSION;
response.state = EXPIRED_SESSION; response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED; response.error = Error::ZSESSIONEXPIRED;
for (auto & callback : path_watches.second) for (auto & callback : path_watches.second)
{ {
@ -1421,7 +1428,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
ResponsePtr response = info.request->makeResponse(); ResponsePtr response = info.request->makeResponse();
if (response) if (response)
{ {
response->error = ZSESSIONEXPIRED; response->error = Error::ZSESSIONEXPIRED;
try try
{ {
info.callback(*response); info.callback(*response);
@ -1437,7 +1444,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
WatchResponse response; WatchResponse response;
response.type = SESSION; response.type = SESSION;
response.state = EXPIRED_SESSION; response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED; response.error = Error::ZSESSIONEXPIRED;
try try
{ {
info.watch(response); info.watch(response);
@ -1466,9 +1473,9 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{ {
info.request->xid = next_xid.fetch_add(1); info.request->xid = next_xid.fetch_add(1);
if (info.request->xid == close_xid) if (info.request->xid == close_xid)
throw Exception("xid equal to close_xid", ZSESSIONEXPIRED); throw Exception("xid equal to close_xid", Error::ZSESSIONEXPIRED);
if (info.request->xid < 0) if (info.request->xid < 0)
throw Exception("XID overflow", ZSESSIONEXPIRED); throw Exception("XID overflow", Error::ZSESSIONEXPIRED);
} }
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls /// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
@ -1478,10 +1485,10 @@ void ZooKeeper::pushRequest(RequestInfo && info)
std::lock_guard lock(push_request_mutex); std::lock_guard lock(push_request_mutex);
if (expired) if (expired)
throw Exception("Session expired", ZSESSIONEXPIRED); throw Exception("Session expired", Error::ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds())) if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT); throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
} }
catch (...) catch (...)
{ {
@ -1651,7 +1658,7 @@ void ZooKeeper::close()
request_info.request = std::make_shared<ZooKeeperCloseRequest>(std::move(request)); request_info.request = std::make_shared<ZooKeeperCloseRequest>(std::move(request));
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds())) if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push close request to queue within operation timeout", ZOPERATIONTIMEOUT); throw Exception("Cannot push close request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
ProfileEvents::increment(ProfileEvents::ZooKeeperClose); ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
} }

View File

@ -86,7 +86,7 @@ TEST(zkutil, MultiAsync)
ops.clear(); ops.clear();
auto res = fut.get(); auto res = fut.get();
ASSERT_EQ(res.error, Coordination::ZOK); ASSERT_EQ(res.error, Coordination::Error::ZOK);
ASSERT_EQ(res.responses.size(), 2); ASSERT_EQ(res.responses.size(), 2);
} }
@ -126,15 +126,15 @@ TEST(zkutil, MultiAsync)
/// The test is quite heavy. It is normal if session is expired during this test. /// The test is quite heavy. It is normal if session is expired during this test.
/// If we don't check that, the test will be flacky. /// If we don't check that, the test will be flacky.
if (res.error != Coordination::ZSESSIONEXPIRED && res.error != Coordination::ZCONNECTIONLOSS) if (res.error != Coordination::Error::ZSESSIONEXPIRED && res.error != Coordination::Error::ZCONNECTIONLOSS)
{ {
ASSERT_EQ(res.error, Coordination::ZNODEEXISTS); ASSERT_EQ(res.error, Coordination::Error::ZNODEEXISTS);
ASSERT_EQ(res.responses.size(), 2); ASSERT_EQ(res.responses.size(), 2);
} }
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code != Coordination::ZSESSIONEXPIRED && e.code != Coordination::ZCONNECTIONLOSS) if (e.code != Coordination::Error::ZSESSIONEXPIRED && e.code != Coordination::Error::ZCONNECTIONLOSS)
throw; throw;
} }
} }

View File

@ -39,12 +39,12 @@ int main(int argc, char ** argv)
ops.emplace_back(zkutil::makeRemoveRequest("/test/zk_expiration_test", -1)); ops.emplace_back(zkutil::makeRemoveRequest("/test/zk_expiration_test", -1));
Coordination::Responses responses; Coordination::Responses responses;
int32_t code = zk.tryMultiNoThrow(ops, responses); Coordination::Error code = zk.tryMultiNoThrow(ops, responses);
std::cout << time(nullptr) - time0 << "s: " << zkutil::ZooKeeper::error2string(code) << std::endl; std::cout << time(nullptr) - time0 << "s: " << Coordination::errorMessage(code) << std::endl;
try try
{ {
if (code) if (code != Coordination::Error::ZOK)
std::cout << "Path: " << zkutil::KeeperMultiException(code, ops, responses).getPathForFirstFailedOp() << std::endl; std::cout << "Path: " << zkutil::KeeperMultiException(code, ops, responses).getPathForFirstFailedOp() << std::endl;
} }
catch (...) catch (...)

View File

@ -49,8 +49,8 @@ try
zk.create("/test", "old", false, false, {}, zk.create("/test", "old", false, false, {},
[&](const CreateResponse & response) [&](const CreateResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error (create) " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Error (create): " << errorMessage(response.error) << '\n';
else else
std::cerr << "Created path: " << response.path_created << '\n'; std::cerr << "Created path: " << response.path_created << '\n';
@ -64,8 +64,8 @@ try
zk.get("/test", zk.get("/test",
[&](const GetResponse & response) [&](const GetResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error (get) " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Error (get): " << errorMessage(response.error) << '\n';
else else
std::cerr << "Value: " << response.data << '\n'; std::cerr << "Value: " << response.data << '\n';
@ -73,8 +73,8 @@ try
}, },
[](const WatchResponse & response) [](const WatchResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (get) on /test, Error " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Watch (get) on /test, Error: " << errorMessage(response.error) << '\n';
else else
std::cerr << "Watch (get) on /test, path: " << response.path << ", type: " << response.type << '\n'; std::cerr << "Watch (get) on /test, path: " << response.path << ", type: " << response.type << '\n';
}); });
@ -86,8 +86,8 @@ try
zk.set("/test", "new", -1, zk.set("/test", "new", -1,
[&](const SetResponse & response) [&](const SetResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error (set) " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Error (set): " << errorMessage(response.error) << '\n';
else else
std::cerr << "Set\n"; std::cerr << "Set\n";
@ -101,8 +101,8 @@ try
zk.list("/", zk.list("/",
[&](const ListResponse & response) [&](const ListResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error (list) " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Error (list): " << errorMessage(response.error) << '\n';
else else
{ {
std::cerr << "Children:\n"; std::cerr << "Children:\n";
@ -114,8 +114,8 @@ try
}, },
[](const WatchResponse & response) [](const WatchResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (list) on /, Error " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Watch (list) on /, Error: " << errorMessage(response.error) << '\n';
else else
std::cerr << "Watch (list) on /, path: " << response.path << ", type: " << response.type << '\n'; std::cerr << "Watch (list) on /, path: " << response.path << ", type: " << response.type << '\n';
}); });
@ -127,8 +127,8 @@ try
zk.exists("/test", zk.exists("/test",
[&](const ExistsResponse & response) [&](const ExistsResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error (exists) " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Error (exists): " << errorMessage(response.error) << '\n';
else else
std::cerr << "Exists\n"; std::cerr << "Exists\n";
@ -136,8 +136,8 @@ try
}, },
[](const WatchResponse & response) [](const WatchResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Watch (exists) on /test, Error " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Watch (exists) on /test, Error: " << errorMessage(response.error) << '\n';
else else
std::cerr << "Watch (exists) on /test, path: " << response.path << ", type: " << response.type << '\n'; std::cerr << "Watch (exists) on /test, path: " << response.path << ", type: " << response.type << '\n';
}); });
@ -148,8 +148,8 @@ try
zk.remove("/test", -1, [&](const RemoveResponse & response) zk.remove("/test", -1, [&](const RemoveResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error (remove) " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Error (remove): " << errorMessage(response.error) << '\n';
else else
std::cerr << "Removed\n"; std::cerr << "Removed\n";
@ -184,13 +184,13 @@ try
zk.multi(ops, [&](const MultiResponse & response) zk.multi(ops, [&](const MultiResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error (multi) " << response.error << ": " << errorMessage(response.error) << '\n'; std::cerr << "Error (multi): " << errorMessage(response.error) << '\n';
else else
{ {
for (const auto & elem : response.responses) for (const auto & elem : response.responses)
if (elem->error) if (elem->error != Coordination::Error::ZOK)
std::cerr << "Error (elem) " << elem->error << ": " << errorMessage(elem->error) << '\n'; std::cerr << "Error (elem): " << errorMessage(elem->error) << '\n';
std::cerr << "Created path: " << dynamic_cast<const CreateResponse &>(*response.responses[0]).path_created << '\n'; std::cerr << "Created path: " << dynamic_cast<const CreateResponse &>(*response.responses[0]).path_created << '\n';
} }

View File

@ -9,8 +9,8 @@ try
zookeeper.create("/test", "hello", false, false, {}, [](const Coordination::CreateResponse & response) zookeeper.create("/test", "hello", false, false, {}, [](const Coordination::CreateResponse & response)
{ {
if (response.error) if (response.error != Coordination::Error::ZOK)
std::cerr << "Error " << response.error << ": " << Coordination::errorMessage(response.error) << "\n"; std::cerr << "Error: " << Coordination::errorMessage(response.error) << "\n";
else else
std::cerr << "Path created: " << response.path_created << "\n"; std::cerr << "Path created: " << response.path_created << "\n";
}); });

View File

@ -422,7 +422,7 @@ void DDLWorker::processTasks()
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (server_startup && e.code == Coordination::ZNONODE) if (server_startup && e.code == Coordination::Error::ZNONODE)
{ {
LOG_WARNING(log, "ZooKeeper NONODE error during startup. Ignoring entry {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true)); LOG_WARNING(log, "ZooKeeper NONODE error during startup. Ignoring entry {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true));
} }
@ -603,15 +603,15 @@ void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper)
auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy); auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS) if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
{ {
// Ok // Ok
} }
else if (code == Coordination::ZNONODE) else if (code == Coordination::Error::ZNONODE)
{ {
/// There is no parent /// There is no parent
createStatusDirs(task.entry_path, zookeeper); createStatusDirs(task.entry_path, zookeeper);
if (Coordination::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy)) if (Coordination::Error::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
throw Coordination::Exception(code, active_node_path); throw Coordination::Exception(code, active_node_path);
} }
else else
@ -915,8 +915,9 @@ void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperP
ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request))); ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
} }
Coordination::Responses responses; Coordination::Responses responses;
int code = zookeeper->tryMulti(ops, responses); Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code && code != Coordination::ZNODEEXISTS) if (code != Coordination::Error::ZOK
&& code != Coordination::Error::ZNODEEXISTS)
throw Coordination::Exception(code); throw Coordination::Exception(code);
} }
@ -1013,7 +1014,7 @@ void DDLWorker::runMainThread()
} }
} }
} }
else if (e.code == Coordination::ZNONODE) else if (e.code == Coordination::Error::ZNONODE)
{ {
LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true)); LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
} }
@ -1201,8 +1202,8 @@ private:
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path) static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{ {
Strings res; Strings res;
int code = zookeeper->tryGetChildren(node_path, res); Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
if (code && code != Coordination::ZNONODE) if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception(code, node_path); throw Coordination::Exception(code, node_path);
return res; return res;
} }

View File

@ -104,13 +104,13 @@ EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
lock_ops.push_back(zkutil::makeCheckRequest(block_numbers_path, partitions_stat.version)); lock_ops.push_back(zkutil::makeCheckRequest(block_numbers_path, partitions_stat.version));
Coordination::Responses lock_responses; Coordination::Responses lock_responses;
int rc = zookeeper.tryMulti(lock_ops, lock_responses); Coordination::Error rc = zookeeper.tryMulti(lock_ops, lock_responses);
if (rc == Coordination::ZBADVERSION) if (rc == Coordination::Error::ZBADVERSION)
{ {
LOG_TRACE(&Poco::Logger::get("EphemeralLocksInAllPartitions"), "Someone has inserted a block in a new partition while we were creating locks. Retry."); LOG_TRACE(&Poco::Logger::get("EphemeralLocksInAllPartitions"), "Someone has inserted a block in a new partition while we were creating locks. Retry.");
continue; continue;
} }
else if (rc != Coordination::ZOK) else if (rc != Coordination::Error::ZOK)
throw Coordination::Exception(rc); throw Coordination::Exception(rc);
for (size_t i = 0; i < partitions.size(); ++i) for (size_t i = 0; i < partitions.size(); ++i)

View File

@ -85,7 +85,7 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
*/ */
auto quorum_status = quorum_status_future.get(); auto quorum_status = quorum_status_future.get();
if (quorum_status.error != Coordination::ZNONODE) if (quorum_status.error != Coordination::Error::ZNONODE)
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
/// Both checks are implicitly made also later (otherwise there would be a race condition). /// Both checks are implicitly made also later (otherwise there would be a race condition).
@ -93,7 +93,7 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
auto is_active = is_active_future.get(); auto is_active = is_active_future.get();
auto host = host_future.get(); auto host = host_future.get();
if (is_active.error == Coordination::ZNONODE || host.error == Coordination::ZNONODE) if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
throw Exception("Replica is not active right now", ErrorCodes::READONLY); throw Exception("Replica is not active right now", ErrorCodes::READONLY);
quorum_info.is_active_node_value = is_active.data; quorum_info.is_active_node_value = is_active.data;
@ -299,9 +299,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
storage.renameTempPartAndAdd(part, nullptr, &transaction); storage.renameTempPartAndAdd(part, nullptr, &transaction);
Coordination::Responses responses; Coordination::Responses responses;
int32_t multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
if (multi_code == Coordination::ZOK) if (multi_code == Coordination::Error::ZOK)
{ {
transaction.commit(); transaction.commit();
storage.merge_selecting_task->schedule(); storage.merge_selecting_task->schedule();
@ -309,8 +309,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// Lock nodes have been already deleted, do not delete them in destructor /// Lock nodes have been already deleted, do not delete them in destructor
block_number_lock->assumeUnlocked(); block_number_lock->assumeUnlocked();
} }
else if (multi_code == Coordination::ZCONNECTIONLOSS else if (multi_code == Coordination::Error::ZCONNECTIONLOSS
|| multi_code == Coordination::ZOPERATIONTIMEOUT) || multi_code == Coordination::Error::ZOPERATIONTIMEOUT)
{ {
/** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part /** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again. * if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
@ -326,7 +326,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
{ {
String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp(); String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();
if (multi_code == Coordination::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path) if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
{ {
/// Block with the same id have just appeared in table (or other replica), rollback thee insertion. /// Block with the same id have just appeared in table (or other replica), rollback thee insertion.
LOG_INFO(log, "Block with ID {} already exists; ignoring it (removing part {})", block_id, part->name); LOG_INFO(log, "Block with ID {} already exists; ignoring it (removing part {})", block_id, part->name);
@ -336,7 +336,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
last_block_is_duplicate = true; last_block_is_duplicate = true;
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks); ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
} }
else if (multi_code == Coordination::ZNODEEXISTS && failed_op_path == quorum_info.status_path) else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
{ {
transaction.rollback(); transaction.rollback();
@ -347,7 +347,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// NOTE: We could be here if the node with the quorum existed, but was quickly removed. /// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
transaction.rollback(); transaction.rollback();
throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': " throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(multi_code) + ", path " + failed_op_path, + Coordination::errorMessage(multi_code) + ", path " + failed_op_path,
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
} }
} }
@ -355,13 +355,13 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
{ {
transaction.rollback(); transaction.rollback();
throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': " throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
} }
else else
{ {
transaction.rollback(); transaction.rollback();
throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': " throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
} }
if (quorum) if (quorum)

View File

@ -40,7 +40,7 @@ void ReplicatedMergeTreeCleanupThread::run()
{ {
tryLogCurrentException(log, __PRETTY_FUNCTION__); tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == Coordination::ZSESSIONEXPIRED) if (e.code == Coordination::Error::ZSESSIONEXPIRED)
return; return;
} }
catch (...) catch (...)
@ -319,15 +319,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
for (auto & pair : try_remove_futures) for (auto & pair : try_remove_futures)
{ {
const String & path = pair.first; const String & path = pair.first;
int32_t rc = pair.second.get().error; Coordination::Error rc = pair.second.get().error;
if (rc == Coordination::ZNOTEMPTY) if (rc == Coordination::Error::ZNOTEMPTY)
{ {
/// Can happen if there are leftover block nodes with children created by previous server versions. /// Can happen if there are leftover block nodes with children created by previous server versions.
zookeeper->removeRecursive(path); zookeeper->removeRecursive(path);
cached_block_stats.erase(first_outdated_block->node); cached_block_stats.erase(first_outdated_block->node);
} }
else if (rc) else if (rc != Coordination::Error::ZOK)
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, zkutil::ZooKeeper::error2string(rc)); LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
else else
{ {
/// Successfully removed blocks have to be removed from cache /// Successfully removed blocks have to be removed from cache
@ -348,7 +348,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
Strings blocks; Strings blocks;
Coordination::Stat stat; Coordination::Stat stat;
if (zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat)) if (Coordination::Error::ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
/// Seems like this code is obsolete, because we delete blocks from cache /// Seems like this code is obsolete, because we delete blocks from cache
@ -391,7 +391,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
for (auto & elem : exists_futures) for (auto & elem : exists_futures)
{ {
auto status = elem.second.get(); auto status = elem.second.get();
if (status.error != Coordination::ZNONODE) if (status.error != Coordination::Error::ZNONODE)
{ {
cached_block_stats.emplace(elem.first, status.stat.ctime); cached_block_stats.emplace(elem.first, status.stat.ctime);
timed_blocks.emplace_back(elem.first, status.stat.ctime); timed_blocks.emplace_back(elem.first, status.stat.ctime);

View File

@ -368,7 +368,7 @@ void ReplicatedMergeTreePartCheckThread::run()
{ {
tryLogCurrentException(log, __PRETTY_FUNCTION__); tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == Coordination::ZSESSIONEXPIRED) if (e.code == Coordination::Error::ZSESSIONEXPIRED)
return; return;
task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);

View File

@ -319,8 +319,8 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
Coordination::Responses responses; Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses); auto code = zookeeper->tryMulti(ops, responses);
if (code) if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Couldn't set value of nodes for insert times ({}/min_unprocessed_insert_time, max_processed_insert_time): {}", replica_path, zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often."); LOG_ERROR(log, "Couldn't set value of nodes for insert times ({}/min_unprocessed_insert_time, max_processed_insert_time): {}. This shouldn't happen often.", replica_path, Coordination::errorMessage(code));
} }
} }
@ -364,8 +364,8 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
notifySubscribers(queue_size); notifySubscribers(queue_size);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name); auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
if (code) if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Couldn't remove {}/queue/{}: {}. This shouldn't happen often.", replica_path, entry->znode_name, zkutil::ZooKeeper::error2string(code)); LOG_ERROR(log, "Couldn't remove {}/queue/{}: {}. This shouldn't happen often.", replica_path, entry->znode_name, Coordination::errorMessage(code));
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
} }
@ -720,7 +720,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
std::lock_guard lock(update_mutations_mutex); std::lock_guard lock(update_mutations_mutex);
auto rc = zookeeper->tryRemove(zookeeper_path + "/mutations/" + mutation_id); auto rc = zookeeper->tryRemove(zookeeper_path + "/mutations/" + mutation_id);
if (rc == Coordination::ZOK) if (rc == Coordination::Error::ZOK)
LOG_DEBUG(log, "Removed mutation {} from ZooKeeper.", mutation_id); LOG_DEBUG(log, "Removed mutation {} from ZooKeeper.", mutation_id);
ReplicatedMergeTreeMutationEntryPtr entry; ReplicatedMergeTreeMutationEntryPtr entry;
@ -844,8 +844,8 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
if ((*it)->currently_executing) if ((*it)->currently_executing)
to_wait.push_back(*it); to_wait.push_back(*it);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name); auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
if (code) if (code != Coordination::Error::ZOK)
LOG_INFO(log, "Couldn't remove {}: {}", replica_path + "/queue/" + (*it)->znode_name, zkutil::ZooKeeper::error2string(code)); LOG_INFO(log, "Couldn't remove {}: {}", replica_path + "/queue/" + (*it)->znode_name, Coordination::errorMessage(code));
updateStateOnQueueEntryRemoval( updateStateOnQueueEntryRemoval(
*it, /* is_successful = */ false, *it, /* is_successful = */ false,
@ -1625,7 +1625,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
for (auto & block : block_infos) for (auto & block : block_infos)
{ {
Coordination::GetResponse resp = block.contents_future.get(); Coordination::GetResponse resp = block.contents_future.get();
if (!resp.error && lock_holder_paths.count(resp.data)) if (resp.error == Coordination::Error::ZOK && lock_holder_paths.count(resp.data))
committing_blocks[block.partition].insert(block.number); committing_blocks[block.partition].insert(block.number);
} }
} }
@ -1633,7 +1633,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
queue_.pullLogsToQueue(zookeeper); queue_.pullLogsToQueue(zookeeper);
Coordination::GetResponse quorum_status_response = quorum_status_future.get(); Coordination::GetResponse quorum_status_response = quorum_status_future.get();
if (!quorum_status_response.error) if (quorum_status_response.error == Coordination::Error::ZOK)
{ {
ReplicatedMergeTreeQuorumEntry quorum_status; ReplicatedMergeTreeQuorumEntry quorum_status;
quorum_status.fromString(quorum_status_response.data); quorum_status.fromString(quorum_status_response.data);

View File

@ -234,7 +234,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
auto zookeeper = storage.getZooKeeper(); auto zookeeper = storage.getZooKeeper();
Strings failed_parts; Strings failed_parts;
if (zookeeper->tryGetChildren(storage.zookeeper_path + "/quorum/failed_parts", failed_parts) != Coordination::ZOK) if (zookeeper->tryGetChildren(storage.zookeeper_path + "/quorum/failed_parts", failed_parts) != Coordination::Error::ZOK)
return; return;
/// Firstly, remove parts from ZooKeeper /// Firstly, remove parts from ZooKeeper
@ -294,12 +294,12 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
{ {
auto code = zookeeper->tryRemove(is_active_path, stat.version); auto code = zookeeper->tryRemove(is_active_path, stat.version);
if (code == Coordination::ZBADVERSION) if (code == Coordination::Error::ZBADVERSION)
throw Exception("Another instance of replica " + storage.replica_path + " was created just now." throw Exception("Another instance of replica " + storage.replica_path + " was created just now."
" You shouldn't run multiple instances of same replica. You need to check configuration files.", " You shouldn't run multiple instances of same replica. You need to check configuration files.",
ErrorCodes::REPLICA_IS_ALREADY_ACTIVE); ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
if (code && code != Coordination::ZNONODE) if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception(code, is_active_path); throw Coordination::Exception(code, is_active_path);
} }
@ -314,7 +314,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code == Coordination::ZNODEEXISTS) if (e.code == Coordination::Error::ZNODEEXISTS)
throw Exception("Replica " + storage.replica_path + " appears to be already active. If you're sure it's not, " throw Exception("Replica " + storage.replica_path + " appears to be already active. If you're sure it's not, "
"try again in a minute or remove znode " + storage.replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE); "try again in a minute or remove znode " + storage.replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

View File

@ -441,8 +441,8 @@ bool StorageReplicatedMergeTree::createTableIfNotExists()
LOG_WARNING(log, "Removing leftovers from table {} (this might take several minutes)", zookeeper_path); LOG_WARNING(log, "Removing leftovers from table {} (this might take several minutes)", zookeeper_path);
Strings children; Strings children;
int32_t code = zookeeper->tryGetChildren(zookeeper_path, children); Coordination::Error code = zookeeper->tryGetChildren(zookeeper_path, children);
if (code == Coordination::ZNONODE) if (code == Coordination::Error::ZNONODE)
{ {
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path); LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
} }
@ -458,16 +458,16 @@ bool StorageReplicatedMergeTree::createTableIfNotExists()
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1));
code = zookeeper->tryMulti(ops, responses); code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZNONODE) if (code == Coordination::Error::ZNONODE)
{ {
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path); LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
} }
else if (code == Coordination::ZNOTEMPTY) else if (code == Coordination::Error::ZNOTEMPTY)
{ {
throw Exception(fmt::format( throw Exception(fmt::format(
"The old table was not completely removed from ZooKeeper, {} still exists and may contain some garbage. But it should never happen according to the logic of operations (it's a bug).", zookeeper_path), ErrorCodes::LOGICAL_ERROR); "The old table was not completely removed from ZooKeeper, {} still exists and may contain some garbage. But it should never happen according to the logic of operations (it's a bug).", zookeeper_path), ErrorCodes::LOGICAL_ERROR);
} }
else if (code != Coordination::ZOK) else if (code != Coordination::Error::ZOK)
{ {
/// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation. /// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation.
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
@ -535,12 +535,12 @@ bool StorageReplicatedMergeTree::createTableIfNotExists()
Coordination::Responses responses; Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses); auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_WARNING(log, "It looks like the table {} was created by another server at the same moment, will retry", zookeeper_path); LOG_WARNING(log, "It looks like the table {} was created by another server at the same moment, will retry", zookeeper_path);
continue; continue;
} }
else if (code != Coordination::ZOK) else if (code != Coordination::Error::ZOK)
{ {
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
} }
@ -557,7 +557,7 @@ void StorageReplicatedMergeTree::createReplica()
LOG_DEBUG(log, "Creating replica {}", replica_path); LOG_DEBUG(log, "Creating replica {}", replica_path);
int32_t code; Coordination::Error code;
do do
{ {
@ -599,15 +599,15 @@ void StorageReplicatedMergeTree::createReplica()
Coordination::Responses responses; Coordination::Responses responses;
code = zookeeper->tryMulti(ops, responses); code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)
{ {
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST); throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
} }
else if (code == Coordination::ZBADVERSION) else if (code == Coordination::Error::ZBADVERSION)
{ {
LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time"); LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
} }
else if (code == Coordination::ZNONODE) else if (code == Coordination::Error::ZNONODE)
{ {
throw Exception("Table " + zookeeper_path + " was suddenly removed.", ErrorCodes::ALL_REPLICAS_LOST); throw Exception("Table " + zookeeper_path + " was suddenly removed.", ErrorCodes::ALL_REPLICAS_LOST);
} }
@ -615,7 +615,7 @@ void StorageReplicatedMergeTree::createReplica()
{ {
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
} }
} while (code == Coordination::ZBADVERSION); } while (code == Coordination::Error::ZBADVERSION);
} }
void StorageReplicatedMergeTree::drop() void StorageReplicatedMergeTree::drop()
@ -640,7 +640,7 @@ void StorageReplicatedMergeTree::drop()
/// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line. /// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line.
Strings replicas; Strings replicas;
if (Coordination::ZOK == zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) && replicas.empty()) if (Coordination::Error::ZOK == zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) && replicas.empty())
{ {
LOG_INFO(log, "{} is the last replica, will remove table", replica_path); LOG_INFO(log, "{} is the last replica, will remove table", replica_path);
@ -656,17 +656,17 @@ void StorageReplicatedMergeTree::drop()
Coordination::Responses responses; Coordination::Responses responses;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/replicas", -1)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/replicas", -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/dropped", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/dropped", "", zkutil::CreateMode::Persistent));
int32_t code = zookeeper->tryMulti(ops, responses); Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZNONODE || code == Coordination::ZNODEEXISTS) if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_WARNING(log, "Table {} is already started to be removing by another replica right now", replica_path); LOG_WARNING(log, "Table {} is already started to be removing by another replica right now", replica_path);
} }
else if (code == Coordination::ZNOTEMPTY) else if (code == Coordination::Error::ZNOTEMPTY)
{ {
LOG_WARNING(log, "Another replica was suddenly created, will keep the table {}", replica_path); LOG_WARNING(log, "Another replica was suddenly created, will keep the table {}", replica_path);
} }
else if (code != Coordination::ZOK) else if (code != Coordination::Error::ZOK)
{ {
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
} }
@ -676,7 +676,7 @@ void StorageReplicatedMergeTree::drop()
Strings children; Strings children;
code = zookeeper->tryGetChildren(zookeeper_path, children); code = zookeeper->tryGetChildren(zookeeper_path, children);
if (code == Coordination::ZNONODE) if (code == Coordination::Error::ZNONODE)
{ {
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path); LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
} }
@ -692,16 +692,16 @@ void StorageReplicatedMergeTree::drop()
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path, -1));
code = zookeeper->tryMulti(ops, responses); code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZNONODE) if (code == Coordination::Error::ZNONODE)
{ {
LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path); LOG_WARNING(log, "Table {} is already finished removing by another replica right now", replica_path);
} }
else if (code == Coordination::ZNOTEMPTY) else if (code == Coordination::Error::ZNOTEMPTY)
{ {
LOG_ERROR(log, "Table was not completely removed from ZooKeeper, {} still exists and may contain some garbage.", LOG_ERROR(log, "Table was not completely removed from ZooKeeper, {} still exists and may contain some garbage.",
zookeeper_path); zookeeper_path);
} }
else if (code != Coordination::ZOK) else if (code != Coordination::Error::ZOK)
{ {
/// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation. /// It is still possible that ZooKeeper session is expired or server is killed in the middle of the delete operation.
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
@ -936,7 +936,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
time_t part_create_time = 0; time_t part_create_time = 0;
Coordination::ExistsResponse exists_resp = exists_futures[i].get(); Coordination::ExistsResponse exists_resp = exists_futures[i].get();
if (!exists_resp.error) if (exists_resp.error == Coordination::Error::ZOK)
{ {
part_create_time = exists_resp.stat.ctime / 1000; part_create_time = exists_resp.stat.ctime / 1000;
removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0); removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0);
@ -1107,7 +1107,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
size_t num_check_ops = 2 * absent_part_paths_on_replicas.size(); size_t num_check_ops = 2 * absent_part_paths_on_replicas.size();
size_t failed_op_index = e.failed_op_index; size_t failed_op_index = e.failed_op_index;
if (failed_op_index < num_check_ops && e.code == Coordination::ZNODEEXISTS) if (failed_op_index < num_check_ops && e.code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_INFO(log, "The part {} on a replica suddenly appeared, will recheck checksums", e.getPathForFirstFailedOp()); LOG_INFO(log, "The part {} on a replica suddenly appeared, will recheck checksums", e.getPathForFirstFailedOp());
} }
@ -1584,15 +1584,15 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
Coordination::Responses responses; Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses); auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZOK) if (code == Coordination::Error::ZOK)
{ {
LOG_DEBUG(log, "Marked quorum for part {} as failed.", entry.new_part_name); LOG_DEBUG(log, "Marked quorum for part {} as failed.", entry.new_part_name);
queue.removeFromVirtualParts(part_info); queue.removeFromVirtualParts(part_info);
return true; return true;
} }
else if (code == Coordination::ZBADVERSION || code == Coordination::ZNONODE || code == Coordination::ZNODEEXISTS) else if (code == Coordination::Error::ZBADVERSION || code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{ {
LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part {} as failed. Code: {}", entry.new_part_name, zkutil::ZooKeeper::error2string(code)); LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part {} as failed. Code: {}", entry.new_part_name, Coordination::errorMessage(code));
} }
else else
throw Coordination::Exception(code); throw Coordination::Exception(code);
@ -2088,7 +2088,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
auto rc = zookeeper->tryMulti(ops, responses); auto rc = zookeeper->tryMulti(ops, responses);
if (rc == Coordination::ZOK) if (rc == Coordination::Error::ZOK)
{ {
break; break;
} }
@ -2256,7 +2256,7 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
{ {
tryLogCurrentException(log, __PRETTY_FUNCTION__); tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == Coordination::ZSESSIONEXPIRED) if (e.code == Coordination::Error::ZSESSIONEXPIRED)
{ {
restarting_thread.wakeup(); restarting_thread.wakeup();
return; return;
@ -2282,7 +2282,7 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask()
{ {
tryLogCurrentException(log, __PRETTY_FUNCTION__); tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == Coordination::ZSESSIONEXPIRED) if (e.code == Coordination::Error::ZSESSIONEXPIRED)
return; return;
mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS); mutations_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
@ -2525,7 +2525,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
for (size_t i = 0; i < parts.size(); ++i) for (size_t i = 0; i < parts.size(); ++i)
{ {
/// If there is no information about part in ZK, we will not merge it. /// If there is no information about part in ZK, we will not merge it.
if (exists_futures[i].get().error == Coordination::ZNONODE) if (exists_futures[i].get().error == Coordination::Error::ZNONODE)
{ {
all_in_zk = false; all_in_zk = false;
@ -2871,16 +2871,16 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
ops.emplace_back(zkutil::makeSetRequest(quorum_last_part_path, new_added_parts, added_parts_stat.version)); ops.emplace_back(zkutil::makeSetRequest(quorum_last_part_path, new_added_parts, added_parts_stat.version));
auto code = zookeeper->tryMulti(ops, responses); auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZOK) if (code == Coordination::Error::ZOK)
{ {
break; break;
} }
else if (code == Coordination::ZNONODE) else if (code == Coordination::Error::ZNONODE)
{ {
/// The quorum has already been achieved. /// The quorum has already been achieved.
break; break;
} }
else if (code == Coordination::ZBADVERSION) else if (code == Coordination::Error::ZBADVERSION)
{ {
/// Node was updated meanwhile. We must re-read it and repeat all the actions. /// Node was updated meanwhile. We must re-read it and repeat all the actions.
continue; continue;
@ -2893,16 +2893,16 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
/// We update the node, registering there one more replica. /// We update the node, registering there one more replica.
auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version); auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);
if (code == Coordination::ZOK) if (code == Coordination::Error::ZOK)
{ {
break; break;
} }
else if (code == Coordination::ZNONODE) else if (code == Coordination::Error::ZNONODE)
{ {
/// The quorum has already been achieved. /// The quorum has already been achieved.
break; break;
} }
else if (code == Coordination::ZBADVERSION) else if (code == Coordination::Error::ZBADVERSION)
{ {
/// Node was updated meanwhile. We must re-read it and repeat all the actions. /// Node was updated meanwhile. We must re-read it and repeat all the actions.
continue; continue;
@ -2946,16 +2946,16 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
auto code = zookeeper->trySet(quorum_last_part_path, new_added_parts, added_parts_stat.version); auto code = zookeeper->trySet(quorum_last_part_path, new_added_parts, added_parts_stat.version);
if (code == Coordination::ZOK) if (code == Coordination::Error::ZOK)
{ {
break; break;
} }
else if (code == Coordination::ZNONODE) else if (code == Coordination::Error::ZNONODE)
{ {
/// Node is deleted. It is impossible, but it is Ok. /// Node is deleted. It is impossible, but it is Ok.
break; break;
} }
else if (code == Coordination::ZBADVERSION) else if (code == Coordination::Error::ZBADVERSION)
{ {
/// Node was updated meanwhile. We must re-read it and repeat all the actions. /// Node was updated meanwhile. We must re-read it and repeat all the actions.
continue; continue;
@ -3643,9 +3643,9 @@ void StorageReplicatedMergeTree::alter(
} }
Coordination::Responses results; Coordination::Responses results;
int32_t rc = zookeeper->tryMulti(ops, results); Coordination::Error rc = zookeeper->tryMulti(ops, results);
if (rc == Coordination::ZOK) if (rc == Coordination::Error::ZOK)
{ {
if (alter_entry->have_mutation) if (alter_entry->have_mutation)
{ {
@ -3665,9 +3665,9 @@ void StorageReplicatedMergeTree::alter(
} }
break; break;
} }
else if (rc == Coordination::ZBADVERSION) else if (rc == Coordination::Error::ZBADVERSION)
{ {
if (results[0]->error) if (results[0]->error != Coordination::Error::ZOK)
throw Exception("Metadata on replica is not up to date with common metadata in Zookeeper. Cannot alter", ErrorCodes::CANNOT_ASSIGN_ALTER); throw Exception("Metadata on replica is not up to date with common metadata in Zookeeper. Cannot alter", ErrorCodes::CANNOT_ASSIGN_ALTER);
continue; continue;
@ -3987,8 +3987,8 @@ StorageReplicatedMergeTree::allocateBlockNumber(
ops.push_back(zkutil::makeSetRequest(block_numbers_path, "", -1)); ops.push_back(zkutil::makeSetRequest(block_numbers_path, "", -1));
Coordination::Responses responses; Coordination::Responses responses;
int code = zookeeper->tryMulti(ops, responses); Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code && code != Coordination::ZNODEEXISTS) if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
} }
@ -4001,7 +4001,7 @@ StorageReplicatedMergeTree::allocateBlockNumber(
} }
catch (const zkutil::KeeperMultiException & e) catch (const zkutil::KeeperMultiException & e)
{ {
if (e.code == Coordination::ZNODEEXISTS && e.getPathForFirstFailedOp() == zookeeper_block_id_path) if (e.code == Coordination::Error::ZNODEEXISTS && e.getPathForFirstFailedOp() == zookeeper_block_id_path)
return {}; return {};
throw Exception("Cannot allocate block number in ZooKeeper: " + e.displayText(), ErrorCodes::KEEPER_EXCEPTION); throw Exception("Cannot allocate block number in ZooKeeper: " + e.displayText(), ErrorCodes::KEEPER_EXCEPTION);
@ -4690,9 +4690,9 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential)); mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential));
Coordination::Responses responses; Coordination::Responses responses;
int32_t rc = zookeeper->tryMulti(requests, responses); Coordination::Error rc = zookeeper->tryMulti(requests, responses);
if (rc == Coordination::ZOK) if (rc == Coordination::Error::ZOK)
{ {
const String & path_created = const String & path_created =
dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created; dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created;
@ -4700,7 +4700,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
LOG_TRACE(log, "Created mutation with ID {}", entry.znode_name); LOG_TRACE(log, "Created mutation with ID {}", entry.znode_name);
break; break;
} }
else if (rc == Coordination::ZBADVERSION) else if (rc == Coordination::Error::ZBADVERSION)
{ {
LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying..."); LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying...");
continue; continue;
@ -4892,7 +4892,7 @@ bool StorageReplicatedMergeTree::tryRemovePartsFromZooKeeperWithRetries(const St
for (size_t i = 0; i < part_names.size(); ++i) for (size_t i = 0; i < part_names.size(); ++i)
{ {
Coordination::ExistsResponse exists_resp = exists_futures[i].get(); Coordination::ExistsResponse exists_resp = exists_futures[i].get();
if (!exists_resp.error) if (exists_resp.error == Coordination::Error::ZOK)
{ {
Coordination::Requests ops; Coordination::Requests ops;
removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0); removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0);
@ -4904,7 +4904,7 @@ bool StorageReplicatedMergeTree::tryRemovePartsFromZooKeeperWithRetries(const St
{ {
auto response = future.get(); auto response = future.get();
if (response.error == 0 || response.error == Coordination::ZNONODE) if (response.error == Coordination::Error::ZOK || response.error == Coordination::Error::ZNONODE)
continue; continue;
if (Coordination::isHardwareError(response.error)) if (Coordination::isHardwareError(response.error))
@ -4953,7 +4953,7 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
for (size_t i = 0; i < part_names.size(); ++i) for (size_t i = 0; i < part_names.size(); ++i)
{ {
Coordination::ExistsResponse exists_resp = exists_futures[i].get(); Coordination::ExistsResponse exists_resp = exists_futures[i].get();
if (!exists_resp.error) if (exists_resp.error == Coordination::Error::ZOK)
{ {
Coordination::Requests ops; Coordination::Requests ops;
removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0); removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0);
@ -4982,9 +4982,9 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
continue; continue;
auto response = future.get(); auto response = future.get();
if (response.error == Coordination::ZOK) if (response.error == Coordination::Error::ZOK)
continue; continue;
else if (response.error == Coordination::ZNONODE) else if (response.error == Coordination::Error::ZNONODE)
{ {
LOG_DEBUG(log, "There is no part {} in ZooKeeper, it was only in filesystem", part_names[i]); LOG_DEBUG(log, "There is no part {} in ZooKeeper, it was only in filesystem", part_names[i]);
continue; continue;
@ -4996,7 +4996,7 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
continue; continue;
} }
else else
LOG_WARNING(log, "Cannot remove part {} from ZooKeeper: {}", part_names[i], zkutil::ZooKeeper::error2string(response.error)); LOG_WARNING(log, "Cannot remove part {} from ZooKeeper: {}", part_names[i], Coordination::errorMessage(response.error));
} }
} }
@ -5005,7 +5005,7 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num) zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num)
{ {
Strings blocks; Strings blocks;
if (zookeeper.tryGetChildren(zookeeper_path + "/blocks", blocks)) if (Coordination::Error::ZOK != zookeeper.tryGetChildren(zookeeper_path + "/blocks", blocks))
throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
String partition_prefix = partition_id + "_"; String partition_prefix = partition_id + "_";
@ -5025,7 +5025,7 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
const String & path = pair.first; const String & path = pair.first;
auto result = pair.second.get(); auto result = pair.second.get();
if (result.error == Coordination::ZNONODE) if (result.error == Coordination::Error::ZNONODE)
continue; continue;
ReadBufferFromString buf(result.data); ReadBufferFromString buf(result.data);
@ -5038,14 +5038,14 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
for (auto & pair : to_delete_futures) for (auto & pair : to_delete_futures)
{ {
const String & path = pair.first; const String & path = pair.first;
int32_t rc = pair.second.get().error; Coordination::Error rc = pair.second.get().error;
if (rc == Coordination::ZNOTEMPTY) if (rc == Coordination::Error::ZNOTEMPTY)
{ {
/// Can happen if there are leftover block nodes with children created by previous server versions. /// Can happen if there are leftover block nodes with children created by previous server versions.
zookeeper.removeRecursive(path); zookeeper.removeRecursive(path);
} }
else if (rc) else if (rc != Coordination::Error::ZOK)
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, zkutil::ZooKeeper::error2string(rc)); LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
} }
LOG_TRACE(log, "Deleted {} deduplication block IDs in partition ID {}", to_delete_futures.size(), partition_id); LOG_TRACE(log, "Deleted {} deduplication block IDs in partition ID {}", to_delete_futures.size(), partition_id);

View File

@ -131,7 +131,7 @@ void StorageSystemZooKeeper::fillData(MutableColumns & res_columns, const Contex
for (size_t i = 0, size = nodes.size(); i < size; ++i) for (size_t i = 0, size = nodes.size(); i < size; ++i)
{ {
auto res = futures[i].get(); auto res = futures[i].get();
if (res.error == Coordination::ZNONODE) if (res.error == Coordination::Error::ZNONODE)
continue; /// Node was deleted meanwhile. continue; /// Node was deleted meanwhile.
const Coordination::Stat & stat = res.stat; const Coordination::Stat & stat = res.stat;

View File

@ -86,7 +86,7 @@ try
for (BlockInfo & block : block_infos) for (BlockInfo & block : block_infos)
{ {
Coordination::GetResponse resp = block.contents_future.get(); Coordination::GetResponse resp = block.contents_future.get();
if (!resp.error && lock_holder_paths.count(resp.data)) if (resp.error == Coordination::Error::ZOK && lock_holder_paths.count(resp.data))
{ {
++total_count; ++total_count;
current_inserts[block.partition].insert(block.number); current_inserts[block.partition].insert(block.number);

View File

@ -76,7 +76,7 @@ try
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code == Coordination::ZNONODE) if (e.code == Coordination::Error::ZNONODE)
continue; continue;
throw; throw;
} }

View File

@ -45,7 +45,7 @@ try
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
if (e.code == Coordination::ZNONODE) if (e.code == Coordination::Error::ZNONODE)
continue; continue;
throw; throw;
} }