Merge pull request #24803 from ClickHouse/followupfix_zookeeper_timeouts

Don't capture temporary references in ZooKeeper client callbacks.
This commit is contained in:
alesapin 2021-06-02 01:08:17 +03:00 committed by GitHub
commit 1c0973162d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 199 additions and 110 deletions

View File

@ -411,7 +411,7 @@ public:
* - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true, * - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
* the ZooKeeper instance is no longer usable - you may only destroy it and probably create another. * the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
* - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event. * - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event.
* - data for callbacks must be alive when ZooKeeper instance is alive. * - data for callbacks must be alive when ZooKeeper instance is alive, so try to avoid capturing references in callbacks, it's error-prone.
*/ */
class IKeeper class IKeeper
{ {
@ -428,6 +428,9 @@ public:
/// ///
/// After the method is executed successfully, you must wait for callbacks /// After the method is executed successfully, you must wait for callbacks
/// (don't destroy callback data before it will be called). /// (don't destroy callback data before it will be called).
/// TODO: The above line is the description of an error-prone interface. It's better
/// to replace callbacks with std::future results, so the caller shouldn't think about
/// lifetime of the callback data.
/// ///
/// All callbacks are executed sequentially (the execution of callbacks is serialized). /// All callbacks are executed sequentially (the execution of callbacks is serialized).
/// ///

View File

@ -240,24 +240,25 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings
Coordination::Stat * stat, Coordination::Stat * stat,
Coordination::WatchCallback watch_callback) Coordination::WatchCallback watch_callback)
{ {
Coordination::Error code = Coordination::Error::ZOK; auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback);
Poco::Event event;
auto callback = [&](const Coordination::ListResponse & response) if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{ {
SCOPE_EXIT(event.set()); impl->finalize();
code = response.error; return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
{
auto response = future_result.get();
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
{ {
res = response.names; res = response.names;
if (stat) if (stat)
*stat = response.stat; *stat = response.stat;
} }
}; return code;
}
impl->list(path, callback, watch_callback);
event.wait();
return code;
} }
Strings ZooKeeper::getChildren( Strings ZooKeeper::getChildren(
@ -300,20 +301,21 @@ Coordination::Error ZooKeeper::tryGetChildrenWatch(const std::string & path, Str
Coordination::Error ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) Coordination::Error ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
{ {
Coordination::Error code = Coordination::Error::ZOK; auto future_result = asyncTryCreateNoThrow(path, data, mode);
Poco::Event event;
auto callback = [&](const Coordination::CreateResponse & response) if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{ {
SCOPE_EXIT(event.set()); impl->finalize();
code = response.error; return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
{
auto response = future_result.get();
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
path_created = response.path_created; path_created = response.path_created;
}; return code;
}
impl->create(path, data, mode & 1, mode & 2, {}, callback); /// TODO better mode
event.wait();
return code;
} }
std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t mode) std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t mode)
@ -368,19 +370,19 @@ void ZooKeeper::createAncestors(const std::string & path)
Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version) Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version)
{ {
Coordination::Error code = Coordination::Error::ZOK; auto future_result = asyncTryRemoveNoThrow(path, version);
Poco::Event event;
auto callback = [&](const Coordination::RemoveResponse & response)
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{ {
SCOPE_EXIT(event.set()); impl->finalize();
if (response.error != Coordination::Error::ZOK) return Coordination::Error::ZOPERATIONTIMEOUT;
code = response.error; }
}; else
{
impl->remove(path, version, callback); auto response = future_result.get();
event.wait(); return response.error;
return code; }
} }
void ZooKeeper::remove(const std::string & path, int32_t version) void ZooKeeper::remove(const std::string & path, int32_t version)
@ -401,26 +403,22 @@ Coordination::Error ZooKeeper::tryRemove(const std::string & path, int32_t versi
Coordination::Error ZooKeeper::existsImpl(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) Coordination::Error ZooKeeper::existsImpl(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{ {
Coordination::Error code = Coordination::Error::ZOK; auto future_result = asyncTryExistsNoThrow(path, watch_callback);
Poco::Event event;
auto callback = [&](const Coordination::ExistsResponse & response) if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{
SCOPE_EXIT(event.set());
code = response.error;
if (code == Coordination::Error::ZOK && stat)
*stat = response.stat;
};
impl->exists(path, callback, watch_callback);
if (!event.tryWait(operation_timeout_ms))
{ {
impl->finalize(); impl->finalize();
return Coordination::Error::ZOPERATIONTIMEOUT; return Coordination::Error::ZOPERATIONTIMEOUT;
} }
else
{
auto response = future_result.get();
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK && stat)
*stat = response.stat;
return code; return code;
}
} }
bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, const EventPtr & watch) bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
@ -439,29 +437,25 @@ bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat,
Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{ {
Coordination::Error code = Coordination::Error::ZOK; auto future_result = asyncTryGetNoThrow(path, watch_callback);
Poco::Event event;
auto callback = [&](const Coordination::GetResponse & response) if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{ {
SCOPE_EXIT(event.set()); impl->finalize();
code = response.error; return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
{
auto response = future_result.get();
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
{ {
res = response.data; res = response.data;
if (stat) if (stat)
*stat = response.stat; *stat = response.stat;
} }
}; return code;
impl->get(path, callback, watch_callback);
if (!event.tryWait(operation_timeout_ms))
{
impl->finalize();
return Coordination::Error::ZOPERATIONTIMEOUT;
} }
return code;
} }
@ -516,24 +510,22 @@ bool ZooKeeper::tryGetWatch(
Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::string & data, Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::string & data,
int32_t version, Coordination::Stat * stat) int32_t version, Coordination::Stat * stat)
{ {
Coordination::Error code = Coordination::Error::ZOK; auto future_result = asyncTrySetNoThrow(path, data, version);
Poco::Event event;
auto callback = [&](const Coordination::SetResponse & response) if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{
SCOPE_EXIT(event.set());
code = response.error;
if (code == Coordination::Error::ZOK && stat)
*stat = response.stat;
};
impl->set(path, data, version, callback);
if (!event.tryWait(operation_timeout_ms))
{ {
impl->finalize(); impl->finalize();
return Coordination::Error::ZOPERATIONTIMEOUT; return Coordination::Error::ZOPERATIONTIMEOUT;
} }
return code; else
{
auto response = future_result.get();
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK && stat)
*stat = response.stat;
return code;
}
} }
void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat) void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat)
@ -570,23 +562,20 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
if (requests.empty()) if (requests.empty())
return Coordination::Error::ZOK; return Coordination::Error::ZOK;
Coordination::Error code = Coordination::Error::ZOK; auto future_result = asyncTryMultiNoThrow(requests);
Poco::Event event;
auto callback = [&](const Coordination::MultiResponse & response) if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{
SCOPE_EXIT(event.set());
code = response.error;
responses = response.responses;
};
impl->multi(requests, callback);
if (!event.tryWait(operation_timeout_ms))
{ {
impl->finalize(); impl->finalize();
return Coordination::Error::ZOPERATIONTIMEOUT; return Coordination::Error::ZOPERATIONTIMEOUT;
} }
return code; else
{
auto response = future_result.get();
Coordination::Error code = response.error;
responses = response.responses;
return code;
}
} }
Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests) Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests)
@ -771,8 +760,21 @@ std::future<Coordination::CreateResponse> ZooKeeper::asyncCreate(const std::stri
return future; return future;
} }
std::future<Coordination::CreateResponse> ZooKeeper::asyncTryCreateNoThrow(const std::string & path, const std::string & data, int32_t mode)
{
auto promise = std::make_shared<std::promise<Coordination::CreateResponse>>();
auto future = promise->get_future();
std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & path) auto callback = [promise](const Coordination::CreateResponse & response) mutable
{
promise->set_value(response);
};
impl->create(path, data, mode & 1, mode & 2, {}, std::move(callback));
return future;
}
std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & path, Coordination::WatchCallback watch_callback)
{ {
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>(); auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
auto future = promise->get_future(); auto future = promise->get_future();
@ -785,7 +787,21 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncGet(const std::string & p
promise->set_value(response); promise->set_value(response);
}; };
impl->get(path, std::move(callback), {}); impl->get(path, std::move(callback), watch_callback);
return future;
}
std::future<Coordination::GetResponse> ZooKeeper::asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallback watch_callback)
{
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::GetResponse & response) mutable
{
promise->set_value(response);
};
impl->get(path, std::move(callback), watch_callback);
return future; return future;
} }
@ -807,7 +823,7 @@ std::future<Coordination::GetResponse> ZooKeeper::asyncTryGet(const std::string
return future; return future;
} }
std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::string & path) std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::string & path, Coordination::WatchCallback watch_callback)
{ {
auto promise = std::make_shared<std::promise<Coordination::ExistsResponse>>(); auto promise = std::make_shared<std::promise<Coordination::ExistsResponse>>();
auto future = promise->get_future(); auto future = promise->get_future();
@ -820,7 +836,21 @@ std::future<Coordination::ExistsResponse> ZooKeeper::asyncExists(const std::stri
promise->set_value(response); promise->set_value(response);
}; };
impl->exists(path, std::move(callback), {}); impl->exists(path, std::move(callback), watch_callback);
return future;
}
std::future<Coordination::ExistsResponse> ZooKeeper::asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback)
{
auto promise = std::make_shared<std::promise<Coordination::ExistsResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::ExistsResponse & response) mutable
{
promise->set_value(response);
};
impl->exists(path, std::move(callback), watch_callback);
return future; return future;
} }
@ -841,7 +871,22 @@ std::future<Coordination::SetResponse> ZooKeeper::asyncSet(const std::string & p
return future; return future;
} }
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::string & path)
std::future<Coordination::SetResponse> ZooKeeper::asyncTrySetNoThrow(const std::string & path, const std::string & data, int32_t version)
{
auto promise = std::make_shared<std::promise<Coordination::SetResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::SetResponse & response) mutable
{
promise->set_value(response);
};
impl->set(path, data, version, std::move(callback));
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback)
{ {
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>(); auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future(); auto future = promise->get_future();
@ -854,7 +899,21 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::s
promise->set_value(response); promise->set_value(response);
}; };
impl->list(path, std::move(callback), {}); impl->list(path, std::move(callback), watch_callback);
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::ListResponse & response) mutable
{
promise->set_value(response);
};
impl->list(path, std::move(callback), watch_callback);
return future; return future;
} }
@ -897,7 +956,21 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemove(const std::s
return future; return future;
} }
std::future<Coordination::MultiResponse> ZooKeeper::tryAsyncMulti(const Coordination::Requests & ops) std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemoveNoThrow(const std::string & path, int32_t version)
{
auto promise = std::make_shared<std::promise<Coordination::RemoveResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::RemoveResponse & response) mutable
{
promise->set_value(response);
};
impl->remove(path, version, std::move(callback));
return future;
}
std::future<Coordination::MultiResponse> ZooKeeper::asyncTryMultiNoThrow(const Coordination::Requests & ops)
{ {
auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>(); auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>();
auto future = promise->get_future(); auto future = promise->get_future();

View File

@ -39,9 +39,6 @@ constexpr size_t MULTI_BATCH_SIZE = 100;
/// watch notification. /// watch notification.
/// Callback-based watch interface is also provided. /// Callback-based watch interface is also provided.
/// ///
/// Read-only methods retry retry_num times if recoverable errors like OperationTimeout
/// or ConnectionLoss are encountered.
///
/// Modifying methods do not retry, because it leads to problems of the double-delete type. /// Modifying methods do not retry, because it leads to problems of the double-delete type.
/// ///
/// Methods with names not starting at try- raise KeeperException on any error. /// Methods with names not starting at try- raise KeeperException on any error.
@ -220,39 +217,55 @@ public:
/// auto result1 = future1.get(); /// auto result1 = future1.get();
/// auto result2 = future2.get(); /// auto result2 = future2.get();
/// ///
/// Future should not be destroyed before the result is gotten. /// NoThrow versions never throw any exception on future.get(), even on SessionExpired error.
using FutureCreate = std::future<Coordination::CreateResponse>; using FutureCreate = std::future<Coordination::CreateResponse>;
FutureCreate asyncCreate(const std::string & path, const std::string & data, int32_t mode); FutureCreate asyncCreate(const std::string & path, const std::string & data, int32_t mode);
/// Like the previous one but don't throw any exceptions on future.get()
FutureCreate asyncTryCreateNoThrow(const std::string & path, const std::string & data, int32_t mode);
using FutureGet = std::future<Coordination::GetResponse>; using FutureGet = std::future<Coordination::GetResponse>;
FutureGet asyncGet(const std::string & path); FutureGet asyncGet(const std::string & path, Coordination::WatchCallback watch_callback = {});
/// Like the previous one but don't throw any exceptions on future.get()
FutureGet asyncTryGet(const std::string & path); FutureGet asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
using FutureExists = std::future<Coordination::ExistsResponse>; using FutureExists = std::future<Coordination::ExistsResponse>;
FutureExists asyncExists(const std::string & path); FutureExists asyncExists(const std::string & path, Coordination::WatchCallback watch_callback = {});
/// Like the previous one but don't throw any exceptions on future.get()
FutureExists asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
using FutureGetChildren = std::future<Coordination::ListResponse>; using FutureGetChildren = std::future<Coordination::ListResponse>;
FutureGetChildren asyncGetChildren(const std::string & path); FutureGetChildren asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback = {});
/// Like the previous one but don't throw any exceptions on future.get()
FutureGetChildren asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
using FutureSet = std::future<Coordination::SetResponse>; using FutureSet = std::future<Coordination::SetResponse>;
FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1); FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1);
/// Like the previous one but don't throw any exceptions on future.get()
FutureSet asyncTrySetNoThrow(const std::string & path, const std::string & data, int32_t version = -1);
using FutureRemove = std::future<Coordination::RemoveResponse>; using FutureRemove = std::future<Coordination::RemoveResponse>;
FutureRemove asyncRemove(const std::string & path, int32_t version = -1); FutureRemove asyncRemove(const std::string & path, int32_t version = -1);
/// Like the previous one but don't throw any exceptions on future.get()
FutureRemove asyncTryRemoveNoThrow(const std::string & path, int32_t version = -1);
using FutureMulti = std::future<Coordination::MultiResponse>;
FutureMulti asyncMulti(const Coordination::Requests & ops);
/// Like the previous one but don't throw any exceptions on future.get()
FutureMulti asyncTryMultiNoThrow(const Coordination::Requests & ops);
/// Very specific methods introduced without following general style. Implements
/// some custom throw/no throw logic on future.get().
///
/// Doesn't throw in the following cases: /// Doesn't throw in the following cases:
/// * The node doesn't exist /// * The node doesn't exist
/// * The versions do not match /// * The versions do not match
/// * The node has children /// * The node has children
FutureRemove asyncTryRemove(const std::string & path, int32_t version = -1); FutureRemove asyncTryRemove(const std::string & path, int32_t version = -1);
using FutureMulti = std::future<Coordination::MultiResponse>; /// Doesn't throw in the following cases:
FutureMulti asyncMulti(const Coordination::Requests & ops); /// * The node doesn't exist
FutureGet asyncTryGet(const std::string & path);
/// Like the previous one but don't throw any exceptions on future.get()
FutureMulti tryAsyncMulti(const Coordination::Requests & ops);
void finalize(); void finalize();
@ -262,7 +275,7 @@ private:
void init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_, void init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_,
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_); int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
/// The following methods don't throw exceptions but return error codes. /// The following methods don't any throw exceptions but return error codes.
Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created); Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
Coordination::Error removeImpl(const std::string & path, int32_t version); Coordination::Error removeImpl(const std::string & path, int32_t version);
Coordination::Error getImpl( Coordination::Error getImpl(

View File

@ -291,7 +291,7 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
std::vector<zkutil::ZooKeeper::FutureMulti> futures; std::vector<zkutil::ZooKeeper::FutureMulti> futures;
for (size_t i = 0; i < candidate_lost_replicas.size(); ++i) for (size_t i = 0; i < candidate_lost_replicas.size(); ++i)
futures.emplace_back(zookeeper->tryAsyncMulti(requests[i])); futures.emplace_back(zookeeper->asyncTryMultiNoThrow(requests[i]));
for (size_t i = 0; i < candidate_lost_replicas.size(); ++i) for (size_t i = 0; i < candidate_lost_replicas.size(); ++i)
{ {

View File

@ -6162,7 +6162,7 @@ bool StorageReplicatedMergeTree::tryRemovePartsFromZooKeeperWithRetries(const St
{ {
Coordination::Requests ops; Coordination::Requests ops;
removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0); removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0);
remove_futures.emplace_back(zookeeper->tryAsyncMulti(ops)); remove_futures.emplace_back(zookeeper->asyncTryMultiNoThrow(ops));
} }
} }
@ -6223,7 +6223,7 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
{ {
Coordination::Requests ops; Coordination::Requests ops;
removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0); removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0);
remove_futures.emplace_back(zookeeper->tryAsyncMulti(ops)); remove_futures.emplace_back(zookeeper->asyncTryMultiNoThrow(ops));
} }
else else
{ {