fix nats-io TLS support

The nats-io library needs the `NATS_HAS_TLS` define to compile in TLS
support correctly.

fixes #39525
Constantine Peresypkin 2022-07-24 16:13:17 +02:00
parent 31891322a5
commit 10c76917ea
7 changed files with 124 additions and 54 deletions
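
Background: without `NATS_HAS_TLS`, the vendored C client is compiled with its TLS code paths stripped out, so enabling TLS from ClickHouse has no effect and connections to a TLS-only NATS server fail. A minimal sketch of the scenario this fixes, modeled on the integration tests below (the `nats_secure` setting name and the exact DDL are illustrative assumptions, not part of this diff):

```python
# Hypothetical repro: point a NATS engine table at a TLS-only server.
# With the library built without NATS_HAS_TLS, the secure option could
# not take effect and the storage never connected.
instance.query(
    """
    CREATE TABLE test.nats (key UInt64, value UInt64)
    ENGINE = NATS
    SETTINGS nats_url = 'nats1:4444',
             nats_subjects = 'select',
             nats_format = 'JSONEachRow',
             nats_secure = 1
    """
)
```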

contrib/nats-io-cmake/CMakeLists.txt

@@ -18,6 +18,8 @@ elseif(WIN32)
set(NATS_PLATFORM_INCLUDE "apple")
endif()
add_definitions(-DNATS_HAS_TLS)
file(GLOB PS_SOURCES "${NATS_IO_SOURCE_DIR}/${NATS_PLATFORM_INCLUDE}/*.c")
set(SRCS
"${NATS_IO_SOURCE_DIR}/asynccb.c"

docker/test/integration/runner/compose/docker_compose_nats.yml

@@ -4,4 +4,8 @@ services:
image: nats
ports:
- "${NATS_EXTERNAL_PORT}:${NATS_INTERNAL_PORT}"
command: "-p 4444 --user click --pass house"
command: "-p 4444 --user click --pass house --tls --tlscert=/etc/certs/server-cert.pem --tlskey=/etc/certs/server-key.pem"
volumes:
- type: bind
source: "${NATS_CERT_DIR}/nats"
target: /etc/certs
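
The compose file pulls its ports and certificate directory from the environment. A sketch of supplying them by hand when running the container outside the test harness (values are illustrative; the harness below computes NATS_CERT_DIR per run and generates the certificates first):

```python
import os

# Illustrative values; nats/server-cert.pem and nats/server-key.pem
# must already exist under NATS_CERT_DIR (see nats_certs.sh below).
os.environ["NATS_INTERNAL_PORT"] = "4444"
os.environ["NATS_EXTERNAL_PORT"] = "4444"
os.environ["NATS_CERT_DIR"] = "/tmp/nats-certs"
```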

src/Storages/NATS/NATSConnection.cpp

@@ -9,7 +9,6 @@
namespace DB
{
//static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 20;
static const auto CONNECTED_TO_BUFFER_SIZE = 256;
@@ -19,6 +18,10 @@ NATSConnectionManager::NATSConnectionManager(const NATSConfiguration & configura
, log(log_)
, event_handler(loop.getLoop(), log)
{
const char * val = std::getenv("CLICKHOUSE_NATS_TLS_SECURE");
std::string tls_secure = val == nullptr ? std::string("1") : std::string(val);
if (tls_secure == "0")
skip_verification = true;
}
@@ -92,6 +95,9 @@ void NATSConnectionManager::connectImpl()
if (configuration.secure)
{
natsOptions_SetSecure(options, true);
}
if (skip_verification)
{
natsOptions_SkipServerVerification(options, true);
}
if (!configuration.url.empty())

src/Storages/NATS/NATSConnection.h

@@ -65,6 +65,9 @@ private:
// true if at any point successfully connected to NATS
bool has_connection = false;
// use CLICKHOUSE_NATS_TLS_SECURE=0 env var to skip TLS verification of server cert
bool skip_verification = false;
std::mutex mutex;
};

tests/integration/helpers/cluster.py

@@ -30,6 +30,7 @@ try:
import pymongo
import pymysql
import nats
import ssl
import meilisearch
from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
@@ -215,9 +216,27 @@ def check_rabbitmq_is_available(rabbitmq_id):
return p.returncode == 0
async def check_nats_is_available(nats_ip):
nc = await nats.connect("{}:4444".format(nats_ip), user="click", password="house")
return nc.is_connected
async def check_nats_is_available(nats_port, ssl_ctx=None):
nc = await nats_connect_ssl(
nats_port, user="click", password="house", ssl_ctx=ssl_ctx
)
available = nc.is_connected
await nc.close()
return available
async def nats_connect_ssl(nats_port, user, password, ssl_ctx=None):
if not ssl_ctx:
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
nc = await nats.connect(
"tls://localhost:{}".format(nats_port),
user=user,
password=password,
tls=ssl_ctx,
)
return nc
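
A usage sketch for the helper pair above: build a context that actually verifies the test CA (generated by nats_certs.sh in this commit) instead of falling back to the helper's non-verifying default; the CA path here is an example:

```python
import asyncio
import ssl

from helpers.cluster import check_nats_is_available  # defined above

ctx = ssl.create_default_context()
ctx.load_verify_locations("/path/to/ca/ca-cert.pem")  # example path

# With ssl_ctx=None the helper would use check_hostname=False and
# ssl.CERT_NONE, i.e. encrypt the connection but trust any certificate.
assert asyncio.run(check_nats_is_available(4444, ssl_ctx=ctx))
```
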
def enable_consistent_hash_plugin(rabbitmq_id):
@@ -336,6 +355,7 @@ class ClickHouseCluster:
self.env_variables = {}
self.env_variables["TSAN_OPTIONS"] = "second_deadlock_stack=1"
self.env_variables["CLICKHOUSE_WATCHDOG_ENABLE"] = "0"
self.env_variables["CLICKHOUSE_NATS_TLS_SECURE"] = "0"
self.up_called = False
custom_dockerd_host = custom_dockerd_host or os.environ.get(
@@ -464,9 +484,11 @@ class ClickHouseCluster:
self.rabbitmq_logs_dir = os.path.join(self.rabbitmq_dir, "logs")
self.nats_host = "nats1"
self.nats_ip = None
self.nats_port = 4444
self.nats_docker_id = None
self.nats_dir = p.abspath(p.join(self.instances_dir, "nats"))
self.nats_cert_dir = os.path.join(self.nats_dir, "cert")
self.nats_ssl_context = None
# available when with_nginx == True
self.nginx_host = "nginx"
@@ -1046,6 +1068,7 @@ class ClickHouseCluster:
env_variables["NATS_HOST"] = self.nats_host
env_variables["NATS_INTERNAL_PORT"] = "4444"
env_variables["NATS_EXTERNAL_PORT"] = str(self.nats_port)
env_variables["NATS_CERT_DIR"] = self.nats_cert_dir
self.base_cmd.extend(
["--file", p.join(docker_compose_yml_dir, "docker_compose_nats.yml")]
@@ -1967,10 +1990,12 @@ class ClickHouseCluster:
raise Exception("Cannot wait RabbitMQ container")
return False
def wait_nats_is_available(self, nats_ip, max_retries=5):
def wait_nats_is_available(self, max_retries=5):
retries = 0
while True:
if asyncio.run(check_nats_is_available(nats_ip)):
if asyncio.run(
check_nats_is_available(self.nats_port, ssl_ctx=self.nats_ssl_context)
):
break
else:
retries += 1
@@ -2453,11 +2478,24 @@ class ClickHouseCluster:
if self.with_nats and self.base_nats_cmd:
logging.debug("Setup NATS")
os.makedirs(self.nats_cert_dir)
env = os.environ.copy()
env["NATS_CERT_DIR"] = self.nats_cert_dir
run_and_check(
p.join(self.base_dir, "nats_certs.sh"),
env=env,
detach=False,
nothrow=False,
)
self.nats_ssl_context = ssl.create_default_context()
self.nats_ssl_context.load_verify_locations(
p.join(self.nats_cert_dir, "ca", "ca-cert.pem")
)
subprocess_check_call(self.base_nats_cmd + common_opts)
self.nats_docker_id = self.get_instance_docker_id("nats1")
self.up_called = True
self.nats_ip = self.get_instance_ip("nats1")
self.wait_nats_is_available(self.nats_ip)
self.wait_nats_is_available()
if self.with_hdfs and self.base_hdfs_cmd:
logging.debug("Setup HDFS")

tests/integration/test_storage_nats/nats_certs.sh

@@ -0,0 +1,13 @@
#!/bin/bash
set -euxo pipefail
mkdir -p "${NATS_CERT_DIR}/ca"
mkdir -p "${NATS_CERT_DIR}/nats"
openssl req -newkey rsa:4096 -x509 -days 365 -nodes -batch -keyout "${NATS_CERT_DIR}/ca/ca-key.pem" -out "${NATS_CERT_DIR}/ca/ca-cert.pem" -subj "/C=RU/ST=Some-State/O=Internet Widgits Pty Ltd/CN=ca"
openssl req -newkey rsa:4096 -nodes -batch -keyout "${NATS_CERT_DIR}/nats/server-key.pem" -out "${NATS_CERT_DIR}/nats/server-req.pem" -subj "/C=RU/ST=Some-State/O=Internet Widgits Pty Ltd/CN=server"
openssl x509 -req -days 365 -in "${NATS_CERT_DIR}/nats/server-req.pem" -CA "${NATS_CERT_DIR}/ca/ca-cert.pem" -CAkey "${NATS_CERT_DIR}/ca/ca-key.pem" -CAcreateserial -out "${NATS_CERT_DIR}/nats/server-cert.pem" -extfile <(
cat <<-EOF
subjectAltName = DNS:localhost, DNS:nats1
EOF
)
rm -f "${NATS_CERT_DIR}/nats/server-req.pem"
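
To sanity-check the script's output, the server certificate can be verified against the generated CA and its SAN entries inspected (same NATS_CERT_DIR layout as above; the -ext flag needs OpenSSL 1.1.1+):

```python
import os
import subprocess

cert_dir = os.environ["NATS_CERT_DIR"]  # same variable the script uses
ca = os.path.join(cert_dir, "ca", "ca-cert.pem")
server = os.path.join(cert_dir, "nats", "server-cert.pem")

# The server certificate must chain to the test CA...
subprocess.run(["openssl", "verify", "-CAfile", ca, server], check=True)

# ...and carry the names clients connect with (DNS:localhost, DNS:nats1).
subprocess.run(
    ["openssl", "x509", "-in", server, "-noout", "-ext", "subjectAltName"],
    check=True,
)
```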

tests/integration/test_storage_nats/test.py

@@ -9,11 +9,10 @@ from random import randrange
import math
import asyncio
import nats
import pytest
from google.protobuf.internal.encoder import _VarintBytes
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, check_nats_is_available
from helpers.cluster import ClickHouseCluster, check_nats_is_available, nats_connect_ssl
from helpers.test_tools import TSV
from . import nats_pb2
@@ -35,11 +34,11 @@ instance = cluster.add_instance(
# Helpers
def wait_nats_to_start(nats_ip, timeout=180):
def wait_nats_to_start(nats_port, ssl_ctx=None, timeout=180):
start = time.time()
while time.time() - start < timeout:
try:
if asyncio.run(check_nats_is_available(nats_ip)):
if asyncio.run(check_nats_is_available(nats_port, ssl_ctx=ssl_ctx)):
logging.debug("NATS is available")
return
time.sleep(0.5)
@@ -63,10 +62,10 @@ def kill_nats(nats_id):
return p.returncode == 0
def revive_nats(nats_id, nats_ip):
def revive_nats(nats_id, nats_port):
p = subprocess.Popen(("docker", "start", nats_id), stdout=subprocess.PIPE)
p.communicate()
wait_nats_to_start(nats_ip)
wait_nats_to_start(nats_port)
# Fixtures
@@ -96,8 +95,13 @@ def nats_setup_teardown():
# Tests
async def nats_produce_messages(ip, subject, messages=(), bytes=None):
nc = await nats.connect("{}:4444".format(ip), user="click", password="house")
async def nats_produce_messages(cluster_inst, subject, messages=(), bytes=None):
nc = await nats_connect_ssl(
cluster_inst.nats_port,
user="click",
password="house",
ssl_ctx=cluster_inst.nats_ssl_context,
)
logging.debug("NATS connection status: " + str(nc.is_connected))
for message in messages:
@@ -136,7 +140,7 @@ def test_nats_select(nats_cluster):
messages = []
for i in range(50):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "select", messages))
asyncio.run(nats_produce_messages(nats_cluster, "select", messages))
# The order of messages in select * from test.nats is not guaranteed, so sleep to collect everything in one select
time.sleep(1)
@@ -186,13 +190,13 @@ def test_nats_json_without_delimiter(nats_cluster):
messages += json.dumps({"key": i, "value": i}) + "\n"
all_messages = [messages]
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "json", all_messages))
asyncio.run(nats_produce_messages(nats_cluster, "json", all_messages))
messages = ""
for i in range(25, 50):
messages += json.dumps({"key": i, "value": i}) + "\n"
all_messages = [messages]
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "json", all_messages))
asyncio.run(nats_produce_messages(nats_cluster, "json", all_messages))
time.sleep(1)
@@ -229,7 +233,7 @@ def test_nats_csv_with_delimiter(nats_cluster):
for i in range(50):
messages.append("{i}, {i}".format(i=i))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "csv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "csv", messages))
time.sleep(1)
@@ -268,7 +272,7 @@ def test_nats_tsv_with_delimiter(nats_cluster):
for i in range(50):
messages.append("{i}\t{i}".format(i=i))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "tsv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "tsv", messages))
result = ""
for _ in range(60):
@@ -299,7 +303,7 @@ def test_nats_macros(nats_cluster):
message = ""
for i in range(50):
message += json.dumps({"key": i, "value": i}) + "\n"
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "macro", [message]))
asyncio.run(nats_produce_messages(nats_cluster, "macro", [message]))
time.sleep(1)
@@ -344,7 +348,7 @@ def test_nats_materialized_view(nats_cluster):
for i in range(50):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
time_limit_sec = 60
deadline = time.monotonic() + time_limit_sec
@@ -389,7 +393,7 @@ def test_nats_materialized_view_with_subquery(nats_cluster):
messages = []
for i in range(50):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mvsq", messages))
asyncio.run(nats_produce_messages(nats_cluster, "mvsq", messages))
time_limit_sec = 60
deadline = time.monotonic() + time_limit_sec
@@ -434,7 +438,7 @@ def test_nats_many_materialized_views(nats_cluster):
messages = []
for i in range(50):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mmv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "mmv", messages))
time_limit_sec = 60
deadline = time.monotonic() + time_limit_sec
@@ -485,7 +489,7 @@ def test_nats_protobuf(nats_cluster):
msg.value = str(i)
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "pb", bytes=data))
asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
data = b""
for i in range(20, 21):
msg = nats_pb2.ProtoKeyValue()
@@ -493,7 +497,7 @@ def test_nats_protobuf(nats_cluster):
msg.value = str(i)
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "pb", bytes=data))
asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
data = b""
for i in range(21, 50):
msg = nats_pb2.ProtoKeyValue()
@@ -501,7 +505,7 @@ def test_nats_protobuf(nats_cluster):
msg.value = str(i)
serialized_msg = msg.SerializeToString()
data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "pb", bytes=data))
asyncio.run(nats_produce_messages(nats_cluster, "pb", bytes=data))
result = ""
time_limit_sec = 60
@@ -542,7 +546,7 @@ def test_nats_big_message(nats_cluster):
logging.debug("Table test.nats is not yet ready")
time.sleep(0.5)
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "big", messages))
asyncio.run(nats_produce_messages(nats_cluster, "big", messages))
while True:
result = instance.query("SELECT count() FROM test.view")
@@ -600,7 +604,7 @@ def test_nats_mv_combo(nats_cluster):
for _ in range(messages_num):
messages.append(json.dumps({"key": i[0], "value": i[0]}))
i[0] += 1
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "combo", messages))
asyncio.run(nats_produce_messages(nats_cluster, "combo", messages))
threads = []
threads_num = 20
@@ -662,8 +666,11 @@ def test_nats_insert(nats_cluster):
insert_messages = []
async def sub_to_nats():
nc = await nats.connect(
"{}:4444".format(nats_cluster.nats_ip), user="click", password="house"
nc = await nats_connect_ssl(
nats_cluster.nats_port,
user="click",
password="house",
ssl_ctx=nats_cluster.nats_ssl_context,
)
sub = await nc.subscribe("insert")
await sub.unsubscribe(50)
@@ -771,8 +778,11 @@ def test_nats_many_subjects_insert_right(nats_cluster):
insert_messages = []
async def sub_to_nats():
nc = await nats.connect(
"{}:4444".format(nats_cluster.nats_ip), user="click", password="house"
nc = await nats_connect_ssl(
nats_cluster.nats_port,
user="click",
password="house",
ssl_ctx=nats_cluster.nats_ssl_context,
)
sub = await nc.subscribe("right_insert1")
await sub.unsubscribe(50)
@@ -1003,7 +1013,7 @@ def test_nats_virtual_column(nats_cluster):
messages.append(json.dumps({"key": i, "value": i}))
i += 1
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "virtuals", messages))
asyncio.run(nats_produce_messages(nats_cluster, "virtuals", messages))
while True:
result = instance.query("SELECT count() FROM test.view")
@@ -1067,7 +1077,7 @@ def test_nats_virtual_column_with_materialized_view(nats_cluster):
messages.append(json.dumps({"key": i, "value": i}))
i += 1
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "virtuals_mv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "virtuals_mv", messages))
while True:
result = instance.query("SELECT count() FROM test.view")
@@ -1147,9 +1157,7 @@ def test_nats_many_consumers_to_each_queue(nats_cluster):
for _ in range(messages_num):
messages.append(json.dumps({"key": i[0], "value": i[0]}))
i[0] += 1
asyncio.run(
nats_produce_messages(nats_cluster.nats_ip, "many_consumers", messages)
)
asyncio.run(nats_produce_messages(nats_cluster, "many_consumers", messages))
threads = []
threads_num = 20
@@ -1243,7 +1251,7 @@ def test_nats_restore_failed_connection_without_losses_on_write(nats_cluster):
kill_nats(nats_cluster.nats_docker_id)
time.sleep(4)
revive_nats(nats_cluster.nats_docker_id, nats_cluster.nats_ip)
revive_nats(nats_cluster.nats_docker_id, nats_cluster.nats_port)
while True:
result = instance.query("SELECT count(DISTINCT key) FROM test.view")
@@ -1310,7 +1318,7 @@ def test_nats_no_connection_at_startup_2(nats_cluster):
messages = []
for i in range(messages_num):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "cs", messages))
asyncio.run(nats_produce_messages(nats_cluster, "cs", messages))
for _ in range(20):
result = instance.query("SELECT count() FROM test.view")
@@ -1353,9 +1361,7 @@ def test_nats_format_factory_settings(nats_cluster):
"""SELECT parseDateTimeBestEffort(CAST('2021-01-19T14:42:33.1829214Z', 'String'))"""
)
asyncio.run(
nats_produce_messages(nats_cluster.nats_ip, "format_settings", [message])
)
asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))
while True:
result = instance.query("SELECT date FROM test.format_settings")
@@ -1372,9 +1378,7 @@ def test_nats_format_factory_settings(nats_cluster):
"""
)
asyncio.run(
nats_produce_messages(nats_cluster.nats_ip, "format_settings", [message])
)
asyncio.run(nats_produce_messages(nats_cluster, "format_settings", [message]))
while True:
result = instance.query("SELECT date FROM test.view")
if result == expected:
@@ -1424,13 +1428,13 @@ def test_nats_drop_mv(nats_cluster):
messages = []
for i in range(20):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
instance.query("DROP VIEW test.consumer")
messages = []
for i in range(20, 40):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
instance.query(
"""
@@ -1441,7 +1445,7 @@ def test_nats_drop_mv(nats_cluster):
messages = []
for i in range(40, 50):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
while True:
result = instance.query("SELECT * FROM test.view ORDER BY key")
@@ -1454,7 +1458,7 @@ def test_nats_drop_mv(nats_cluster):
messages = []
for i in range(50, 60):
messages.append(json.dumps({"key": i, "value": i}))
asyncio.run(nats_produce_messages(nats_cluster.nats_ip, "mv", messages))
asyncio.run(nats_produce_messages(nats_cluster, "mv", messages))
count = 0
while True:
@@ -1477,7 +1481,7 @@ def test_nats_predefined_configuration(nats_cluster):
asyncio.run(
nats_produce_messages(
nats_cluster.nats_ip, "named", [json.dumps({"key": 1, "value": 2})]
nats_cluster, "named", [json.dumps({"key": 1, "value": 2})]
)
)
while True: