mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
add try limit
This commit is contained in:
parent
4d486f1a74
commit
8bbb78375e
@ -682,13 +682,25 @@ namespace
|
||||
|
||||
class CPMVOperation
|
||||
{
|
||||
constexpr static UInt64 kTryLimit = 1000;
|
||||
|
||||
public:
|
||||
CPMVOperation(String src_, String dest_, bool remove_src_, KeeperClient * client_)
|
||||
: src(std::move(src_)), dest(std::move(dest_)), remove_src(remove_src_), client(client_)
|
||||
{
|
||||
}
|
||||
|
||||
bool perform()
|
||||
bool isTryLimitReached() const
|
||||
{
|
||||
return failed_tries_count >= kTryLimit;
|
||||
}
|
||||
|
||||
bool isCompleted() const
|
||||
{
|
||||
return is_completed;
|
||||
}
|
||||
|
||||
void perform()
|
||||
{
|
||||
Coordination::Stat src_stat;
|
||||
String data = client->zookeeper->get(src, &src_stat);
|
||||
@ -707,9 +719,19 @@ public:
|
||||
switch (code)
|
||||
{
|
||||
case Coordination::Error::ZOK:
|
||||
return true;
|
||||
{
|
||||
is_completed = true;
|
||||
return;
|
||||
}
|
||||
case Coordination::Error::ZBADVERSION:
|
||||
return false;
|
||||
{
|
||||
++failed_tries_count;
|
||||
|
||||
if (isTryLimitReached())
|
||||
zkutil::KeeperMultiException::check(code, ops, responses);
|
||||
|
||||
return;
|
||||
}
|
||||
default:
|
||||
zkutil::KeeperMultiException::check(code, ops, responses);
|
||||
}
|
||||
@ -722,6 +744,9 @@ private:
|
||||
String dest;
|
||||
bool remove_src = false;
|
||||
KeeperClient * client = nullptr;
|
||||
|
||||
bool is_completed = false;
|
||||
uint64_t failed_tries_count = 0;
|
||||
};
|
||||
|
||||
}
|
||||
@ -748,8 +773,8 @@ void CPCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
|
||||
|
||||
CPMVOperation operation(std::move(src), std::move(dest), /*remove_src_=*/false, /*client_=*/client);
|
||||
|
||||
while (!operation.perform())
|
||||
;
|
||||
while (!operation.isTryLimitReached() && !operation.isCompleted())
|
||||
operation.perform();
|
||||
}
|
||||
|
||||
bool MVCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
|
||||
@ -774,8 +799,8 @@ void MVCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
|
||||
|
||||
CPMVOperation operation(std::move(src), std::move(dest), /*remove_src_=*/true, /*client_=*/client);
|
||||
|
||||
while (!operation.perform())
|
||||
;
|
||||
while (!operation.isTryLimitReached() && !operation.isCompleted())
|
||||
operation.perform();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user