mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merge pull request #59543 from nickolaj-jepsen/master
Add support for NATS credentials file
This commit is contained in:
commit
83515e3656
@ -38,6 +38,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
|||||||
[nats_username = 'user',]
|
[nats_username = 'user',]
|
||||||
[nats_password = 'password',]
|
[nats_password = 'password',]
|
||||||
[nats_token = 'clickhouse',]
|
[nats_token = 'clickhouse',]
|
||||||
|
[nats_credential_file = '/var/nats_credentials',]
|
||||||
[nats_startup_connect_tries = '5']
|
[nats_startup_connect_tries = '5']
|
||||||
[nats_max_rows_per_message = 1,]
|
[nats_max_rows_per_message = 1,]
|
||||||
[nats_handle_error_mode = 'default']
|
[nats_handle_error_mode = 'default']
|
||||||
@ -63,6 +64,7 @@ Optional parameters:
|
|||||||
- `nats_username` - NATS username.
|
- `nats_username` - NATS username.
|
||||||
- `nats_password` - NATS password.
|
- `nats_password` - NATS password.
|
||||||
- `nats_token` - NATS auth token.
|
- `nats_token` - NATS auth token.
|
||||||
|
- `nats_credential_file` - Path to a NATS credentials file.
|
||||||
- `nats_startup_connect_tries` - Number of connect tries at startup. Default: `5`.
|
- `nats_startup_connect_tries` - Number of connect tries at startup. Default: `5`.
|
||||||
- `nats_max_rows_per_message` — The maximum number of rows written in one NATS message for row-based formats. (default : `1`).
|
- `nats_max_rows_per_message` — The maximum number of rows written in one NATS message for row-based formats. (default : `1`).
|
||||||
- `nats_handle_error_mode` — How to handle errors for RabbitMQ engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`).
|
- `nats_handle_error_mode` — How to handle errors for RabbitMQ engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`).
|
||||||
|
@ -91,6 +91,8 @@ void NATSConnectionManager::connectImpl()
|
|||||||
natsOptions_SetUserInfo(options, configuration.username.c_str(), configuration.password.c_str());
|
natsOptions_SetUserInfo(options, configuration.username.c_str(), configuration.password.c_str());
|
||||||
if (!configuration.token.empty())
|
if (!configuration.token.empty())
|
||||||
natsOptions_SetToken(options, configuration.token.c_str());
|
natsOptions_SetToken(options, configuration.token.c_str());
|
||||||
|
if (!configuration.credential_file.empty())
|
||||||
|
natsOptions_SetUserCredentialsFromFiles(options, configuration.credential_file.c_str(), nullptr);
|
||||||
|
|
||||||
if (configuration.secure)
|
if (configuration.secure)
|
||||||
{
|
{
|
||||||
|
@ -14,6 +14,7 @@ struct NATSConfiguration
|
|||||||
String username;
|
String username;
|
||||||
String password;
|
String password;
|
||||||
String token;
|
String token;
|
||||||
|
String credential_file;
|
||||||
|
|
||||||
int max_reconnect;
|
int max_reconnect;
|
||||||
int reconnect_wait;
|
int reconnect_wait;
|
||||||
|
@ -25,6 +25,7 @@ class ASTStorage;
|
|||||||
M(String, nats_username, "", "NATS username", 0) \
|
M(String, nats_username, "", "NATS username", 0) \
|
||||||
M(String, nats_password, "", "NATS password", 0) \
|
M(String, nats_password, "", "NATS password", 0) \
|
||||||
M(String, nats_token, "", "NATS token", 0) \
|
M(String, nats_token, "", "NATS token", 0) \
|
||||||
|
M(String, nats_credential_file, "", "Path to a NATS credentials file", 0) \
|
||||||
M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \
|
M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \
|
||||||
M(UInt64, nats_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \
|
M(UInt64, nats_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \
|
||||||
M(StreamingHandleErrorMode, nats_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for NATS engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
|
M(StreamingHandleErrorMode, nats_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for NATS engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
|
||||||
|
@ -67,6 +67,7 @@ StorageNATS::StorageNATS(
|
|||||||
auto nats_username = getContext()->getMacros()->expand(nats_settings->nats_username);
|
auto nats_username = getContext()->getMacros()->expand(nats_settings->nats_username);
|
||||||
auto nats_password = getContext()->getMacros()->expand(nats_settings->nats_password);
|
auto nats_password = getContext()->getMacros()->expand(nats_settings->nats_password);
|
||||||
auto nats_token = getContext()->getMacros()->expand(nats_settings->nats_token);
|
auto nats_token = getContext()->getMacros()->expand(nats_settings->nats_token);
|
||||||
|
auto nats_credential_file = getContext()->getMacros()->expand(nats_settings->nats_credential_file);
|
||||||
|
|
||||||
configuration =
|
configuration =
|
||||||
{
|
{
|
||||||
@ -75,6 +76,7 @@ StorageNATS::StorageNATS(
|
|||||||
.username = nats_username.empty() ? getContext()->getConfigRef().getString("nats.user", "") : nats_username,
|
.username = nats_username.empty() ? getContext()->getConfigRef().getString("nats.user", "") : nats_username,
|
||||||
.password = nats_password.empty() ? getContext()->getConfigRef().getString("nats.password", "") : nats_password,
|
.password = nats_password.empty() ? getContext()->getConfigRef().getString("nats.password", "") : nats_password,
|
||||||
.token = nats_token.empty() ? getContext()->getConfigRef().getString("nats.token", "") : nats_token,
|
.token = nats_token.empty() ? getContext()->getConfigRef().getString("nats.token", "") : nats_token,
|
||||||
|
.credential_file = nats_credential_file.empty() ? getContext()->getConfigRef().getString("nats.credential_file", "") : nats_credential_file,
|
||||||
.max_reconnect = static_cast<int>(nats_settings->nats_max_reconnect.value),
|
.max_reconnect = static_cast<int>(nats_settings->nats_max_reconnect.value),
|
||||||
.reconnect_wait = static_cast<int>(nats_settings->nats_reconnect_wait.value),
|
.reconnect_wait = static_cast<int>(nats_settings->nats_reconnect_wait.value),
|
||||||
.secure = nats_settings->nats_secure.value
|
.secure = nats_settings->nats_secure.value
|
||||||
|
Loading…
Reference in New Issue
Block a user