From 10c76917ea7e5aec87c534c274b55bef16283620 Mon Sep 17 00:00:00 2001
From: Constantine Peresypkin
Date: Sun, 24 Jul 2022 16:13:17 +0200
Subject: [PATCH] fix nats-io TLS support

The nats-io library needs the `NATS_HAS_TLS` define to compile TLS
support in correctly.

Fixes #39525
---
 contrib/nats-io-cmake/CMakeLists.txt                |  2 +
 .../runner/compose/docker_compose_nats.yml          |  6 +-
 src/Storages/NATS/NATSConnection.cpp                |  8 +-
 src/Storages/NATS/NATSConnection.h                  |  3 +
 tests/integration/helpers/cluster.py                | 54 +++++++++--
 .../test_storage_nats/nats_certs.sh                 | 13 +++
 tests/integration/test_storage_nats/test.py         | 92 ++++++++++---------
 7 files changed, 124 insertions(+), 54 deletions(-)
 create mode 100755 tests/integration/test_storage_nats/nats_certs.sh

diff --git a/contrib/nats-io-cmake/CMakeLists.txt b/contrib/nats-io-cmake/CMakeLists.txt
index 5588d5750c4..579bf6f8ae4 100644
--- a/contrib/nats-io-cmake/CMakeLists.txt
+++ b/contrib/nats-io-cmake/CMakeLists.txt
@@ -18,6 +18,8 @@ elseif(WIN32)
     set(NATS_PLATFORM_INCLUDE "apple")
 endif()
 
+add_definitions(-DNATS_HAS_TLS)
+
 file(GLOB PS_SOURCES "${NATS_IO_SOURCE_DIR}/${NATS_PLATFORM_INCLUDE}/*.c")
 set(SRCS
     "${NATS_IO_SOURCE_DIR}/asynccb.c"
diff --git a/docker/test/integration/runner/compose/docker_compose_nats.yml b/docker/test/integration/runner/compose/docker_compose_nats.yml
index 19ae4c162b1..2122f0f639f 100644
--- a/docker/test/integration/runner/compose/docker_compose_nats.yml
+++ b/docker/test/integration/runner/compose/docker_compose_nats.yml
@@ -4,4 +4,8 @@ services:
     image: nats
     ports:
       - "${NATS_EXTERNAL_PORT}:${NATS_INTERNAL_PORT}"
-    command: "-p 4444 --user click --pass house"
\ No newline at end of file
+    command: "-p 4444 --user click --pass house --tls --tlscert=/etc/certs/server-cert.pem --tlskey=/etc/certs/server-key.pem"
+    volumes:
+      - type: bind
+        source: "${NATS_CERT_DIR}/nats"
+        target: /etc/certs
diff --git a/src/Storages/NATS/NATSConnection.cpp b/src/Storages/NATS/NATSConnection.cpp
index 359754bb144..64beb9f2dff 100644
--- a/src/Storages/NATS/NATSConnection.cpp
+++ b/src/Storages/NATS/NATSConnection.cpp
@@ -9,7 +9,6 @@
 namespace DB
 {
 
-//static const auto CONNECT_SLEEP = 200;
 static const auto RETRIES_MAX = 20;
 static const auto CONNECTED_TO_BUFFER_SIZE = 256;
 
@@ -19,6 +18,10 @@ NATSConnectionManager::NATSConnectionManager(const NATSConfiguration & configura
     , log(log_)
     , event_handler(loop.getLoop(), log)
 {
+    const char * val = std::getenv("CLICKHOUSE_NATS_TLS_SECURE");
std::string("1") : std::string(val); + if (tls_secure == "0") + skip_verification = true; } @@ -92,6 +95,9 @@ void NATSConnectionManager::connectImpl() if (configuration.secure) { natsOptions_SetSecure(options, true); + } + if (skip_verification) + { natsOptions_SkipServerVerification(options, true); } if (!configuration.url.empty()) diff --git a/src/Storages/NATS/NATSConnection.h b/src/Storages/NATS/NATSConnection.h index 78a273164db..c699f859446 100644 --- a/src/Storages/NATS/NATSConnection.h +++ b/src/Storages/NATS/NATSConnection.h @@ -65,6 +65,9 @@ private: // true if at any point successfully connected to NATS bool has_connection = false; + // use CLICKHOUSE_NATS_TLS_SECURE=0 env var to skip TLS verification of server cert + bool skip_verification = false; + std::mutex mutex; }; diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 7700fc2dffd..43b08883ae5 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -30,6 +30,7 @@ try: import pymongo import pymysql import nats + import ssl import meilisearch from confluent_kafka.avro.cached_schema_registry_client import ( CachedSchemaRegistryClient, @@ -215,9 +216,27 @@ def check_rabbitmq_is_available(rabbitmq_id): return p.returncode == 0 -async def check_nats_is_available(nats_ip): - nc = await nats.connect("{}:4444".format(nats_ip), user="click", password="house") - return nc.is_connected +async def check_nats_is_available(nats_port, ssl_ctx=None): + nc = await nats_connect_ssl( + nats_port, user="click", password="house", ssl_ctx=ssl_ctx + ) + available = nc.is_connected + await nc.close() + return available + + +async def nats_connect_ssl(nats_port, user, password, ssl_ctx=None): + if not ssl_ctx: + ssl_ctx = ssl.create_default_context() + ssl_ctx.check_hostname = False + ssl_ctx.verify_mode = ssl.CERT_NONE + nc = await nats.connect( + "tls://localhost:{}".format(nats_port), + user=user, + password=password, + tls=ssl_ctx, + ) + return nc def enable_consistent_hash_plugin(rabbitmq_id): @@ -336,6 +355,7 @@ class ClickHouseCluster: self.env_variables = {} self.env_variables["TSAN_OPTIONS"] = "second_deadlock_stack=1" self.env_variables["CLICKHOUSE_WATCHDOG_ENABLE"] = "0" + self.env_variables["CLICKHOUSE_NATS_TLS_SECURE"] = "0" self.up_called = False custom_dockerd_host = custom_dockerd_host or os.environ.get( @@ -464,9 +484,11 @@ class ClickHouseCluster: self.rabbitmq_logs_dir = os.path.join(self.rabbitmq_dir, "logs") self.nats_host = "nats1" - self.nats_ip = None self.nats_port = 4444 self.nats_docker_id = None + self.nats_dir = p.abspath(p.join(self.instances_dir, "nats")) + self.nats_cert_dir = os.path.join(self.nats_dir, "cert") + self.nats_ssl_context = None # available when with_nginx == True self.nginx_host = "nginx" @@ -1046,6 +1068,7 @@ class ClickHouseCluster: env_variables["NATS_HOST"] = self.nats_host env_variables["NATS_INTERNAL_PORT"] = "4444" env_variables["NATS_EXTERNAL_PORT"] = str(self.nats_port) + env_variables["NATS_CERT_DIR"] = self.nats_cert_dir self.base_cmd.extend( ["--file", p.join(docker_compose_yml_dir, "docker_compose_nats.yml")] @@ -1967,10 +1990,12 @@ class ClickHouseCluster: raise Exception("Cannot wait RabbitMQ container") return False - def wait_nats_is_available(self, nats_ip, max_retries=5): + def wait_nats_is_available(self, max_retries=5): retries = 0 while True: - if asyncio.run(check_nats_is_available(nats_ip)): + if asyncio.run( + check_nats_is_available(self.nats_port, ssl_ctx=self.nats_ssl_context) + ): break else: 
                retries += 1
@@ -2453,11 +2478,24 @@ class ClickHouseCluster:
 
         if self.with_nats and self.base_nats_cmd:
             logging.debug("Setup NATS")
+            os.makedirs(self.nats_cert_dir)
+            env = os.environ.copy()
+            env["NATS_CERT_DIR"] = self.nats_cert_dir
+            run_and_check(
+                p.join(self.base_dir, "nats_certs.sh"),
+                env=env,
+                detach=False,
+                nothrow=False,
+            )
+
+            self.nats_ssl_context = ssl.create_default_context()
+            self.nats_ssl_context.load_verify_locations(
+                p.join(self.nats_cert_dir, "ca", "ca-cert.pem")
+            )
             subprocess_check_call(self.base_nats_cmd + common_opts)
             self.nats_docker_id = self.get_instance_docker_id("nats1")
             self.up_called = True
-            self.nats_ip = self.get_instance_ip("nats1")
-            self.wait_nats_is_available(self.nats_ip)
+            self.wait_nats_is_available()
 
         if self.with_hdfs and self.base_hdfs_cmd:
             logging.debug("Setup HDFS")
diff --git a/tests/integration/test_storage_nats/nats_certs.sh b/tests/integration/test_storage_nats/nats_certs.sh
new file mode 100755
index 00000000000..689221c39e4
--- /dev/null
+++ b/tests/integration/test_storage_nats/nats_certs.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+set -euxo pipefail
+
+mkdir -p "${NATS_CERT_DIR}/ca"
+mkdir -p "${NATS_CERT_DIR}/nats"
+openssl req -newkey rsa:4096 -x509 -days 365 -nodes -batch -keyout "${NATS_CERT_DIR}/ca/ca-key.pem" -out "${NATS_CERT_DIR}/ca/ca-cert.pem" -subj "/C=RU/ST=Some-State/O=Internet Widgits Pty Ltd/CN=ca"
+openssl req -newkey rsa:4096 -nodes -batch -keyout "${NATS_CERT_DIR}/nats/server-key.pem" -out "${NATS_CERT_DIR}/nats/server-req.pem" -subj "/C=RU/ST=Some-State/O=Internet Widgits Pty Ltd/CN=server"
+openssl x509 -req -days 365 -in "${NATS_CERT_DIR}/nats/server-req.pem" -CA "${NATS_CERT_DIR}/ca/ca-cert.pem" -CAkey "${NATS_CERT_DIR}/ca/ca-key.pem" -CAcreateserial -out "${NATS_CERT_DIR}/nats/server-cert.pem" -extfile <(
+cat <<-EOF
+subjectAltName = DNS:localhost, DNS:nats1
+EOF
+)
+rm -f "${NATS_CERT_DIR}/nats/server-req.pem"
diff --git a/tests/integration/test_storage_nats/test.py b/tests/integration/test_storage_nats/test.py
index a952f4b78a6..63dde8922a6 100644
--- a/tests/integration/test_storage_nats/test.py
+++ b/tests/integration/test_storage_nats/test.py
@@ -9,11 +9,10 @@ from random import randrange
 import math
 import asyncio
-import nats
 import pytest
 from google.protobuf.internal.encoder import _VarintBytes
 
 from helpers.client import QueryRuntimeException
-from helpers.cluster import ClickHouseCluster, check_nats_is_available
+from helpers.cluster import ClickHouseCluster, check_nats_is_available, nats_connect_ssl
 from helpers.test_tools import TSV
 
 from . import nats_pb2
@@ -35,11 +34,11 @@ instance = cluster.add_instance(
 
 # Helpers
 
-def wait_nats_to_start(nats_ip, timeout=180):
+def wait_nats_to_start(nats_port, ssl_ctx=None, timeout=180):
     start = time.time()
     while time.time() - start < timeout:
         try:
-            if asyncio.run(check_nats_is_available(nats_ip)):
+            if asyncio.run(check_nats_is_available(nats_port, ssl_ctx=ssl_ctx)):
                 logging.debug("NATS is available")
                 return
             time.sleep(0.5)
@@ -63,10 +62,10 @@ def kill_nats(nats_id):
     return p.returncode == 0
 
 
-def revive_nats(nats_id, nats_ip):
+def revive_nats(nats_id, nats_port):
     p = subprocess.Popen(("docker", "start", nats_id), stdout=subprocess.PIPE)
     p.communicate()
-    wait_nats_to_start(nats_ip)
+    wait_nats_to_start(nats_port)
 
 
 # Fixtures
@@ -96,8 +95,13 @@ def nats_setup_teardown():
 
 # Tests
 
-async def nats_produce_messages(ip, subject, messages=(), bytes=None):
-    nc = await nats.connect("{}:4444".format(ip), user="click", password="house")
+async def nats_produce_messages(cluster_inst, subject, messages=(), bytes=None):
+    nc = await nats_connect_ssl(
+        cluster_inst.nats_port,
+        user="click",
+        password="house",
+        ssl_ctx=cluster_inst.nats_ssl_context,
+    )
     logging.debug("NATS connection status: " + str(nc.is_connected))
 
     for message in messages:
@@ -136,7 +140,7 @@ def test_nats_select(nats_cluster):
     messages = []
     for i in range(50):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "select", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "select", messages))
 
     # The order of messages in select * from test.nats is not guaranteed, so sleep to collect everything in one select
     time.sleep(1)
@@ -186,13 +190,13 @@ def test_nats_json_without_delimiter(nats_cluster):
         messages += json.dumps({"key": i, "value": i}) + "\n"
 
     all_messages = [messages]
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "json", all_messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "json", all_messages))
 
     messages = ""
     for i in range(25, 50):
         messages += json.dumps({"key": i, "value": i}) + "\n"
     all_messages = [messages]
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "json", all_messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "json", all_messages))
 
     time.sleep(1)
 
@@ -229,7 +233,7 @@ def test_nats_csv_with_delimiter(nats_cluster):
     for i in range(50):
         messages.append("{i}, {i}".format(i=i))
 
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "csv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "csv", messages))
 
     time.sleep(1)
 
@@ -268,7 +272,7 @@ def test_nats_tsv_with_delimiter(nats_cluster):
     for i in range(50):
         messages.append("{i}\t{i}".format(i=i))
 
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "tsv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "tsv", messages))
 
     result = ""
     for _ in range(60):
@@ -299,7 +303,7 @@ def test_nats_macros(nats_cluster):
     message = ""
     for i in range(50):
         message += json.dumps({"key": i, "value": i}) + "\n"
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "macro", [message]))
+    asyncio.run(nats_produce_messages(nats_cluster, "macro", [message]))
 
     time.sleep(1)
 
@@ -344,7 +348,7 @@ def test_nats_materialized_view(nats_cluster):
     for i in range(50):
         messages.append(json.dumps({"key": i, "value": i}))
 
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
 
     time_limit_sec = 60
     deadline = time.monotonic() + time_limit_sec
@@ -389,7 +393,7 @@ def test_nats_materialized_view_with_subquery(nats_cluster):
     messages = []
     for i in range(50):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mvsq", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "mvsq", messages))
 
     time_limit_sec = 60
     deadline = time.monotonic() + time_limit_sec
@@ -434,7 +438,7 @@ def test_nats_many_materialized_views(nats_cluster):
     messages = []
     for i in range(50):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mmv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "mmv", messages))
 
     time_limit_sec = 60
     deadline = time.monotonic() + time_limit_sec
@@ -485,7 +489,7 @@ def test_nats_protobuf(nats_cluster):
         msg.value = str(i)
         serialized_msg = msg.SerializeToString()
         data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "pb", bytes=data))
+    asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
     data = b""
     for i in range(20, 21):
         msg = nats_pb2.ProtoKeyValue()
@@ -493,7 +497,7 @@ def test_nats_protobuf(nats_cluster):
         msg.value = str(i)
         serialized_msg = msg.SerializeToString()
         data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "pb", bytes=data))
+    asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
     data = b""
     for i in range(21, 50):
         msg = nats_pb2.ProtoKeyValue()
@@ -501,7 +505,7 @@ def test_nats_protobuf(nats_cluster):
         msg.value = str(i)
         serialized_msg = msg.SerializeToString()
         data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "pb", bytes=data))
+    asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
 
     result = ""
     time_limit_sec = 60
@@ -542,7 +546,7 @@ def test_nats_big_message(nats_cluster):
             logging.debug("Table test.nats is not yet ready")
             time.sleep(0.5)
 
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "big", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "big", messages))
 
     while True:
         result = instance.query("SELECT count() FROM test.view")
@@ -600,7 +604,7 @@ def test_nats_mv_combo(nats_cluster):
         for _ in range(messages_num):
             messages.append(json.dumps({"key": i[0], "value": i[0]}))
             i[0] += 1
-        asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "combo", messages))
+        asyncio.run(nats_produce_messages(nats_cluster, "combo", messages))
 
     threads = []
     threads_num = 20
@@ -662,8 +666,11 @@ def test_nats_insert(nats_cluster):
     insert_messages = []
 
     async def sub_to_nats():
-        nc = await nats.connect(
-            "{}:4444".format(nats_cluster.nats_ip), user="click", password="house"
+        nc = await nats_connect_ssl(
+            nats_cluster.nats_port,
+            user="click",
+            password="house",
+            ssl_ctx=nats_cluster.nats_ssl_context,
         )
         sub = await nc.subscribe("insert")
         await sub.unsubscribe(50)
@@ -771,8 +778,11 @@ def test_nats_many_subjects_insert_right(nats_cluster):
     insert_messages = []
 
     async def sub_to_nats():
-        nc = await nats.connect(
-            "{}:4444".format(nats_cluster.nats_ip), user="click", password="house"
+        nc = await nats_connect_ssl(
+            nats_cluster.nats_port,
+            user="click",
+            password="house",
+            ssl_ctx=nats_cluster.nats_ssl_context,
         )
         sub = await nc.subscribe("right_insert1")
         await sub.unsubscribe(50)
@@ -1003,7 +1013,7 @@ def test_nats_virtual_column(nats_cluster):
         messages.append(json.dumps({"key": i, "value": i}))
         i += 1
 
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "virtuals", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "virtuals", messages))
 
     while True:
         result = instance.query("SELECT count() FROM test.view")
@@ -1067,7 +1077,7 @@ def test_nats_virtual_column_with_materialized_view(nats_cluster):
         messages.append(json.dumps({"key": i, "value": i}))
         i += 1
 
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "virtuals_mv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "virtuals_mv", messages))
 
     while True:
         result = instance.query("SELECT count() FROM test.view")
@@ -1147,9 +1157,7 @@ def test_nats_many_consumers_to_each_queue(nats_cluster):
         for _ in range(messages_num):
             messages.append(json.dumps({"key": i[0], "value": i[0]}))
             i[0] += 1
-        asyncio.run(
-            nats_produce_messages(nats_cluster.nats_ip, "many_consumers", messages)
-        )
+        asyncio.run(nats_produce_messages(nats_cluster, "many_consumers", messages))
 
     threads = []
     threads_num = 20
@@ -1243,7 +1251,7 @@ def test_nats_restore_failed_connection_without_losses_on_write(nats_cluster):
 
     kill_nats(nats_cluster.nats_docker_id)
     time.sleep(4)
-    revive_nats(nats_cluster.nats_docker_id, nats_cluster.nats_ip)
+    revive_nats(nats_cluster.nats_docker_id, nats_cluster.nats_port)
 
     while True:
         result = instance.query("SELECT count(DISTINCT key) FROM test.view")
@@ -1310,7 +1318,7 @@ def test_nats_no_connection_at_startup_2(nats_cluster):
     messages = []
     for i in range(messages_num):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "cs", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "cs", messages))
 
     for _ in range(20):
         result = instance.query("SELECT count() FROM test.view")
@@ -1353,9 +1361,7 @@ def test_nats_format_factory_settings(nats_cluster):
         """SELECT parseDateTimeBestEffort(CAST('2021-01-19T14:42:33.1829214Z', 'String'))"""
     )
 
-    asyncio.run(
-        nats_produce_messages(nats_cluster.nats_ip, "format_settings", [message])
-    )
+    asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))
 
     while True:
         result = instance.query("SELECT date FROM test.format_settings")
@@ -1372,9 +1378,7 @@ def test_nats_format_factory_settings(nats_cluster):
         """
     )
 
-    asyncio.run(
-        nats_produce_messages(nats_cluster.nats_ip, "format_settings", [message])
-    )
+    asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))
     while True:
         result = instance.query("SELECT date FROM test.view")
         if result == expected:
@@ -1424,13 +1428,13 @@ def test_nats_drop_mv(nats_cluster):
     messages = []
     for i in range(20):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
 
     instance.query("DROP VIEW test.consumer")
     messages = []
    for i in range(20, 40):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
 
     instance.query(
         """
@@ -1441,7 +1445,7 @@ def test_nats_drop_mv(nats_cluster):
     messages = []
     for i in range(40, 50):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
 
     while True:
         result = instance.query("SELECT * FROM test.view ORDER BY key")
@@ -1454,7 +1458,7 @@ def test_nats_drop_mv(nats_cluster):
     messages = []
     for i in range(50, 60):
         messages.append(json.dumps({"key": i, "value": i}))
-    asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
+    asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
 
     count = 0
     while True:
@@ -1477,7 +1481,7 @@ def test_nats_predefined_configuration(nats_cluster):
 
     asyncio.run(
        nats_produce_messages(
-            nats_cluster.nats_ip, "named", [json.dumps({"key": 1, "value": 2})]
+            nats_cluster, "named", [json.dumps({"key": 1, "value": 2})]
         )
     )
     while True:
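
Note (reviewer sketch, not part of the patch): after this change the integration tests reach NATS over TLS through the new nats_connect_ssl() helper instead of calling nats.connect() with a bare host:port. A minimal producer coroutine built on that helper could look roughly like the snippet below; the subject name "sketch" and the payload are illustrative only, and nats_cluster is assumed to be the started ClickHouseCluster, so nats_port and nats_ssl_context have already been filled in by start() after nats_certs.sh generated the CA.

    import asyncio
    import json

    from helpers.cluster import nats_connect_ssl

    async def produce_one(nats_cluster):
        # TLS connection; the ssl context trusts the CA generated by nats_certs.sh.
        nc = await nats_connect_ssl(
            nats_cluster.nats_port,
            user="click",
            password="house",
            ssl_ctx=nats_cluster.nats_ssl_context,
        )
        # Publish a single JSON message and wait until the server has received it.
        await nc.publish("sketch", json.dumps({"key": 1, "value": 2}).encode("utf-8"))
        await nc.flush()
        await nc.close()

    # Usage inside a test: asyncio.run(produce_one(nats_cluster))

When no ssl_ctx is passed (for example when revive_nats() calls wait_nats_to_start() without one), the helper falls back to a default context with hostname checks and certificate verification disabled; that is the test-side counterpart of CLICKHOUSE_NATS_TLS_SECURE=0 making NATSConnectionManager call natsOptions_SkipServerVerification.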