From 023b8cbd53c1d3788e97d17b0329e3330c1cc0eb Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 30 Jan 2024 17:47:11 +0100
Subject: [PATCH] Retry disconnects and expired sessions

---
 .../System/StorageSystemZooKeeper.cpp         | 39 +++++++++++++++++--
 1 file changed, 36 insertions(+), 3 deletions(-)

diff --git a/src/Storages/System/StorageSystemZooKeeper.cpp b/src/Storages/System/StorageSystemZooKeeper.cpp
index 37fe9074950..9a671f08138 100644
--- a/src/Storages/System/StorageSystemZooKeeper.cpp
+++ b/src/Storages/System/StorageSystemZooKeeper.cpp
@@ -424,9 +424,35 @@ void ReadFromSystemZooKeeper::applyFilters()
     paths = extractPath(getFilterNodes().nodes, context, context->getSettingsRef().allow_unrestricted_reads_from_keeper);
 }
 
+/// Executes a request to Keeper and retries it in case of expired sessions and disconnects
+template <typename Result, typename Operation>
+static Result runWithReconnects(Operation && operation, ContextPtr context, QueryStatusPtr query_status)
+{
+    constexpr int max_retries = 20; /// Limit retries by some reasonable number to avoid infinite loops
+    for (int attempt = 0; ; ++attempt)
+    {
+        if (query_status)
+            query_status->checkTimeLimit();
+
+        zkutil::ZooKeeperPtr keeper = context->getZooKeeper();
+
+        try
+        {
+            return operation(keeper);
+        }
+        catch (const Coordination::Exception & e)
+        {
+            if (!Coordination::isHardwareError(e.code) ||
+                attempt >= max_retries ||
+                e.code == Coordination::Error::ZOPERATIONTIMEOUT)
+                throw;
+        }
+    }
+}
+
 void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
 {
-    zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
+    QueryStatusPtr query_status = context->getProcessListElement();
 
     if (paths.empty())
         throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -448,6 +474,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
     std::unordered_set<String> added;
     while (!paths.empty())
     {
+        if (query_status)
+            query_status->checkTimeLimit();
+
         list_tasks.clear();
         std::vector<String> paths_to_list;
         while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
@@ -470,7 +499,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
             paths_to_list.emplace_back(task.path_corrected);
             list_tasks.emplace_back(std::move(task));
         }
-        auto list_responses = zookeeper->tryGetChildren(paths_to_list);
+        auto list_responses = runWithReconnects<zkutil::ZooKeeper::MultiTryGetChildrenResponse>(
+            [&paths_to_list](zkutil::ZooKeeperPtr zookeeper) { return zookeeper->tryGetChildren(paths_to_list); },
+            context, query_status);
 
         struct GetTask
         {
@@ -514,7 +545,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
             }
         }
 
-        auto get_responses = zookeeper->tryGet(paths_to_get);
+        auto get_responses = runWithReconnects<zkutil::ZooKeeper::MultiTryGetResponse>(
+            [&paths_to_get](zkutil::ZooKeeperPtr zookeeper) { return zookeeper->tryGet(paths_to_get); },
+            context, query_status);
 
         for (size_t i = 0, size = get_tasks.size(); i < size; ++i)
         {