diff --git a/src/Storages/NATS/NATSSettings.h b/src/Storages/NATS/NATSSettings.h index 6029aaea9f6..9bf9b969387 100644 --- a/src/Storages/NATS/NATSSettings.h +++ b/src/Storages/NATS/NATSSettings.h @@ -24,7 +24,8 @@ class ASTStorage; M(Milliseconds, nats_flush_interval_ms, 0, "Timeout for flushing data from NATS.", 0) \ M(String, nats_username, "", "NATS username", 0) \ M(String, nats_password, "", "NATS password", 0) \ - M(String, nats_token, "", "NATS token", 0) + M(String, nats_token, "", "NATS token", 0) \ + M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \ #define LIST_OF_NATS_SETTINGS(M) \ NATS_RELATED_SETTINGS(M) \ diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index fc3079a7aa7..4a3ba973e67 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -92,10 +92,24 @@ StorageNATS::StorageNATS( try { - connection = std::make_shared(configuration, log); - if (!connection->connect()) - throw Exception(ErrorCodes::CANNOT_CONNECT_NATS, "Cannot connect to {}. Nats last error: {}", - connection->connectionInfoForLog(), nats_GetLastError(nullptr)); + size_t num_tries = nats_settings->nats_startup_connect_tries; + for (size_t i = 0; i < num_tries; ++i) + { + connection = std::make_shared(configuration, log); + + if (connection->connect()) + break; + + if (i == num_tries - 1) + { + throw Exception( + ErrorCodes::CANNOT_CONNECT_NATS, + "Cannot connect to {}. Nats last error: {}", + connection->connectionInfoForLog(), nats_GetLastError(nullptr)); + } + + LOG_DEBUG(log, "Connect attempt #{} failed, error: {}. Reconnecting...", i + 1, nats_GetLastError(nullptr)); + } } catch (...) {