From ce7b8aefd274cef40002a52895bd434225a36ddc Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 7 Oct 2020 11:41:27 +0300 Subject: [PATCH 1/2] Add reconnects to zookeeper-dump-tree tool --- utils/zookeeper-dump-tree/main.cpp | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/utils/zookeeper-dump-tree/main.cpp b/utils/zookeeper-dump-tree/main.cpp index 877a4a996b7..4614ee577e7 100644 --- a/utils/zookeeper-dump-tree/main.cpp +++ b/utils/zookeeper-dump-tree/main.cpp @@ -29,16 +29,20 @@ try return 1; } - zkutil::ZooKeeper zookeeper(options.at("address").as()); + zkutil::ZooKeeperPtr zookeeper = std::make_shared(options.at("address").as()); std::string initial_path = options.at("path").as(); std::list>> list_futures; - list_futures.emplace_back(initial_path, zookeeper.asyncGetChildren(initial_path)); + list_futures.emplace_back(initial_path, zookeeper->asyncGetChildren(initial_path)); + + size_t num_reconnects = 0; + constexpr size_t max_reconnects = 100; for (auto it = list_futures.begin(); it != list_futures.end(); ++it) { Coordination::ListResponse response; + try { response = it->second.get(); @@ -46,8 +50,25 @@ try catch (const Coordination::Exception & e) { if (e.code == Coordination::Error::ZNONODE) + { continue; - throw; + } + else if (Coordination::isHardwareError(e.code)) + { + /// Reinitialize the session and move the node to the end of the queue for later retry. + if (zookeeper->expired()) + { + if (num_reconnects == max_reconnects) + throw; + ++num_reconnects; + zookeeper = zookeeper->startNewSession(); + } + + list_futures.emplace_back(it->first, zookeeper->asyncGetChildren(it->first)); + continue; + } + else + throw; } std::cout << it->first << '\t' << response.stat.numChildren << '\t' << response.stat.dataLength << '\n'; @@ -55,7 +76,7 @@ try for (const auto & name : response.names) { std::string child_path = it->first == "/" ? it->first + name : it->first + '/' + name; - list_futures.emplace_back(child_path, zookeeper.asyncGetChildren(child_path)); + list_futures.emplace_back(child_path, zookeeper->asyncGetChildren(child_path)); } } } From 7b39906d6aa0b21236d3997078660343dcb86039 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 9 Oct 2020 22:15:51 +0300 Subject: [PATCH 2/2] Miscellanous --- utils/zookeeper-dump-tree/main.cpp | 159 ++++++++++++++++------------- 1 file changed, 86 insertions(+), 73 deletions(-) diff --git a/utils/zookeeper-dump-tree/main.cpp b/utils/zookeeper-dump-tree/main.cpp index 4614ee577e7..b50d6e97235 100644 --- a/utils/zookeeper-dump-tree/main.cpp +++ b/utils/zookeeper-dump-tree/main.cpp @@ -7,81 +7,94 @@ int main(int argc, char ** argv) -try { - boost::program_options::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "produce help message") - ("address,a", boost::program_options::value()->required(), - "addresses of ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181") - ("path,p", boost::program_options::value()->default_value("/"), - "where to start") - ; - - boost::program_options::variables_map options; - boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); - - if (options.count("help")) + try { - std::cout << "Dump paths of all nodes in ZooKeeper." << std::endl; - std::cout << "Usage: " << argv[0] << " [options]" << std::endl; - std::cout << desc << std::endl; + boost::program_options::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("address,a", boost::program_options::value()->required(), + "addresses of ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181") + ("path,p", boost::program_options::value()->default_value("/"), + "where to start") + ; + + boost::program_options::variables_map options; + boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); + + if (options.count("help")) + { + std::cout << "Dump paths of all nodes in ZooKeeper." << std::endl; + std::cout << "Usage: " << argv[0] << " [options]" << std::endl; + std::cout << desc << std::endl; + return 1; + } + + zkutil::ZooKeeperPtr zookeeper = std::make_shared(options.at("address").as()); + + std::string initial_path = options.at("path").as(); + + std::list>> list_futures; + list_futures.emplace_back(initial_path, zookeeper->asyncGetChildren(initial_path)); + + size_t num_reconnects = 0; + constexpr size_t max_reconnects = 100; + + auto ensureSession = [&] + { + if (zookeeper->expired()) + { + if (num_reconnects == max_reconnects) + return false; + ++num_reconnects; + std::cerr << "num_reconnects: " << num_reconnects << "\n"; + zookeeper = zookeeper->startNewSession(); + } + return true; + }; + + for (auto it = list_futures.begin(); it != list_futures.end(); ++it) + { + Coordination::ListResponse response; + + try + { + response = it->second.get(); + } + catch (const Coordination::Exception & e) + { + if (e.code == Coordination::Error::ZNONODE) + { + continue; + } + else if (Coordination::isHardwareError(e.code)) + { + /// Reinitialize the session and move the node to the end of the queue for later retry. + if (!ensureSession()) + throw; + list_futures.emplace_back(it->first, zookeeper->asyncGetChildren(it->first)); + continue; + } + else + throw; + } + + std::cout << it->first << '\t' << response.stat.numChildren << '\t' << response.stat.dataLength << '\n'; + + for (const auto & name : response.names) + { + std::string child_path = it->first == "/" ? it->first + name : it->first + '/' + name; + + ensureSession(); + list_futures.emplace_back(child_path, zookeeper->asyncGetChildren(child_path)); + } + } + + return 0; + } + catch (...) + { + std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; return 1; } - - zkutil::ZooKeeperPtr zookeeper = std::make_shared(options.at("address").as()); - - std::string initial_path = options.at("path").as(); - - std::list>> list_futures; - list_futures.emplace_back(initial_path, zookeeper->asyncGetChildren(initial_path)); - - size_t num_reconnects = 0; - constexpr size_t max_reconnects = 100; - - for (auto it = list_futures.begin(); it != list_futures.end(); ++it) - { - Coordination::ListResponse response; - - try - { - response = it->second.get(); - } - catch (const Coordination::Exception & e) - { - if (e.code == Coordination::Error::ZNONODE) - { - continue; - } - else if (Coordination::isHardwareError(e.code)) - { - /// Reinitialize the session and move the node to the end of the queue for later retry. - if (zookeeper->expired()) - { - if (num_reconnects == max_reconnects) - throw; - ++num_reconnects; - zookeeper = zookeeper->startNewSession(); - } - - list_futures.emplace_back(it->first, zookeeper->asyncGetChildren(it->first)); - continue; - } - else - throw; - } - - std::cout << it->first << '\t' << response.stat.numChildren << '\t' << response.stat.dataLength << '\n'; - - for (const auto & name : response.names) - { - std::string child_path = it->first == "/" ? it->first + name : it->first + '/' + name; - list_futures.emplace_back(child_path, zookeeper->asyncGetChildren(child_path)); - } - } -} -catch (...) -{ - std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; - throw; }