diff --git a/docs/en/engines/table-engines/integrations/nats.md b/docs/en/engines/table-engines/integrations/nats.md index e898d1f1b82..9f7409a6893 100644 --- a/docs/en/engines/table-engines/integrations/nats.md +++ b/docs/en/engines/table-engines/integrations/nats.md @@ -38,6 +38,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] [nats_username = 'user',] [nats_password = 'password',] [nats_token = 'clickhouse',] + [nats_credential_file = '/var/nats_credentials',] [nats_startup_connect_tries = '5'] [nats_max_rows_per_message = 1,] [nats_handle_error_mode = 'default'] @@ -63,6 +64,7 @@ Optional parameters: - `nats_username` - NATS username. - `nats_password` - NATS password. - `nats_token` - NATS auth token. +- `nats_credential_file` - Path to a NATS credentials file. - `nats_startup_connect_tries` - Number of connect tries at startup. Default: `5`. - `nats_max_rows_per_message` — The maximum number of rows written in one NATS message for row-based formats. (default : `1`). - `nats_handle_error_mode` — How to handle errors for RabbitMQ engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`). diff --git a/src/Storages/NATS/NATSConnection.cpp b/src/Storages/NATS/NATSConnection.cpp index d7ad0cf8219..4d30d6b2360 100644 --- a/src/Storages/NATS/NATSConnection.cpp +++ b/src/Storages/NATS/NATSConnection.cpp @@ -91,6 +91,8 @@ void NATSConnectionManager::connectImpl() natsOptions_SetUserInfo(options, configuration.username.c_str(), configuration.password.c_str()); if (!configuration.token.empty()) natsOptions_SetToken(options, configuration.token.c_str()); + if (!configuration.credential_file.empty()) + natsOptions_SetUserCredentialsFromFiles(options, configuration.credential_file.c_str(), nullptr); if (configuration.secure) { diff --git a/src/Storages/NATS/NATSConnection.h b/src/Storages/NATS/NATSConnection.h index c350f395a92..859fcb72022 100644 --- a/src/Storages/NATS/NATSConnection.h +++ b/src/Storages/NATS/NATSConnection.h @@ -14,6 +14,7 @@ struct NATSConfiguration String username; String password; String token; + String credential_file; int max_reconnect; int reconnect_wait; diff --git a/src/Storages/NATS/NATSSettings.h b/src/Storages/NATS/NATSSettings.h index 3e3ed739d82..3273a5ff065 100644 --- a/src/Storages/NATS/NATSSettings.h +++ b/src/Storages/NATS/NATSSettings.h @@ -25,6 +25,7 @@ class ASTStorage; M(String, nats_username, "", "NATS username", 0) \ M(String, nats_password, "", "NATS password", 0) \ M(String, nats_token, "", "NATS token", 0) \ + M(String, nats_credential_file, "", "Path to a NATS credentials file", 0) \ M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \ M(UInt64, nats_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \ M(StreamingHandleErrorMode, nats_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for NATS engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index 2af9a9f974f..4b6ff1d8f2a 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -67,6 +67,7 @@ StorageNATS::StorageNATS( auto nats_username = getContext()->getMacros()->expand(nats_settings->nats_username); auto nats_password = getContext()->getMacros()->expand(nats_settings->nats_password); auto nats_token = getContext()->getMacros()->expand(nats_settings->nats_token); + auto nats_credential_file = getContext()->getMacros()->expand(nats_settings->nats_credential_file); configuration = { @@ -75,6 +76,7 @@ StorageNATS::StorageNATS( .username = nats_username.empty() ? getContext()->getConfigRef().getString("nats.user", "") : nats_username, .password = nats_password.empty() ? getContext()->getConfigRef().getString("nats.password", "") : nats_password, .token = nats_token.empty() ? getContext()->getConfigRef().getString("nats.token", "") : nats_token, + .credential_file = nats_credential_file.empty() ? getContext()->getConfigRef().getString("nats.credential_file", "") : nats_credential_file, .max_reconnect = static_cast(nats_settings->nats_max_reconnect.value), .reconnect_wait = static_cast(nats_settings->nats_reconnect_wait.value), .secure = nats_settings->nats_secure.value