From 87217ce6ccfd861116565c4a07c9a9c613568cda Mon Sep 17 00:00:00 2001 From: tchepavel Date: Mon, 23 May 2022 14:57:39 +0300 Subject: [PATCH] NATS. Add new setting to doc; clean up code. --- .../table-engines/integrations/nats.md | 24 +++++++++++++++++-- .../NATS/WriteBufferToNATSProducer.cpp | 24 ++++++++++++------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/docs/en/engines/table-engines/integrations/nats.md b/docs/en/engines/table-engines/integrations/nats.md index dc99c3515b9..7c975653f0e 100644 --- a/docs/en/engines/table-engines/integrations/nats.md +++ b/docs/en/engines/table-engines/integrations/nats.md @@ -67,6 +67,26 @@ SSL connection: For secure connection use `nats_secure = 1`. The default behaviour of the used library is not to check if the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid: the connection is simply permitted. More strict checking of certificates can possibly be implemented in the future. +Writing to NATS table: + +If table reads only from one subject, any insert will publish to the same subject. +However, if table reads from multiple subjects, we need to specify which subject we want to publish to. +That is why whenever inserting into table with multiple subjects, setting `stream_like_engine_insert_queue` is needed. +You can select one of the subjects the table reads from and publish your data there. For example: + +``` sql + CREATE TABLE queue ( + key UInt64, + value UInt64 + ) ENGINE = NATS + SETTINGS nats_url = 'localhost:4444', + nats_subjects = 'subject1,subject2', + nats_format = 'JSONEachRow'; + + INSERT INTO queue + SETTINGS stream_like_engine_insert_queue = 'subject2' + VALUES (1, 1); +``` Also format settings can be added along with nats-related settings. @@ -79,7 +99,7 @@ Example: date DateTime ) ENGINE = NATS SETTINGS nats_url = 'localhost:4444', - nats_subject = 'subject1', + nats_subjects = 'subject1', nats_format = 'JSONEachRow', date_time_input_format = 'best_effort'; ``` @@ -114,7 +134,7 @@ Example: value UInt64 ) ENGINE = NATS SETTINGS nats_url = 'localhost:4444', - nats_subject = 'subject1', + nats_subjects = 'subject1', nats_format = 'JSONEachRow', date_time_input_format = 'best_effort'; diff --git a/src/Storages/NATS/WriteBufferToNATSProducer.cpp b/src/Storages/NATS/WriteBufferToNATSProducer.cpp index f4464db1564..a5ac74018f2 100644 --- a/src/Storages/NATS/WriteBufferToNATSProducer.cpp +++ b/src/Storages/NATS/WriteBufferToNATSProducer.cpp @@ -125,18 +125,26 @@ void WriteBufferToNATSProducer::publishThreadFunc(void * arg) void WriteBufferToNATSProducer::writingFunc() { - while ((!payloads.empty() || wait_all) && !shutdown_called.load()) + try { - publish(); + while ((!payloads.empty() || wait_all) && !shutdown_called.load()) + { + publish(); - LOG_DEBUG(log, "Writing func {} {} {}", wait_payloads.load(), payloads.empty(), natsConnection_Buffered(connection.getConnection())); - if (wait_payloads.load() && payloads.empty() && natsConnection_Buffered(connection.getConnection()) == 0) - wait_all = false; + LOG_DEBUG( + log, "Writing func {} {} {}", wait_payloads.load(), payloads.empty(), natsConnection_Buffered(connection.getConnection())); + if (wait_payloads.load() && payloads.empty() && natsConnection_Buffered(connection.getConnection()) == 0) + wait_all = false; - if (!connection.isConnected() && wait_all) - connection.reconnect(); + if (!connection.isConnected() && wait_all) + connection.reconnect(); - iterateEventLoop(); + iterateEventLoop(); + } + } + catch (...) + { + tryLogCurrentException(log); } LOG_DEBUG(log, "Producer on subject {} completed", subject);