Documentation; integration tests; subscribe on startup

tchepavel 2022-05-17 16:58:09 +03:00
parent 270c01bcb7
commit a3af94d49e
16 changed files with 1845 additions and 29 deletions

View File

@ -1,24 +1,7 @@
version: '2.3'
services:
nats:
nats1:
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222 "
networks: ["nats"]
nats-1:
ports:
- "${NATS_EXTERNAL_PORT_USER}:${NATS_INTERNAL_PORT}"
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222 --user click --pass house"
networks: ["nats"]
depends_on: ["nats"]
nats-2:
ports:
- "${NATS_EXTERNAL_PORT_TOKEN}:${NATS_INTERNAL_PORT}"
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222 --auth clickhouse"
networks: ["nats"]
depends_on: ["nats"]
networks:
nats:
name: nats
- "${NATS_EXTERNAL_PORT}:${NATS_INTERNAL_PORT}"
command: "-p 4444 --user click --pass house"

View File

@ -0,0 +1,143 @@
---
sidebar_position: 14
sidebar_label: NATS
---
# NATS Engine {#nats-engine}
This engine allows integrating ClickHouse with [NATS](https://nats.io/).
`NATS` lets you:
- Publish or subscribe to message subjects.
- Process new messages as they become available.
## Creating a Table {#table_engine-nats-creating-a-table}
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = NATS SETTINGS
    nats_url = 'host:port',
    nats_subjects = 'subject1,subject2,...',
    nats_format = 'data_format'[,]
    [nats_row_delimiter = 'delimiter_symbol',]
    [nats_schema = '',]
    [nats_num_consumers = N,]
    [nats_queue_group = 'group_name',]
    [nats_secure = false,]
    [nats_max_reconnect = N,]
    [nats_reconnect_wait = N,]
    [nats_server_list = 'host1:port1,host2:port2,...',]
    [nats_skip_broken_messages = N,]
    [nats_max_block_size = N,]
    [nats_flush_interval_ms = N,]
    [nats_username = 'user',]
    [nats_password = 'password',]
    [nats_token = 'clickhouse']
```
Required parameters:
- `nats_url` – host:port (for example, `localhost:4222`).
- `nats_subjects` – List of subjects for the NATS table to subscribe/publish to. Supports wildcard subjects like `foo.*.bar` or `baz.>` (see the sketch after this list).
- `nats_format` – Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the [Formats](../../../interfaces/formats.md) section.
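For instance, a minimal sketch of a table reading from wildcard subjects (the subject names and local URL are illustrative):
``` sql
CREATE TABLE wildcard_queue (
    key UInt64,
    value UInt64
) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4222',
             nats_subjects = 'foo.*.bar,baz.>',
             nats_format = 'JSONEachRow';
```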
Optional parameters:
- `nats_row_delimiter` – Delimiter character, which ends the message.
- `nats_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object (see the schema sketch after this list).
- `nats_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `nats_queue_group` – Name of the queue group of NATS subscribers. Default is the table name.
- `nats_max_reconnect` – Maximum number of reconnection attempts per connection to NATS. Default: `5`.
- `nats_reconnect_wait` – Amount of time in milliseconds to sleep between each reconnect attempt. Default: `2000`.
- `nats_server_list` – Server list for connection. Can be specified to connect to a NATS cluster.
- `nats_skip_broken_messages` – NATS message parser tolerance to schema-incompatible messages per block. Default: `0`. If `nats_skip_broken_messages = N`, then the engine skips *N* NATS messages that cannot be parsed (a message equals a row of data).
- `nats_max_block_size` – Number of rows collected by poll(s) for flushing data from NATS.
- `nats_flush_interval_ms` – Timeout for flushing data read from NATS.
- `nats_username` – NATS username.
- `nats_password` – NATS password.
- `nats_token` – NATS auth token.
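For example, a sketch of a table consuming Protobuf messages, assuming a schema file `nats.proto` that defines a `KeyValueProto` message (the file and message names are illustrative):
``` sql
CREATE TABLE proto_queue (
    key UInt64,
    value String
) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4222',
             nats_subjects = 'pb_subject',
             nats_format = 'Protobuf',
             nats_schema = 'nats.proto:KeyValueProto';
```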
SSL connection:
For secure connection use `nats_secure = 1`.
The default behaviour of the used library is not to check if the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid: the connection is simply permitted. More strict checking of certificates can possibly be implemented in the future.
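A minimal sketch of a table over a TLS connection, assuming a NATS server that accepts TLS on the given host and port (both illustrative):
``` sql
CREATE TABLE secure_queue (
    key UInt64,
    value UInt64
) ENGINE = NATS
    SETTINGS nats_url = 'nats-host:4222',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             nats_secure = 1;
```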
Also format settings can be added along with nats-related settings.
Example:
``` sql
CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';
```
The NATS server configuration can be added using the ClickHouse config file.
More specifically, you can set a default user, password, and token for the NATS engine:
``` xml
<nats>
    <user>click</user>
    <password>house</password>
    <token>clickhouse</token>
</nats>
```
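Credentials given in the table's own settings take precedence over these config values; a minimal sketch (the `click2`/`house2` credentials are hypothetical):
``` sql
CREATE TABLE queue (
    key UInt64,
    value UInt64
) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             nats_username = 'click2', -- overrides <user> from the config
             nats_password = 'house2'; -- overrides <password> from the config
```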
## Description {#description}
`SELECT` is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this:
1. Use the engine to create a NATS consumer and consider it a data stream.
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.
When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background. This allows you to continually receive messages from NATS and convert them to the required format using `SELECT`.
One NATS table can have as many materialized views as you like. They do not read data from the table directly, but receive new records (in blocks); this way you can write to several tables with different detail levels (with grouping/aggregation and without).
Example:
``` sql
CREATE TABLE queue (
    key UInt64,
    value UInt64
) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';

CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

SELECT key, value FROM daily ORDER BY key;
```
To stop receiving stream data or to change the conversion logic, detach the materialized view:
``` sql
DETACH TABLE consumer;
ATTACH TABLE consumer;
```
If you want to change the target table by using `ALTER`, we recommend disabling the materialized view to avoid discrepancies between the target table and the data from the view.
## Virtual Columns {#virtual-columns}
- `_subject` – NATS message subject (see the sketch below).
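For example, a minimal sketch of a materialized view that stores the subject alongside each row (the table and view names are illustrative):
``` sql
CREATE TABLE queue (
    key UInt64,
    value UInt64
) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow';

CREATE MATERIALIZED VIEW tagged
ENGINE = MergeTree() ORDER BY key
AS SELECT key, value, _subject AS subject FROM queue;
```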
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/nats/) <!--hide-->

View File

@ -17,7 +17,7 @@ class ASTStorage;
M(String, nats_queue_group, "", "Name for queue group of NATS subscribers.", 0) \
M(Bool, nats_secure, false, "Use SSL connection", 0) \
M(UInt64, nats_max_reconnect, 5, "Maximum amount of reconnection attempts.", 0) \
M(UInt64, nats_reconnect_wait, 2000, "Amount to sleep between each reconnect attempt.", 0) \
M(UInt64, nats_reconnect_wait, 2000, "Amount of time in milliseconds to sleep between each reconnect attempt.", 0) \
M(String, nats_server_list, "", "Server list for connection", 0) \
M(UInt64, nats_skip_broken_messages, 0, "Skip at least this number of broken messages from NATS per block", 0) \
M(UInt64, nats_max_block_size, 0, "Number of rows collected before flushing data from NATS.", 0) \

View File

@ -70,7 +70,7 @@ StorageNATS::StorageNATS(
{
.url = getContext()->getMacros()->expand(nats_settings->nats_url),
.servers = parseList(getContext()->getMacros()->expand(nats_settings->nats_server_list), ','),
.username = nats_username.empty() ? getContext()->getConfigRef().getString("nats.username", "") : nats_username,
.username = nats_username.empty() ? getContext()->getConfigRef().getString("nats.user", "") : nats_username,
.password = nats_password.empty() ? getContext()->getConfigRef().getString("nats.password", "") : nats_password,
.token = nats_token.empty() ? getContext()->getConfigRef().getString("nats.token", "") : nats_token,
.max_reconnect = static_cast<int>(nats_settings->nats_max_reconnect.value),
@ -92,7 +92,8 @@ StorageNATS::StorageNATS(
{
connection = std::make_shared<NATSConnectionManager>(configuration, log);
if (!connection->connect())
throw Exception(ErrorCodes::CANNOT_CONNECT_NATS, "Cannot connect to {}", connection->connectionInfoForLog());
throw Exception(ErrorCodes::CANNOT_CONNECT_NATS, "Cannot connect to {}. Nats last error: {}",
connection->connectionInfoForLog(), nats_GetLastError(nullptr));
}
catch (...)
{
@ -204,10 +205,41 @@ void StorageNATS::decrementReader()
void StorageNATS::connectionFunc()
{
if (!connection->reconnect())
if (consumers_ready)
return;
bool needs_rescheduling = true;
if (connection->reconnect())
needs_rescheduling &= !initBuffers();
if (needs_rescheduling)
connection_task->scheduleAfter(RESCHEDULE_MS);
}
bool StorageNATS::initBuffers()
{
    size_t num_initialized = 0;
    for (auto & buffer : buffers)
    {
        try
        {
            buffer->subscribe();
            ++num_initialized;
        }
        catch (...)
        {
            tryLogCurrentException(log);
            break;
        }
    }

    startLoop();
    const bool are_buffers_initialized = num_initialized == num_created_consumers;
    if (are_buffers_initialized)
        consumers_ready.store(true);
    return are_buffers_initialized;
}
/* Need to deactivate this way, because otherwise we might get a deadlock when we first deactivate the streaming task in shutdown and then,
 * inside the streaming task, try to deactivate any other task
@ -247,6 +279,9 @@ Pipe StorageNATS::read(
size_t /* max_block_size */,
unsigned /* num_streams */)
{
if (!consumers_ready)
throw Exception("NATS consumers setup not finished. Connection might be lost", ErrorCodes::CANNOT_CONNECT_NATS);
if (num_created_consumers == 0)
return {};
@ -332,11 +367,6 @@ SinkToStoragePtr StorageNATS::write(const ASTPtr &, const StorageMetadataPtr & m
void StorageNATS::startup()
{
if (!connection->isConnected())
{
connection_task->activateAndSchedule();
}
for (size_t i = 0; i < num_consumers; ++i)
{
try
@ -353,6 +383,9 @@ void StorageNATS::startup()
}
}
if (!connection->isConnected() || !initBuffers())
connection_task->activateAndSchedule();
streaming_task->activateAndSchedule();
}

View File

@ -92,6 +92,8 @@ private:
uint64_t milliseconds_to_wait;
/// True if consumers have subscribed to all subjects
std::atomic<bool> consumers_ready{false};
/// Needed to tell MV or producer background tasks
/// that they must finish as soon as possible.
std::atomic<bool> shutdown_called{false};
@ -116,11 +118,14 @@ private:
bool isSubjectInSubscriptions(const std::string & subject);
/// Functions working in the background
void streamingToViewsFunc();
void loopingFunc();
void connectionFunc();
bool initBuffers();
void startLoop();
void stopLoop();
void stopLoopIfNoReaders();

View File

@ -113,6 +113,8 @@ void WriteBufferToNATSProducer::publishThreadFunc(void * arg)
{
LOG_DEBUG(buffer->log, "Something went wrong during publishing to NATS subject. Nats status text: {}. Last error message: {}",
natsStatus_GetText(status), nats_GetLastError(nullptr));
if (!buffer->payloads.push(std::move(payload)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
break;
}
}

View File

@ -22,12 +22,14 @@ try:
# Please, add modules that are required for specific tests only here.
# So contributors will be able to run most tests locally
# without installing tons of unneeded packages that may not be so easy to install.
import asyncio
from cassandra.policies import RoundRobinPolicy
import cassandra.cluster
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import pymongo
import pymysql
import nats
from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
)
@ -212,6 +214,11 @@ def check_rabbitmq_is_available(rabbitmq_id):
return p.returncode == 0
async def check_nats_is_available(nats_ip):
    nc = await nats.connect('{}:4444'.format(nats_ip), user='click', password='house')
    return nc.is_connected
def enable_consistent_hash_plugin(rabbitmq_id):
p = subprocess.Popen(
(
@ -335,6 +342,7 @@ class ClickHouseCluster:
self.base_kafka_cmd = []
self.base_kerberized_kafka_cmd = []
self.base_rabbitmq_cmd = []
self.base_nats_cmd = []
self.base_cassandra_cmd = []
self.base_jdbc_bridge_cmd = []
self.base_redis_cmd = []
@ -351,6 +359,7 @@ class ClickHouseCluster:
self.with_kafka = False
self.with_kerberized_kafka = False
self.with_rabbitmq = False
self.with_nats = False
self.with_odbc_drivers = False
self.with_hdfs = False
self.with_kerberized_hdfs = False
@ -431,6 +440,12 @@ class ClickHouseCluster:
self.rabbitmq_dir = p.abspath(p.join(self.instances_dir, "rabbitmq"))
self.rabbitmq_logs_dir = os.path.join(self.rabbitmq_dir, "logs")
self.nats_host = "nats1"
self.nats_ip = None
self.nats_port = 4444
self.nats_docker_id = None
# available when with_nginx == True
self.nginx_host = "nginx"
self.nginx_ip = None
@ -1004,6 +1019,26 @@ class ClickHouseCluster:
]
return self.base_rabbitmq_cmd
    def setup_nats_cmd(self, instance, env_variables, docker_compose_yml_dir):
        self.with_nats = True
        env_variables["NATS_HOST"] = self.nats_host
        env_variables["NATS_INTERNAL_PORT"] = "4444"
        env_variables["NATS_EXTERNAL_PORT"] = str(self.nats_port)
        self.base_cmd.extend(
            ["--file", p.join(docker_compose_yml_dir, "docker_compose_nats.yml")]
        )
        self.base_nats_cmd = [
            "docker-compose",
            "--env-file",
            instance.env_file,
            "--project-name",
            self.project_name,
            "--file",
            p.join(docker_compose_yml_dir, "docker_compose_nats.yml"),
        ]
        return self.base_nats_cmd
def setup_mongo_secure_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_mongo = self.with_mongo_secure = True
env_variables["MONGO_HOST"] = self.mongo_host
@ -1170,6 +1205,7 @@ class ClickHouseCluster:
with_kafka=False,
with_kerberized_kafka=False,
with_rabbitmq=False,
with_nats=False,
clickhouse_path_dir=None,
with_odbc_drivers=False,
with_postgres=False,
@ -1258,6 +1294,7 @@ class ClickHouseCluster:
with_kafka=with_kafka,
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
with_nats=with_nats,
with_nginx=with_nginx,
with_kerberized_hdfs=with_kerberized_hdfs,
with_mongo=with_mongo or with_mongo_secure,
@ -1393,6 +1430,11 @@ class ClickHouseCluster:
self.setup_rabbitmq_cmd(instance, env_variables, docker_compose_yml_dir)
)
if with_nats and not self.with_nats:
cmds.append(
self.setup_nats_cmd(instance, env_variables, docker_compose_yml_dir)
)
if with_nginx and not self.with_nginx:
cmds.append(
self.setup_nginx_cmd(instance, env_variables, docker_compose_yml_dir)
@ -1836,6 +1878,18 @@ class ClickHouseCluster:
raise Exception("Cannot wait RabbitMQ container")
return False
    def wait_nats_is_available(self, nats_ip, max_retries=5):
        retries = 0
        while True:
            if asyncio.run(check_nats_is_available(nats_ip)):
                break
            else:
                retries += 1
                if retries > max_retries:
                    raise Exception("NATS is not available")
                logging.debug("Waiting for NATS to start up")
                time.sleep(1)
def wait_nginx_to_start(self, timeout=60):
self.nginx_ip = self.get_instance_ip(self.nginx_host)
start = time.time()
@ -2284,6 +2338,14 @@ class ClickHouseCluster:
if self.wait_rabbitmq_to_start(throw=(i == 4)):
break
if self.with_nats and self.base_nats_cmd:
logging.debug("Setup NATS")
subprocess_check_call(self.base_nats_cmd + common_opts)
self.nats_docker_id = self.get_instance_docker_id("nats1")
self.up_called = True
self.nats_ip = self.get_instance_ip("nats1")
self.wait_nats_is_available(self.nats_ip)
if self.with_hdfs and self.base_hdfs_cmd:
logging.debug("Setup HDFS")
os.makedirs(self.hdfs_logs_dir)
@ -2639,6 +2701,7 @@ class ClickHouseInstance:
with_kafka,
with_kerberized_kafka,
with_rabbitmq,
with_nats,
with_nginx,
with_kerberized_hdfs,
with_mongo,
@ -2719,6 +2782,7 @@ class ClickHouseInstance:
self.with_kafka = with_kafka
self.with_kerberized_kafka = with_kerberized_kafka
self.with_rabbitmq = with_rabbitmq
self.with_nats = with_nats
self.with_nginx = with_nginx
self.with_kerberized_hdfs = with_kerberized_hdfs
self.with_mongo = with_mongo
@ -3689,6 +3753,9 @@ class ClickHouseInstance:
if self.with_rabbitmq:
depends_on.append("rabbitmq1")
if self.with_nats:
depends_on.append("nats1")
if self.with_zookeeper:
depends_on.append("zoo1")
depends_on.append("zoo2")

View File

@ -0,0 +1,6 @@
syntax = "proto3";
message KeyValueProto {
    uint64 key = 1;
    string value = 2;
}

View File

@ -0,0 +1,7 @@
<clickhouse>
    <macros>
        <nats_url>nats1:4444</nats_url>
        <nats_subjects>macro</nats_subjects>
        <nats_format>JSONEachRow</nats_format>
    </macros>
</clickhouse>

View File

@ -0,0 +1,13 @@
<clickhouse>
    <named_collections>
        <nats1>
            <nats_url>nats1:4444</nats_url>
            <nats_subjects>named</nats_subjects>
            <nats_format>JSONEachRow</nats_format>
            <nats_skip_broken_messages>111</nats_skip_broken_messages>
            <nats_num_consumers>12</nats_num_consumers>
            <nats_username>click</nats_username>
            <nats_password>house</nats_password>
        </nats1>
    </named_collections>
</clickhouse>

View File

@ -0,0 +1,6 @@
<clickhouse>
    <nats>
        <user>click</user>
        <password>house</password>
    </nats>
</clickhouse>

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<clickhouse>
    <profiles>
        <default>
            <stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
        </default>
    </profiles>
</clickhouse>

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/nats.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n)clickhouse_path/format_schemas/nats.proto\"+\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3')
_KEYVALUEPROTO = DESCRIPTOR.message_types_by_name['KeyValueProto']
KeyValueProto = _reflection.GeneratedProtocolMessageType('KeyValueProto', (_message.Message,), {
'DESCRIPTOR' : _KEYVALUEPROTO,
'__module__' : 'clickhouse_path.format_schemas.nats_pb2'
# @@protoc_insertion_point(class_scope:KeyValueProto)
})
_sym_db.RegisterMessage(KeyValueProto)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_KEYVALUEPROTO._serialized_start=45
_KEYVALUEPROTO._serialized_end=88
# @@protoc_insertion_point(module_scope)

File diff suppressed because it is too large

View File

@ -0,0 +1,50 @@
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 22
23 23
24 24
25 25
26 26
27 27
28 28
29 29
30 30
31 31
32 32
33 33
34 34
35 35
36 36
37 37
38 38
39 39
40 40
41 41
42 42
43 43
44 44
45 45
46 46
47 47
48 48
49 49