Retry disconnects and expired sessions

This commit is contained in:
Alexander Gololobov 2024-01-30 17:47:11 +01:00
parent 5816424c2b
commit 023b8cbd53

View File

@ -424,9 +424,35 @@ void ReadFromSystemZooKeeper::applyFilters()
paths = extractPath(getFilterNodes().nodes, context, context->getSettingsRef().allow_unrestricted_reads_from_keeper);
}
/// Executes a request to Keeper and retries it in case of expired sessions and disconnects
template <typename Result, typename Operation>
static Result runWithReconnects(Operation && operation, ContextPtr context, QueryStatusPtr query_status)
{
constexpr int max_retries = 20; /// Limit retries by some reasonable number to avoid infinite loops
for (int attempt = 0; ; ++attempt)
{
if (query_status)
query_status->checkTimeLimit();
zkutil::ZooKeeperPtr keeper = context->getZooKeeper();
try
{
return operation(keeper);
}
catch (const Coordination::Exception & e)
{
if (!Coordination::isHardwareError(e.code) ||
attempt >= max_retries ||
e.code == Coordination::Error::ZOPERATIONTIMEOUT)
throw;
}
}
}
void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
{
zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
QueryStatusPtr query_status = context->getProcessListElement();
if (paths.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -448,6 +474,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
std::unordered_set<String> added;
while (!paths.empty())
{
if (query_status)
query_status->checkTimeLimit();
list_tasks.clear();
std::vector<String> paths_to_list;
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
@ -470,7 +499,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
paths_to_list.emplace_back(task.path_corrected);
list_tasks.emplace_back(std::move(task));
}
auto list_responses = zookeeper->tryGetChildren(paths_to_list);
auto list_responses = runWithReconnects<zkutil::ZooKeeper::MultiTryGetChildrenResponse>(
[&paths_to_list](zkutil::ZooKeeperPtr zookeeper) { return zookeeper->tryGetChildren(paths_to_list); },
context, query_status);
struct GetTask
{
@ -514,7 +545,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
}
}
auto get_responses = zookeeper->tryGet(paths_to_get);
auto get_responses = runWithReconnects<zkutil::ZooKeeper::MultiTryGetResponse>(
[&paths_to_get](zkutil::ZooKeeperPtr zookeeper) { return zookeeper->tryGet(paths_to_get); },
context, query_status);
for (size_t i = 0, size = get_tasks.size(); i < size; ++i)
{