This commit is contained in:
Pavel Kartavyy 2014-07-01 20:15:20 +04:00
commit 02e79179f6
2 changed files with 77 additions and 19 deletions

View File

@ -3,12 +3,14 @@
#include <zkutil/KeeperException.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <unordered_set>
#include <Yandex/logger_useful.h>
namespace zkutil
{
const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
const UInt32 DEFAULT_RETRY_NUM = 3;
struct WatchWithPromise;
@ -17,8 +19,12 @@ typedef WatchWithPromise * WatchWithPromisePtr;
/** Сессия в ZooKeeper. Интерфейс существенно отличается от обычного API ZooKeeper.
* Вместо callback-ов для watch-ей используются std::future.
* Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке кроме OperationTimeout.
* При OperationTimeout пытаемся попробоватть еще retry_num раз.
*
* Методы на чтение при восстанавливаемых ошибках OperationTimeout, ConnectionLoss пытаются еще retry_num раз.
* Методы на запись не пытаются повторить при восстанавливаемых ошибках, т.к. это приводит к проблеммам типа удаления дважды одного и того же.
*
* Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке.
*
* Методы с названиями, начинающимися с try, не бросают исключение только при перечисленных видах ошибок.
* Например, исключение бросается в любом случае, если сессия разорвалась или если не хватает прав или ресурсов.
*/
@ -75,11 +81,19 @@ public:
* - Нет родителя создаваемой ноды.
* - Родитель эфемерный.
* - Такая нода уже есть.
* - ZCONNECTIONLOSS
* - ZOPERATIONTIMEOUT
* При остальных ошибках бросает исключение.
*/
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated);
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode);
/** создает Persistent ноду.
* Игнорирует, если нода уже создана.
* Пытается сделать retry при ConnectionLoss или OperationTimeout
*/
void createIfNotExists(const std::string & path, const std::string & data);
/** Удалить ноду, если ее версия равна version (если -1, подойдет любая версия).
*/
void remove(const std::string & path, int32_t version = -1);
@ -88,6 +102,8 @@ public:
* - Такой ноды нет.
* - У ноды другая версия.
* - У ноды есть дети.
* - ZCONNECTIONLOSS
* - ZOPERATIONTIMEOUT
*/
int32_t tryRemove(const std::string & path, int32_t version = -1);
@ -106,6 +122,8 @@ public:
/** Не бросает исключение при следующих ошибках:
* - Такой ноды нет.
* - У ноды другая версия.
* - ZCONNECTIONLOSS
* - ZOPERATIONTIMEOUT
*/
int32_t trySet(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
@ -128,6 +146,8 @@ public:
/** Бросает исключение только если какая-нибудь операция вернула "неожиданную" ошибку - такую ошибку,
* увидев которую соответствующий метод try* бросил бы исключение. */
int32_t tryMulti(const Ops & ops, OpResultsPtr * out_results = nullptr);
/** Использовать только для методов на чтение */
int32_t tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results = nullptr);
/** Удаляет ноду вместе с поддеревом. Если в это время кто-то добавит иили удалит ноду в поддереве, результат не определен.
@ -153,8 +173,13 @@ private:
int32_t retry(const T & operation)
{
int32_t code = operation();
for (size_t i = 0; (i < retry_num) && (code == ZOPERATIONTIMEOUT); ++i)
for (size_t i = 0; (i < retry_num) && (code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS); ++i)
{
/// если потеряно соединение подождем timeout/3, авось восстановится
if (code == ZCONNECTIONLOSS)
usleep(sessionTimeoutMs*1000/3);
LOG_WARNING(log, "Error happened " << error2string(code) << ". Retry");
code = operation();
}
return code;
@ -182,8 +207,9 @@ private:
WatchFunction * state_watch;
std::unordered_set<WatchWithPromise *> watch_store;
/// Количество попыток повторить операцию при OperationTimeout
/// Количество попыток повторить операцию чтения при OperationTimeout, ConnectionLoss
size_t retry_num = 3;
Logger * log = nullptr;
};
typedef ZooKeeper::Ptr ZooKeeperPtr;

View File

@ -70,6 +70,8 @@ void ZooKeeper::processPromise(zhandle_t * zh, int type, int state, const char *
void ZooKeeper::init(const std::string & hosts_, int32_t sessionTimeoutMs_, WatchFunction * watch_)
{
log = &Logger::get("ZooKeeper");
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
hosts = hosts_;
sessionTimeoutMs = sessionTimeoutMs_;
state_watch = watch_;
@ -87,7 +89,6 @@ void ZooKeeper::init(const std::string & hosts_, int32_t sessionTimeoutMs_, Watc
ZooKeeper::ZooKeeper(const std::string & hosts, int32_t sessionTimeoutMs, WatchFunction * watch_)
{
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
init(hosts, sessionTimeoutMs, watch_);
}
@ -203,12 +204,14 @@ std::string ZooKeeper::create(const std::string & path, const std::string & data
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated)
{
int code = retry(boost::bind(&ZooKeeper::createImpl, this, boost::ref(path), boost::ref(data), mode, boost::ref(pathCreated)));
int code = createImpl(path, data, mode, pathCreated);
if (!( code == ZOK ||
code == ZNONODE ||
code == ZNODEEXISTS ||
code == ZNOCHILDRENFOREPHEMERALS))
code == ZNOCHILDRENFOREPHEMERALS ||
code == ZCONNECTIONLOSS ||
code == ZOPERATIONTIMEOUT))
throw KeeperException(code, path);
return code;
@ -220,6 +223,27 @@ int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data,
return tryCreate(path, data, mode, pathCreated);
}
void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data)
{
int32_t code = tryCreate(path, "", zkutil::CreateMode::Persistent);
if (code == ZOK || code == ZNODEEXISTS)
return;
if (!(code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS))
throw KeeperException(code, path);
for (size_t attempt = 0; attempt < retry_num && (code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS); ++attempt)
{
code = tryCreate(path, "", zkutil::CreateMode::Persistent);
};
if (code == ZOK || code == ZNODEEXISTS)
return;
else
throw KeeperException(code, path);
}
int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version)
{
int32_t code = zoo_delete(impl, path.c_str(), version);
@ -233,11 +257,13 @@ void ZooKeeper::remove(const std::string & path, int32_t version)
int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
{
int32_t code = retry(boost::bind(&ZooKeeper::removeImpl, this, boost::ref(path), version));
int32_t code = removeImpl(path, version);
if (!( code == ZOK ||
code == ZNONODE ||
code == ZBADVERSION ||
code == ZNOTEMPTY))
code == ZNOTEMPTY ||
code == ZCONNECTIONLOSS ||
code == ZOPERATIONTIMEOUT))
throw KeeperException(code, path);
return code;
}
@ -262,7 +288,7 @@ int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, WatchFutur
bool ZooKeeper::exists(const std::string & path, Stat * stat_, WatchFuture * watch)
{
int32_t code = existsImpl(path, stat_, watch);
int32_t code = retry(boost::bind(&ZooKeeper::existsImpl, this, path, stat_, watch));
if (!( code == ZOK ||
code == ZNONODE))
@ -337,11 +363,13 @@ void ZooKeeper::set(const std::string & path, const std::string & data, int32_t
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
int32_t version, Stat * stat_)
{
int32_t code = retry(boost::bind(&ZooKeeper::setImpl, this, boost::ref(path), boost::ref(data), version, stat_));
int32_t code = setImpl(path, data, version, stat_);
if (!( code == ZOK ||
code == ZNONODE ||
code == ZBADVERSION))
code == ZBADVERSION ||
code == ZCONNECTIONLOSS ||
code == ZOPERATIONTIMEOUT))
throw KeeperException(code, path);
return code;
}
@ -359,11 +387,8 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
int32_t code = zoo_multi(impl, ops.size(), ops.data(), out_results->data());
if (code == ZOK)
{
if (out_results_)
*out_results_ = out_results;
}
if (out_results_)
*out_results_ = out_results;
return code;
}
@ -377,18 +402,25 @@ OpResultsPtr ZooKeeper::multi(const Ops & ops)
int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
{
int32_t code = retry(boost::bind(&ZooKeeper::multiImpl, this, boost::ref(ops_), out_results_));
int32_t code = multiImpl(ops_, out_results_);
if (code != ZOK &&
code != ZNONODE &&
code != ZNODEEXISTS &&
code != ZNOCHILDRENFOREPHEMERALS &&
code != ZBADVERSION &&
code != ZNOTEMPTY)
code != ZNOTEMPTY &&
code != ZCONNECTIONLOSS &&
code != ZOPERATIONTIMEOUT)
throw KeeperException(code);
return code;
}
int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results)
{
return retry(boost::bind(&ZooKeeper::tryMulti, this, boost::ref(ops), out_results));
}
void ZooKeeper::removeChildrenRecursive(const std::string & path)
{
Strings children = getChildren(path);