diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index 5e11687eab5..30d816aad15 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -411,7 +411,7 @@ public: * - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true, * the ZooKeeper instance is no longer usable - you may only destroy it and probably create another. * - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event. - * - data for callbacks must be alive when ZooKeeper instance is alive. + * - data for callbacks must be alive when ZooKeeper instance is alive, so try to avoid capturing references in callbacks, it's error-prone. */ class IKeeper { @@ -428,6 +428,9 @@ public: /// /// After the method is executed successfully, you must wait for callbacks /// (don't destroy callback data before it will be called). + /// TODO: The above line is the description of an error-prone interface. It's better + /// to replace callbacks with std::future results, so the caller shouldn't think about + /// lifetime of the callback data. /// /// All callbacks are executed sequentially (the execution of callbacks is serialized). /// diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index f3aecd4e76b..1ee70b0cc3f 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -240,24 +240,25 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings Coordination::Stat * stat, Coordination::WatchCallback watch_callback) { - Coordination::Error code = Coordination::Error::ZOK; - Poco::Event event; + auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback); - auto callback = [&](const Coordination::ListResponse & response) + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { - SCOPE_EXIT(event.set()); - code = response.error; + impl->finalize(); + return Coordination::Error::ZOPERATIONTIMEOUT; + } + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; if (code == Coordination::Error::ZOK) { res = response.names; if (stat) *stat = response.stat; } - }; - - impl->list(path, callback, watch_callback); - event.wait(); - return code; + return code; + } } Strings ZooKeeper::getChildren( @@ -300,20 +301,21 @@ Coordination::Error ZooKeeper::tryGetChildrenWatch(const std::string & path, Str Coordination::Error ZooKeeper::createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created) { - Coordination::Error code = Coordination::Error::ZOK; - Poco::Event event; + auto future_result = asyncTryCreateNoThrow(path, data, mode); - auto callback = [&](const Coordination::CreateResponse & response) + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { - SCOPE_EXIT(event.set()); - code = response.error; + impl->finalize(); + return Coordination::Error::ZOPERATIONTIMEOUT; + } + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; if (code == Coordination::Error::ZOK) path_created = response.path_created; - }; - - impl->create(path, data, mode & 1, mode & 2, {}, callback); /// TODO better mode - event.wait(); - return code; + return code; + } } std::string ZooKeeper::create(const std::string & path, const std::string & data, int32_t mode) @@ -368,19 +370,19 @@ void ZooKeeper::createAncestors(const std::string & path) Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version) { - Coordination::Error code = Coordination::Error::ZOK; - Poco::Event event; + auto future_result = asyncTryRemoveNoThrow(path, version); - auto callback = [&](const Coordination::RemoveResponse & response) + + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { - SCOPE_EXIT(event.set()); - if (response.error != Coordination::Error::ZOK) - code = response.error; - }; - - impl->remove(path, version, callback); - event.wait(); - return code; + impl->finalize(); + return Coordination::Error::ZOPERATIONTIMEOUT; + } + else + { + auto response = future_result.get(); + return response.error; + } } void ZooKeeper::remove(const std::string & path, int32_t version) @@ -401,26 +403,22 @@ Coordination::Error ZooKeeper::tryRemove(const std::string & path, int32_t versi Coordination::Error ZooKeeper::existsImpl(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) { - Coordination::Error code = Coordination::Error::ZOK; - Poco::Event event; + auto future_result = asyncTryExistsNoThrow(path, watch_callback); - auto callback = [&](const Coordination::ExistsResponse & response) - { - SCOPE_EXIT(event.set()); - code = response.error; - if (code == Coordination::Error::ZOK && stat) - *stat = response.stat; - }; - - impl->exists(path, callback, watch_callback); - - if (!event.tryWait(operation_timeout_ms)) + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { impl->finalize(); return Coordination::Error::ZOPERATIONTIMEOUT; } + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; + if (code == Coordination::Error::ZOK && stat) + *stat = response.stat; - return code; + return code; + } } bool ZooKeeper::exists(const std::string & path, Coordination::Stat * stat, const EventPtr & watch) @@ -439,29 +437,25 @@ bool ZooKeeper::existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback) { - Coordination::Error code = Coordination::Error::ZOK; - Poco::Event event; + auto future_result = asyncTryGetNoThrow(path, watch_callback); - auto callback = [&](const Coordination::GetResponse & response) + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { - SCOPE_EXIT(event.set()); - code = response.error; + impl->finalize(); + return Coordination::Error::ZOPERATIONTIMEOUT; + } + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; if (code == Coordination::Error::ZOK) { res = response.data; if (stat) *stat = response.stat; } - }; - - impl->get(path, callback, watch_callback); - if (!event.tryWait(operation_timeout_ms)) - { - impl->finalize(); - return Coordination::Error::ZOPERATIONTIMEOUT; + return code; } - - return code; } @@ -516,24 +510,22 @@ bool ZooKeeper::tryGetWatch( Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat) { - Coordination::Error code = Coordination::Error::ZOK; - Poco::Event event; + auto future_result = asyncTrySetNoThrow(path, data, version); - auto callback = [&](const Coordination::SetResponse & response) - { - SCOPE_EXIT(event.set()); - code = response.error; - if (code == Coordination::Error::ZOK && stat) - *stat = response.stat; - }; - - impl->set(path, data, version, callback); - if (!event.tryWait(operation_timeout_ms)) + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { impl->finalize(); return Coordination::Error::ZOPERATIONTIMEOUT; } - return code; + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; + if (code == Coordination::Error::ZOK && stat) + *stat = response.stat; + + return code; + } } void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat) @@ -570,23 +562,20 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests if (requests.empty()) return Coordination::Error::ZOK; - Coordination::Error code = Coordination::Error::ZOK; - Poco::Event event; + auto future_result = asyncTryMultiNoThrow(requests); - auto callback = [&](const Coordination::MultiResponse & response) - { - SCOPE_EXIT(event.set()); - code = response.error; - responses = response.responses; - }; - - impl->multi(requests, callback); - if (!event.tryWait(operation_timeout_ms)) + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) { impl->finalize(); return Coordination::Error::ZOPERATIONTIMEOUT; } - return code; + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; + responses = response.responses; + return code; + } } Coordination::Responses ZooKeeper::multi(const Coordination::Requests & requests) @@ -771,8 +760,21 @@ std::future ZooKeeper::asyncCreate(const std::stri return future; } +std::future ZooKeeper::asyncTryCreateNoThrow(const std::string & path, const std::string & data, int32_t mode) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); -std::future ZooKeeper::asyncGet(const std::string & path) + auto callback = [promise](const Coordination::CreateResponse & response) mutable + { + promise->set_value(response); + }; + + impl->create(path, data, mode & 1, mode & 2, {}, std::move(callback)); + return future; +} + +std::future ZooKeeper::asyncGet(const std::string & path, Coordination::WatchCallback watch_callback) { auto promise = std::make_shared>(); auto future = promise->get_future(); @@ -785,7 +787,21 @@ std::future ZooKeeper::asyncGet(const std::string & p promise->set_value(response); }; - impl->get(path, std::move(callback), {}); + impl->get(path, std::move(callback), watch_callback); + return future; +} + +std::future ZooKeeper::asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallback watch_callback) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::GetResponse & response) mutable + { + promise->set_value(response); + }; + + impl->get(path, std::move(callback), watch_callback); return future; } @@ -807,7 +823,7 @@ std::future ZooKeeper::asyncTryGet(const std::string return future; } -std::future ZooKeeper::asyncExists(const std::string & path) +std::future ZooKeeper::asyncExists(const std::string & path, Coordination::WatchCallback watch_callback) { auto promise = std::make_shared>(); auto future = promise->get_future(); @@ -820,7 +836,21 @@ std::future ZooKeeper::asyncExists(const std::stri promise->set_value(response); }; - impl->exists(path, std::move(callback), {}); + impl->exists(path, std::move(callback), watch_callback); + return future; +} + +std::future ZooKeeper::asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::ExistsResponse & response) mutable + { + promise->set_value(response); + }; + + impl->exists(path, std::move(callback), watch_callback); return future; } @@ -841,7 +871,22 @@ std::future ZooKeeper::asyncSet(const std::string & p return future; } -std::future ZooKeeper::asyncGetChildren(const std::string & path) + +std::future ZooKeeper::asyncTrySetNoThrow(const std::string & path, const std::string & data, int32_t version) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::SetResponse & response) mutable + { + promise->set_value(response); + }; + + impl->set(path, data, version, std::move(callback)); + return future; +} + +std::future ZooKeeper::asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback) { auto promise = std::make_shared>(); auto future = promise->get_future(); @@ -854,7 +899,21 @@ std::future ZooKeeper::asyncGetChildren(const std::s promise->set_value(response); }; - impl->list(path, std::move(callback), {}); + impl->list(path, std::move(callback), watch_callback); + return future; +} + +std::future ZooKeeper::asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::ListResponse & response) mutable + { + promise->set_value(response); + }; + + impl->list(path, std::move(callback), watch_callback); return future; } @@ -897,7 +956,21 @@ std::future ZooKeeper::asyncTryRemove(const std::s return future; } -std::future ZooKeeper::tryAsyncMulti(const Coordination::Requests & ops) +std::future ZooKeeper::asyncTryRemoveNoThrow(const std::string & path, int32_t version) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::RemoveResponse & response) mutable + { + promise->set_value(response); + }; + + impl->remove(path, version, std::move(callback)); + return future; +} + +std::future ZooKeeper::asyncTryMultiNoThrow(const Coordination::Requests & ops) { auto promise = std::make_shared>(); auto future = promise->get_future(); diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 4a65ff070f7..7aafee52bf0 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -39,9 +39,6 @@ constexpr size_t MULTI_BATCH_SIZE = 100; /// watch notification. /// Callback-based watch interface is also provided. /// -/// Read-only methods retry retry_num times if recoverable errors like OperationTimeout -/// or ConnectionLoss are encountered. -/// /// Modifying methods do not retry, because it leads to problems of the double-delete type. /// /// Methods with names not starting at try- raise KeeperException on any error. @@ -220,39 +217,55 @@ public: /// auto result1 = future1.get(); /// auto result2 = future2.get(); /// - /// Future should not be destroyed before the result is gotten. + /// NoThrow versions never throw any exception on future.get(), even on SessionExpired error. using FutureCreate = std::future; FutureCreate asyncCreate(const std::string & path, const std::string & data, int32_t mode); + /// Like the previous one but don't throw any exceptions on future.get() + FutureCreate asyncTryCreateNoThrow(const std::string & path, const std::string & data, int32_t mode); using FutureGet = std::future; - FutureGet asyncGet(const std::string & path); - - FutureGet asyncTryGet(const std::string & path); + FutureGet asyncGet(const std::string & path, Coordination::WatchCallback watch_callback = {}); + /// Like the previous one but don't throw any exceptions on future.get() + FutureGet asyncTryGetNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {}); using FutureExists = std::future; - FutureExists asyncExists(const std::string & path); + FutureExists asyncExists(const std::string & path, Coordination::WatchCallback watch_callback = {}); + /// Like the previous one but don't throw any exceptions on future.get() + FutureExists asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {}); using FutureGetChildren = std::future; - FutureGetChildren asyncGetChildren(const std::string & path); + FutureGetChildren asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback = {}); + /// Like the previous one but don't throw any exceptions on future.get() + FutureGetChildren asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {}); using FutureSet = std::future; FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1); + /// Like the previous one but don't throw any exceptions on future.get() + FutureSet asyncTrySetNoThrow(const std::string & path, const std::string & data, int32_t version = -1); using FutureRemove = std::future; FutureRemove asyncRemove(const std::string & path, int32_t version = -1); + /// Like the previous one but don't throw any exceptions on future.get() + FutureRemove asyncTryRemoveNoThrow(const std::string & path, int32_t version = -1); + using FutureMulti = std::future; + FutureMulti asyncMulti(const Coordination::Requests & ops); + /// Like the previous one but don't throw any exceptions on future.get() + FutureMulti asyncTryMultiNoThrow(const Coordination::Requests & ops); + + /// Very specific methods introduced without following general style. Implements + /// some custom throw/no throw logic on future.get(). + /// /// Doesn't throw in the following cases: /// * The node doesn't exist /// * The versions do not match /// * The node has children FutureRemove asyncTryRemove(const std::string & path, int32_t version = -1); - using FutureMulti = std::future; - FutureMulti asyncMulti(const Coordination::Requests & ops); - - /// Like the previous one but don't throw any exceptions on future.get() - FutureMulti tryAsyncMulti(const Coordination::Requests & ops); + /// Doesn't throw in the following cases: + /// * The node doesn't exist + FutureGet asyncTryGet(const std::string & path); void finalize(); @@ -262,7 +275,7 @@ private: void init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_, int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_); - /// The following methods don't throw exceptions but return error codes. + /// The following methods don't any throw exceptions but return error codes. Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created); Coordination::Error removeImpl(const std::string & path, int32_t version); Coordination::Error getImpl( diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index d3496d99cef..10e2d77eb27 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -291,7 +291,7 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map std::vector futures; for (size_t i = 0; i < candidate_lost_replicas.size(); ++i) - futures.emplace_back(zookeeper->tryAsyncMulti(requests[i])); + futures.emplace_back(zookeeper->asyncTryMultiNoThrow(requests[i])); for (size_t i = 0; i < candidate_lost_replicas.size(); ++i) { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 3702cd0ce21..62f1294cc96 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6162,7 +6162,7 @@ bool StorageReplicatedMergeTree::tryRemovePartsFromZooKeeperWithRetries(const St { Coordination::Requests ops; removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0); - remove_futures.emplace_back(zookeeper->tryAsyncMulti(ops)); + remove_futures.emplace_back(zookeeper->asyncTryMultiNoThrow(ops)); } } @@ -6223,7 +6223,7 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper( { Coordination::Requests ops; removePartFromZooKeeper(part_names[i], ops, exists_resp.stat.numChildren > 0); - remove_futures.emplace_back(zookeeper->tryAsyncMulti(ops)); + remove_futures.emplace_back(zookeeper->asyncTryMultiNoThrow(ops)); } else {