mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge 6b7fa46310
into a3fe155579
This commit is contained in:
commit
3e852eecc5
@ -97,6 +97,10 @@ void NATSConnectionManager::connectImpl()
|
||||
if (configuration.secure)
|
||||
{
|
||||
natsOptions_SetSecure(options, true);
|
||||
if (!configuration.ca_file.empty())
|
||||
natsOptions_LoadCATrustedCertificates(options, configuration.ca_file.c_str());
|
||||
if (!configuration.client_cert_file.empty() && !configuration.client_key_file.empty())
|
||||
natsOptions_LoadCertificatesChain(options, configuration.client_cert_file.c_str(), configuration.client_key_file.c_str());
|
||||
}
|
||||
if (skip_verification)
|
||||
{
|
||||
|
@ -15,6 +15,9 @@ struct NATSConfiguration
|
||||
String password;
|
||||
String token;
|
||||
String credential_file;
|
||||
String ca_file;
|
||||
String client_cert_file;
|
||||
String client_key_file;
|
||||
|
||||
int max_reconnect;
|
||||
int reconnect_wait;
|
||||
|
@ -26,6 +26,9 @@ class ASTStorage;
|
||||
M(String, nats_password, "", "NATS password", 0) \
|
||||
M(String, nats_token, "", "NATS token", 0) \
|
||||
M(String, nats_credential_file, "", "Path to a NATS credentials file", 0) \
|
||||
M(String, nats_ca_file, "", "Path to a NATS root ca file", 0) \
|
||||
M(String, nats_client_cert_file, "", "Path to a NATS client certificate file", 0) \
|
||||
M(String, nats_client_key_file, "", "Path to a NATS client key file", 0) \
|
||||
M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \
|
||||
M(UInt64, nats_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \
|
||||
M(StreamingHandleErrorMode, nats_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for NATS engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
|
||||
|
@ -69,6 +69,9 @@ StorageNATS::StorageNATS(
|
||||
auto nats_password = getContext()->getMacros()->expand(nats_settings->nats_password);
|
||||
auto nats_token = getContext()->getMacros()->expand(nats_settings->nats_token);
|
||||
auto nats_credential_file = getContext()->getMacros()->expand(nats_settings->nats_credential_file);
|
||||
auto nats_ca_file = getContext()->getMacros()->expand(nats_settings->nats_ca_file);
|
||||
auto nats_client_cert_file = getContext()->getMacros()->expand(nats_settings->nats_client_cert_file);
|
||||
auto nats_client_key_file = getContext()->getMacros()->expand(nats_settings->nats_client_key_file);
|
||||
|
||||
configuration =
|
||||
{
|
||||
@ -78,6 +81,9 @@ StorageNATS::StorageNATS(
|
||||
.password = nats_password.empty() ? getContext()->getConfigRef().getString("nats.password", "") : nats_password,
|
||||
.token = nats_token.empty() ? getContext()->getConfigRef().getString("nats.token", "") : nats_token,
|
||||
.credential_file = nats_credential_file.empty() ? getContext()->getConfigRef().getString("nats.credential_file", "") : nats_credential_file,
|
||||
.ca_file = nats_ca_file.empty() ? getContext()->getConfigRef().getString("nats.ca_file", "") : nats_ca_file,
|
||||
.client_cert_file = nats_client_cert_file.empty() ? getContext()->getConfigRef().getString("nats.client_cert_file", "") : nats_client_cert_file,
|
||||
.client_key_file = nats_client_key_file.empty() ? getContext()->getConfigRef().getString("nats.client_key_file", "") : nats_client_key_file,
|
||||
.max_reconnect = static_cast<int>(nats_settings->nats_max_reconnect.value),
|
||||
.reconnect_wait = static_cast<int>(nats_settings->nats_reconnect_wait.value),
|
||||
.secure = nats_settings->nats_secure.value
|
||||
|
Loading…
Reference in New Issue
Block a user