diff --git a/src/Storages/NATS/NATSConnection.cpp b/src/Storages/NATS/NATSConnection.cpp index 4d30d6b2360..d8ce05160df 100644 --- a/src/Storages/NATS/NATSConnection.cpp +++ b/src/Storages/NATS/NATSConnection.cpp @@ -97,6 +97,10 @@ void NATSConnectionManager::connectImpl() if (configuration.secure) { natsOptions_SetSecure(options, true); + if (!configuration.ca_file.empty()) + natsOptions_LoadCATrustedCertificates(options, configuration.ca_file.c_str()); + if (!configuration.client_cert_file.empty() && !configuration.client_key_file.empty()) + natsOptions_LoadCertificatesChain(options, configuration.client_cert_file.c_str(), configuration.client_key_file.c_str()); } if (skip_verification) { diff --git a/src/Storages/NATS/NATSConnection.h b/src/Storages/NATS/NATSConnection.h index 859fcb72022..e57a9659be4 100644 --- a/src/Storages/NATS/NATSConnection.h +++ b/src/Storages/NATS/NATSConnection.h @@ -15,6 +15,9 @@ struct NATSConfiguration String password; String token; String credential_file; + String ca_file; + String client_cert_file; + String client_key_file; int max_reconnect; int reconnect_wait; diff --git a/src/Storages/NATS/NATSSettings.h b/src/Storages/NATS/NATSSettings.h index 3273a5ff065..0b0125084a1 100644 --- a/src/Storages/NATS/NATSSettings.h +++ b/src/Storages/NATS/NATSSettings.h @@ -26,6 +26,9 @@ class ASTStorage; M(String, nats_password, "", "NATS password", 0) \ M(String, nats_token, "", "NATS token", 0) \ M(String, nats_credential_file, "", "Path to a NATS credentials file", 0) \ + M(String, nats_ca_file, "", "Path to a NATS root ca file", 0) \ + M(String, nats_client_cert_file, "", "Path to a NATS client certificate file", 0) \ + M(String, nats_client_key_file, "", "Path to a NATS client key file", 0) \ M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \ M(UInt64, nats_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \ M(StreamingHandleErrorMode, nats_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for NATS engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index 9d728c3395f..a75f7d82029 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -69,6 +69,9 @@ StorageNATS::StorageNATS( auto nats_password = getContext()->getMacros()->expand(nats_settings->nats_password); auto nats_token = getContext()->getMacros()->expand(nats_settings->nats_token); auto nats_credential_file = getContext()->getMacros()->expand(nats_settings->nats_credential_file); + auto nats_ca_file = getContext()->getMacros()->expand(nats_settings->nats_ca_file); + auto nats_client_cert_file = getContext()->getMacros()->expand(nats_settings->nats_client_cert_file); + auto nats_client_key_file = getContext()->getMacros()->expand(nats_settings->nats_client_key_file); configuration = { @@ -78,6 +81,9 @@ StorageNATS::StorageNATS( .password = nats_password.empty() ? getContext()->getConfigRef().getString("nats.password", "") : nats_password, .token = nats_token.empty() ? getContext()->getConfigRef().getString("nats.token", "") : nats_token, .credential_file = nats_credential_file.empty() ? getContext()->getConfigRef().getString("nats.credential_file", "") : nats_credential_file, + .ca_file = nats_ca_file.empty() ? getContext()->getConfigRef().getString("nats.ca_file", "") : nats_ca_file, + .client_cert_file = nats_client_cert_file.empty() ? getContext()->getConfigRef().getString("nats.client_cert_file", "") : nats_client_cert_file, + .client_key_file = nats_client_key_file.empty() ? getContext()->getConfigRef().getString("nats.client_key_file", "") : nats_client_key_file, .max_reconnect = static_cast(nats_settings->nats_max_reconnect.value), .reconnect_wait = static_cast(nats_settings->nats_reconnect_wait.value), .secure = nats_settings->nats_secure.value