NATS. Add new setting to doc; clean up code.

This commit is contained in:
tchepavel 2022-05-23 14:57:39 +03:00
parent d9436ec7dd
commit 87217ce6cc
2 changed files with 38 additions and 10 deletions

View File

@ -67,6 +67,26 @@ SSL connection:
For secure connection use `nats_secure = 1`.
The default behaviour of the used library is not to check if the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid: the connection is simply permitted. More strict checking of certificates can possibly be implemented in the future.
Writing to NATS table:
If table reads only from one subject, any insert will publish to the same subject.
However, if table reads from multiple subjects, we need to specify which subject we want to publish to.
That is why whenever inserting into table with multiple subjects, setting `stream_like_engine_insert_queue` is needed.
You can select one of the subjects the table reads from and publish your data there. For example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1,subject2',
nats_format = 'JSONEachRow';
INSERT INTO queue
SETTINGS stream_like_engine_insert_queue = 'subject2'
VALUES (1, 1);
```
Also format settings can be added along with nats-related settings.
@ -79,7 +99,7 @@ Example:
date DateTime
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subject = 'subject1',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
```
@ -114,7 +134,7 @@ Example:
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subject = 'subject1',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';

View File

@ -125,18 +125,26 @@ void WriteBufferToNATSProducer::publishThreadFunc(void * arg)
void WriteBufferToNATSProducer::writingFunc()
{
while ((!payloads.empty() || wait_all) && !shutdown_called.load())
try
{
publish();
while ((!payloads.empty() || wait_all) && !shutdown_called.load())
{
publish();
LOG_DEBUG(log, "Writing func {} {} {}", wait_payloads.load(), payloads.empty(), natsConnection_Buffered(connection.getConnection()));
if (wait_payloads.load() && payloads.empty() && natsConnection_Buffered(connection.getConnection()) == 0)
wait_all = false;
LOG_DEBUG(
log, "Writing func {} {} {}", wait_payloads.load(), payloads.empty(), natsConnection_Buffered(connection.getConnection()));
if (wait_payloads.load() && payloads.empty() && natsConnection_Buffered(connection.getConnection()) == 0)
wait_all = false;
if (!connection.isConnected() && wait_all)
connection.reconnect();
if (!connection.isConnected() && wait_all)
connection.reconnect();
iterateEventLoop();
iterateEventLoop();
}
}
catch (...)
{
tryLogCurrentException(log);
}
LOG_DEBUG(log, "Producer on subject {} completed", subject);