mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
Merge
This commit is contained in:
commit
557b5e3351
@ -3,12 +3,14 @@
|
||||
#include <zkutil/KeeperException.h>
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
#include <unordered_set>
|
||||
#include <Yandex/logger_useful.h>
|
||||
|
||||
|
||||
namespace zkutil
|
||||
{
|
||||
|
||||
const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
|
||||
const UInt32 DEFAULT_RETRY_NUM = 3;
|
||||
|
||||
struct WatchWithPromise;
|
||||
|
||||
@ -17,8 +19,12 @@ typedef WatchWithPromise * WatchWithPromisePtr;
|
||||
|
||||
/** Сессия в ZooKeeper. Интерфейс существенно отличается от обычного API ZooKeeper.
|
||||
* Вместо callback-ов для watch-ей используются std::future.
|
||||
* Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке кроме OperationTimeout.
|
||||
* При OperationTimeout пытаемся попробоватть еще retry_num раз.
|
||||
*
|
||||
* Методы на чтение при восстанавливаемых ошибках OperationTimeout, ConnectionLoss пытаются еще retry_num раз.
|
||||
* Методы на запись не пытаются повторить при восстанавливаемых ошибках, т.к. это приводит к проблеммам типа удаления дважды одного и того же.
|
||||
*
|
||||
* Методы с названиями, не начинающимися с try, бросают исключение при любой ошибке.
|
||||
*
|
||||
* Методы с названиями, начинающимися с try, не бросают исключение только при перечисленных видах ошибок.
|
||||
* Например, исключение бросается в любом случае, если сессия разорвалась или если не хватает прав или ресурсов.
|
||||
*/
|
||||
@ -75,11 +81,19 @@ public:
|
||||
* - Нет родителя создаваемой ноды.
|
||||
* - Родитель эфемерный.
|
||||
* - Такая нода уже есть.
|
||||
* - ZCONNECTIONLOSS
|
||||
* - ZOPERATIONTIMEOUT
|
||||
* При остальных ошибках бросает исключение.
|
||||
*/
|
||||
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated);
|
||||
int32_t tryCreate(const std::string & path, const std::string & data, int32_t mode);
|
||||
|
||||
/** создает Persistent ноду.
|
||||
* Игнорирует, если нода уже создана.
|
||||
* Пытается сделать retry при ConnectionLoss или OperationTimeout
|
||||
*/
|
||||
void createIfNotExists(const std::string & path, const std::string & data);
|
||||
|
||||
/** Удалить ноду, если ее версия равна version (если -1, подойдет любая версия).
|
||||
*/
|
||||
void remove(const std::string & path, int32_t version = -1);
|
||||
@ -88,6 +102,8 @@ public:
|
||||
* - Такой ноды нет.
|
||||
* - У ноды другая версия.
|
||||
* - У ноды есть дети.
|
||||
* - ZCONNECTIONLOSS
|
||||
* - ZOPERATIONTIMEOUT
|
||||
*/
|
||||
int32_t tryRemove(const std::string & path, int32_t version = -1);
|
||||
|
||||
@ -106,6 +122,8 @@ public:
|
||||
/** Не бросает исключение при следующих ошибках:
|
||||
* - Такой ноды нет.
|
||||
* - У ноды другая версия.
|
||||
* - ZCONNECTIONLOSS
|
||||
* - ZOPERATIONTIMEOUT
|
||||
*/
|
||||
int32_t trySet(const std::string & path, const std::string & data,
|
||||
int32_t version = -1, Stat * stat = nullptr);
|
||||
@ -128,6 +146,8 @@ public:
|
||||
/** Бросает исключение только если какая-нибудь операция вернула "неожиданную" ошибку - такую ошибку,
|
||||
* увидев которую соответствующий метод try* бросил бы исключение. */
|
||||
int32_t tryMulti(const Ops & ops, OpResultsPtr * out_results = nullptr);
|
||||
/** Использовать только для методов на чтение */
|
||||
int32_t tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results = nullptr);
|
||||
|
||||
|
||||
/** Удаляет ноду вместе с поддеревом. Если в это время кто-то добавит иили удалит ноду в поддереве, результат не определен.
|
||||
@ -153,8 +173,13 @@ private:
|
||||
int32_t retry(const T & operation)
|
||||
{
|
||||
int32_t code = operation();
|
||||
for (size_t i = 0; (i < retry_num) && (code == ZOPERATIONTIMEOUT); ++i)
|
||||
for (size_t i = 0; (i < retry_num) && (code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS); ++i)
|
||||
{
|
||||
/// если потеряно соединение подождем timeout/3, авось восстановится
|
||||
if (code == ZCONNECTIONLOSS)
|
||||
usleep(sessionTimeoutMs*1000/3);
|
||||
|
||||
LOG_WARNING(log, "Error happened " << error2string(code) << ". Retry");
|
||||
code = operation();
|
||||
}
|
||||
return code;
|
||||
@ -182,8 +207,9 @@ private:
|
||||
WatchFunction * state_watch;
|
||||
std::unordered_set<WatchWithPromise *> watch_store;
|
||||
|
||||
/// Количество попыток повторить операцию при OperationTimeout
|
||||
/// Количество попыток повторить операцию чтения при OperationTimeout, ConnectionLoss
|
||||
size_t retry_num = 3;
|
||||
Logger * log = nullptr;
|
||||
};
|
||||
|
||||
typedef ZooKeeper::Ptr ZooKeeperPtr;
|
||||
|
@ -70,6 +70,8 @@ void ZooKeeper::processPromise(zhandle_t * zh, int type, int state, const char *
|
||||
|
||||
void ZooKeeper::init(const std::string & hosts_, int32_t sessionTimeoutMs_, WatchFunction * watch_)
|
||||
{
|
||||
log = &Logger::get("ZooKeeper");
|
||||
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
|
||||
hosts = hosts_;
|
||||
sessionTimeoutMs = sessionTimeoutMs_;
|
||||
state_watch = watch_;
|
||||
@ -87,7 +89,6 @@ void ZooKeeper::init(const std::string & hosts_, int32_t sessionTimeoutMs_, Watc
|
||||
|
||||
ZooKeeper::ZooKeeper(const std::string & hosts, int32_t sessionTimeoutMs, WatchFunction * watch_)
|
||||
{
|
||||
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
|
||||
init(hosts, sessionTimeoutMs, watch_);
|
||||
}
|
||||
|
||||
@ -203,12 +204,14 @@ std::string ZooKeeper::create(const std::string & path, const std::string & data
|
||||
|
||||
int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated)
|
||||
{
|
||||
int code = retry(boost::bind(&ZooKeeper::createImpl, this, boost::ref(path), boost::ref(data), mode, boost::ref(pathCreated)));
|
||||
int code = createImpl(path, data, mode, pathCreated);
|
||||
|
||||
if (!( code == ZOK ||
|
||||
code == ZNONODE ||
|
||||
code == ZNODEEXISTS ||
|
||||
code == ZNOCHILDRENFOREPHEMERALS))
|
||||
code == ZNOCHILDRENFOREPHEMERALS ||
|
||||
code == ZCONNECTIONLOSS ||
|
||||
code == ZOPERATIONTIMEOUT))
|
||||
throw KeeperException(code, path);
|
||||
|
||||
return code;
|
||||
@ -220,6 +223,27 @@ int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data,
|
||||
return tryCreate(path, data, mode, pathCreated);
|
||||
}
|
||||
|
||||
void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data)
|
||||
{
|
||||
int32_t code = tryCreate(path, "", zkutil::CreateMode::Persistent);
|
||||
|
||||
if (code == ZOK || code == ZNODEEXISTS)
|
||||
return;
|
||||
|
||||
if (!(code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS))
|
||||
throw KeeperException(code, path);
|
||||
|
||||
for (size_t attempt = 0; attempt < retry_num && (code == ZOPERATIONTIMEOUT || code == ZCONNECTIONLOSS); ++attempt)
|
||||
{
|
||||
code = tryCreate(path, "", zkutil::CreateMode::Persistent);
|
||||
};
|
||||
|
||||
if (code == ZOK || code == ZNODEEXISTS)
|
||||
return;
|
||||
else
|
||||
throw KeeperException(code, path);
|
||||
}
|
||||
|
||||
int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version)
|
||||
{
|
||||
int32_t code = zoo_delete(impl, path.c_str(), version);
|
||||
@ -233,11 +257,13 @@ void ZooKeeper::remove(const std::string & path, int32_t version)
|
||||
|
||||
int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
|
||||
{
|
||||
int32_t code = retry(boost::bind(&ZooKeeper::removeImpl, this, boost::ref(path), version));
|
||||
int32_t code = removeImpl(path, version);
|
||||
if (!( code == ZOK ||
|
||||
code == ZNONODE ||
|
||||
code == ZBADVERSION ||
|
||||
code == ZNOTEMPTY))
|
||||
code == ZNOTEMPTY ||
|
||||
code == ZCONNECTIONLOSS ||
|
||||
code == ZOPERATIONTIMEOUT))
|
||||
throw KeeperException(code, path);
|
||||
return code;
|
||||
}
|
||||
@ -262,7 +288,7 @@ int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, WatchFutur
|
||||
|
||||
bool ZooKeeper::exists(const std::string & path, Stat * stat_, WatchFuture * watch)
|
||||
{
|
||||
int32_t code = existsImpl(path, stat_, watch);
|
||||
int32_t code = retry(boost::bind(&ZooKeeper::existsImpl, this, path, stat_, watch));
|
||||
|
||||
if (!( code == ZOK ||
|
||||
code == ZNONODE))
|
||||
@ -337,11 +363,13 @@ void ZooKeeper::set(const std::string & path, const std::string & data, int32_t
|
||||
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
|
||||
int32_t version, Stat * stat_)
|
||||
{
|
||||
int32_t code = retry(boost::bind(&ZooKeeper::setImpl, this, boost::ref(path), boost::ref(data), version, stat_));
|
||||
int32_t code = setImpl(path, data, version, stat_);
|
||||
|
||||
if (!( code == ZOK ||
|
||||
code == ZNONODE ||
|
||||
code == ZBADVERSION))
|
||||
code == ZBADVERSION ||
|
||||
code == ZCONNECTIONLOSS ||
|
||||
code == ZOPERATIONTIMEOUT))
|
||||
throw KeeperException(code, path);
|
||||
return code;
|
||||
}
|
||||
@ -359,11 +387,8 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
|
||||
|
||||
int32_t code = zoo_multi(impl, ops.size(), ops.data(), out_results->data());
|
||||
|
||||
if (code == ZOK)
|
||||
{
|
||||
if (out_results_)
|
||||
*out_results_ = out_results;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
@ -377,18 +402,25 @@ OpResultsPtr ZooKeeper::multi(const Ops & ops)
|
||||
|
||||
int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
|
||||
{
|
||||
int32_t code = retry(boost::bind(&ZooKeeper::multiImpl, this, boost::ref(ops_), out_results_));
|
||||
int32_t code = multiImpl(ops_, out_results_);
|
||||
|
||||
if (code != ZOK &&
|
||||
code != ZNONODE &&
|
||||
code != ZNODEEXISTS &&
|
||||
code != ZNOCHILDRENFOREPHEMERALS &&
|
||||
code != ZBADVERSION &&
|
||||
code != ZNOTEMPTY)
|
||||
code != ZNOTEMPTY &&
|
||||
code != ZCONNECTIONLOSS &&
|
||||
code != ZOPERATIONTIMEOUT)
|
||||
throw KeeperException(code);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results)
|
||||
{
|
||||
return retry(boost::bind(&ZooKeeper::tryMulti, this, boost::ref(ops), out_results));
|
||||
}
|
||||
|
||||
void ZooKeeper::removeChildrenRecursive(const std::string & path)
|
||||
{
|
||||
Strings children = getChildren(path);
|
||||
|
Loading…
Reference in New Issue
Block a user