test_for_basic_auth_registry - both tests work, simplifications

Ilya Golshtein 2023-05-08 07:08:04 +00:00
parent db8e96147a
commit 66581d091a
4 changed files with 116 additions and 123 deletions

docker_compose_kafka.yml

@@ -1,43 +1,33 @@
 version: '2.3'
 services:
-# kafka_zookeeper:
-#     image: zookeeper:3.4.9
-#     hostname: kafka_zookeeper
-#     environment:
-#         ZOO_MY_ID: 1
-#         ZOO_PORT: 2181
-#         ZOO_SERVERS: server.1=kafka_zookeeper:2888:3888
-#     security_opt:
-#       - label:disable
     kafka_zookeeper:
-        image: confluentinc/cp-zookeeper
+        image: zookeeper:3.4.9
         hostname: kafka_zookeeper
         ports:
         - 2181:2181
         environment:
-            ZOOKEEPER_CLIENT_PORT: 2181
-#     security_opt:
-#       - label:disable
+            ZOOKEEPER_CLIENT_PORT: 2181
+        security_opt:
+        - label:disable
     kafka1:
-        image: confluentinc/cp-kafka
+        image: confluentinc/cp-kafka:5.2.0
         hostname: kafka1
         ports:
         - ${KAFKA_EXTERNAL_PORT}:${KAFKA_EXTERNAL_PORT}
         environment:
             KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT, PLAINTEXT_HOST:PLAINTEXT
-#            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
             KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092, PLAINTEXT_HOST://localhost:${KAFKA_EXTERNAL_PORT}
             KAFKA_ADVERTISED_HOST_NAME: kafka1
-#            KAFKA_LISTENERS: INSIDE://0.0.0.0:${KAFKA_EXTERNAL_PORT},OUTSIDE://0.0.0.0:19092
             KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-#            KAFKA_BROKER_ID: 1
             KAFKA_ZOOKEEPER_CONNECT: kafka_zookeeper:2181
-#            KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+            KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
             KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
         depends_on:
             - kafka_zookeeper
-#     security_opt:
-#       - label:disable
+        security_opt:
+        - label:disable

     schema-registry:
         image: confluentinc/cp-schema-registry:5.2.0
@@ -46,15 +36,14 @@ services:
         - ${SCHEMA_REGISTRY_EXTERNAL_PORT}:${SCHEMA_REGISTRY_EXTERNAL_PORT}
         environment:
             SCHEMA_REGISTRY_HOST_NAME: schema-registry
-#            SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
             SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-            SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: kafka_zookeeper:2181
-            SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:${SCHEMA_REGISTRY_EXTERNAL_PORT:-12313}
+            SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:${SCHEMA_REGISTRY_EXTERNAL_PORT}
+            SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: noauth
         depends_on:
             - kafka_zookeeper
             - kafka1
-#     security_opt:
-#       - label:disable
+        security_opt:
+        - label:disable

     schema-registry-auth:
         image: confluentinc/cp-schema-registry:5.2.0
@@ -62,19 +51,14 @@
         ports:
         - ${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}:${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}
         environment:
-#            SCHEMA_REGISTRY_EXTERNAL_PORT: ${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}
             SCHEMA_REGISTRY_HOST_NAME: schema-registry-auth
             SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}
-#            SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
             SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-            SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: kafka_zookeeper:2181
             SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
             SCHEMA_REGISTRY_AUTHENTICATION_ROLES: user
             SCHEMA_REGISTRY_AUTHENTICATION_REALM: RealmFooBar
             SCHEMA_REGISTRY_OPTS: "-Djava.security.auth.login.config=/etc/schema-registry/secrets/schema_registry_jaas.conf"
-#            SCHEMA_REGISTRY_GROUP_ID: auth
-            SCHEMA_REGISTRY_ZK_NAMESPACE: schema_registry_auth
-            SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas2
+            SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: auth
         volumes:
             - ${SCHEMA_REGISTRY_DIR:-}/secrets:/etc/schema-registry/secrets
         depends_on:
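After this change both registries share the single kafka1 broker and differ only in their SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID and the BASIC-auth settings. A host-side sanity check of that wiring could look like the following sketch (hypothetical: the two port constants stand in for the SCHEMA_REGISTRY_EXTERNAL_PORT / SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT values, which the harness picks at runtime; schemauser/letmein is the account from the secrets file further down):

    import requests

    NOAUTH_PORT = 12313  # hypothetical; normally ${SCHEMA_REGISTRY_EXTERNAL_PORT}
    AUTH_PORT = 12314    # hypothetical; normally ${SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT}

    # the open registry answers anonymous requests
    assert requests.get(f"http://localhost:{NOAUTH_PORT}/subjects").status_code == 200

    # the BASIC-auth registry rejects them ...
    assert requests.get(f"http://localhost:{AUTH_PORT}/subjects").status_code == 401

    # ... but accepts the account defined in the JAAS password file
    r = requests.get(f"http://localhost:{AUTH_PORT}/subjects",
                     auth=("schemauser", "letmein"))
    assert r.status_code == 200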

helpers/cluster.py

@@ -481,18 +481,14 @@ class ClickHouseCluster:
         # available when with_kafka == True
         self.kafka_host = "kafka1"
-        self.kafka2_host = "kafka2"
         self.kafka_dir = os.path.join(self.instances_dir, "kafka")
         self._kafka_port = 0
-        self._kafka2_port = 0
         self.kafka_docker_id = None
-        self.kafka2_docker_id = None
         self.schema_registry_host = "schema-registry"
         self._schema_registry_port = 0
         self.schema_registry_auth_host = "schema-registry-auth"
         self._schema_registry_auth_port = 0
         self.kafka_docker_id = self.get_instance_docker_id(self.kafka_host)
-        self.kafka2_docker_id = self.get_instance_docker_id(self.kafka2_host)
         self.coredns_host = "coredns"
@@ -656,13 +652,6 @@ class ClickHouseCluster:
             self._kafka_port = get_free_port()
         return self._kafka_port

-    @property
-    def kafka2_port(self):
-        if self._kafka2_port:
-            return self._kafka2_port
-        self._kafka2_port = get_free_port()
-        return self._kafka2_port
-
     @property
     def schema_registry_port(self):
         if self._schema_registry_port:
@@ -1184,12 +1173,9 @@ class ClickHouseCluster:
     ):
         self.with_kafka = True
         env_variables["KAFKA_HOST"] = self.kafka_host
-        env_variables["KAFKA2_HOST"] = self.kafka2_host
         env_variables["KAFKA_EXTERNAL_PORT"] = str(self.kafka_port)
-        env_variables["KAFKA2_EXTERNAL_PORT"] = str(self.kafka2_port)
         env_variables["SCHEMA_REGISTRY_DIR"] = instance.path + "/"
         env_variables["SCHEMA_REGISTRY_EXTERNAL_PORT"] = str(self.schema_registry_port)
-        env_variables["SCHEMA_REGISTRY_INTERNAL_PORT"] = "8081"
         env_variables["SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT"] = str(self.schema_registry_auth_port)
         self.base_cmd.extend(
             ["--file", p.join(docker_compose_yml_dir, "docker_compose_kafka.yml")]
@@ -2521,44 +2507,27 @@ class ClickHouseCluster:
             raise Exception("Can't wait Azurite to start")

     def wait_schema_registry_to_start(self, timeout=180):
-        reg_url="http://localhost:{}".format(self.schema_registry_port)
-        arg={'url':reg_url}
-        sr_client = CachedSchemaRegistryClient(arg)
+        for port in self.schema_registry_port, self.schema_registry_auth_port:
+            reg_url="http://localhost:{}".format(port)
+            arg={'url':reg_url}
+            sr_client = CachedSchemaRegistryClient(arg)

-        start = time.time()
-        sr_started = False
-        sr_auth_started = False
-        while time.time() - start < timeout:
-            try:
-                sr_client._send_request(sr_client.url)
-                logging.debug("Connected to SchemaRegistry")
-                sr_started = True
-                break
-            except Exception as ex:
-                logging.debug(("Can't connect to SchemaRegistry: %s", str(ex)))
-                time.sleep(1)
+            start = time.time()
+            sr_started = False
+            sr_auth_started = False
+            while time.time() - start < timeout:
+                try:
+                    sr_client._send_request(sr_client.url)
+                    logging.debug("Connected to SchemaRegistry")
+                    # don't care about possible auth errors
+                    sr_started = True
+                    break
+                except Exception as ex:
+                    logging.debug(("Can't connect to SchemaRegistry: %s", str(ex)))
+                    time.sleep(1)

-        if not sr_started:
-            raise Exception("Can't wait Schema Registry to start")
-
-        auth_reg_url="http://localhost:{}".format(self.schema_registry_auth_port)
-        auth_arg={'url':auth_reg_url,'basic.auth.credentials.source':'USER_INFO','basic.auth.user.info':'schemauser:letmein'}
-        sr_auth_client = CachedSchemaRegistryClient(auth_arg)
-        while time.time() - start < timeout:
-            try:
-                sr_auth_client._send_request(sr_auth_client.url)
-                logging.debug("Connected to SchemaRegistry with auth")
-                sr_auth_started = True
-                break
-            except Exception as ex:
-                logging.debug(("Can't connect to SchemaRegistry with auth: %s", str(ex)))
-                time.sleep(1)
-        if not sr_auth_started:
-            raise Exception("Can't wait Schema Registry with auth to start")
+            if not sr_started:
+                raise Exception("Can't wait Schema Registry to start")

     def wait_cassandra_to_start(self, timeout=180):
         self.cassandra_ip = self.get_instance_ip(self.cassandra_host)
@@ -2765,7 +2734,6 @@ class ClickHouseCluster:
             )
             self.up_called = True
             self.wait_kafka_is_available(self.kafka_docker_id, self.kafka_port)
-            # self.wait_kafka_is_available(self.kafka2_docker_id, self.kafka2_port)
             self.wait_schema_registry_to_start()

         if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
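The unified wait loop above probes the authenticated registry with a plain, credential-less client, which is why the added comment says auth errors don't matter: any HTTP answer, including 401 Unauthorized, proves the server is listening, while only a transport-level failure means it has not started yet. A minimal sketch of that distinction, using requests instead of CachedSchemaRegistryClient (hypothetical helper, not part of the commit):

    import requests

    def registry_is_up(port):
        # A 401 from the auth-protected registry still counts as "up";
        # only connection errors/timeouts mean the container isn't ready.
        try:
            requests.get("http://localhost:{}/subjects".format(port), timeout=1)
            return True
        except requests.RequestException:
            return False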

test_format_avro_confluent secrets: Schema Registry basic-auth password file (Jetty-style user database)

@@ -1 +1,2 @@
 schemauser: MD5:0d107d09f5bbe40cade3de5c71e9e9b7,user
+schemauser/slash: MD5:0d107d09f5bbe40cade3de5c71e9e9b7,user
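Each line follows the user: MD5:<digest>,<role> format consumed through the JAAS config referenced by SCHEMA_REGISTRY_OPTS. The new schemauser/slash entry reuses the same digest, so both accounts share one password; the digest is simply the MD5 of letmein, the password the tests send:

    import hashlib

    # reproduces the digest stored after the "MD5:" prefix
    print(hashlib.md5(b"letmein").hexdigest())
    # 0d107d09f5bbe40cade3de5c71e9e9b7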

test_format_avro_confluent/test.py

@@ -9,7 +9,7 @@ from confluent_kafka.avro.cached_schema_registry_client import (
 )
 from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance
+from urllib import parse


 @pytest.fixture(scope="module")
 def started_cluster():
@@ -47,11 +47,8 @@ def run_query(instance, query, data=None, settings=None):
 def test_select(started_cluster):
     # type: (ClickHouseCluster) -> None

+    time.sleep(3)
-    # input("Top of test_select, press any key")
-    # schema_registry_client = CachedSchemaRegistryClient(
-    #     "http://localhost:{}".format(started_cluster.schema_registry_port)
-    # )
     reg_url="http://localhost:{}".format(
         started_cluster.schema_registry_port)
     arg={'url':reg_url}
@@ -91,45 +88,88 @@ def test_select(started_cluster):
     ]


-# def test_select_auth(started_cluster):
-#     # type: (ClickHouseCluster) -> None
-#     time.sleep(5)
+def test_select_auth(started_cluster):
+    # type: (ClickHouseCluster) -> None
+    time.sleep(5)

-#     reg_url="http://localhost:{}".format(
-#         started_cluster.schema_registry_auth_port)
-#     arg={'url':reg_url,'basic.auth.credentials.source':'USER_INFO','basic.auth.user.info':'schemauser:letmein'}
+    reg_url="http://localhost:{}".format(
+        started_cluster.schema_registry_auth_port)
+    arg={'url':reg_url,'basic.auth.credentials.source':'USER_INFO','basic.auth.user.info':'schemauser:letmein'}

-#     schema_registry_client = CachedSchemaRegistryClient(arg)
-#     serializer = MessageSerializer(schema_registry_client)
+    schema_registry_client = CachedSchemaRegistryClient(arg)
+    serializer = MessageSerializer(schema_registry_client)

-#     schema = avro.schema.make_avsc_object(
-#         {
-#             "name": "test_record_auth",
-#             "type": "record",
-#             "fields": [{"name": "value", "type": "long"}],
-#         }
-#     )
+    schema = avro.schema.make_avsc_object(
+        {
+            "name": "test_record_auth",
+            "type": "record",
+            "fields": [{"name": "value", "type": "long"}],
+        }
+    )

-#     buf = io.BytesIO()
-#     for x in range(0, 3):
-#         message = serializer.encode_record_with_schema(
-#             "test_subject_auth", schema, {"value": x}
-#         )
-#         buf.write(message)
-#     data = buf.getvalue()
+    buf = io.BytesIO()
+    for x in range(0, 3):
+        message = serializer.encode_record_with_schema(
+            "test_subject_auth", schema, {"value": x}
+        )
+        buf.write(message)
+    data = buf.getvalue()

-#     instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
-#     schema_registry_url = "http://{}:{}@{}:{}".format(
-#         'schemauser', 'letmein',
-#         started_cluster.schema_registry_auth_host, started_cluster.schema_registry_auth_port
-#     )
+    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
+    schema_registry_url = "http://{}:{}@{}:{}".format(
+        'schemauser', 'letmein',
+        started_cluster.schema_registry_auth_host, started_cluster.schema_registry_auth_port
+    )

-#     run_query(instance, "create table avro_data_auth(value Int64) engine = Memory()")
-#     settings = {"format_avro_schema_registry_url": schema_registry_url}
-#     run_query(instance, "insert into avro_data_auth format AvroConfluent", data, settings)
-#     stdout = run_query(instance, "select * from avro_data_auth")
-#     assert list(map(str.split, stdout.splitlines())) == [
-#         ["0"],
-#         ["1"],
-#         ["2"],
-#     ]
+    run_query(instance, "create table avro_data_auth(value Int64) engine = Memory()")
+    settings = {"format_avro_schema_registry_url": schema_registry_url}
+    run_query(instance, "insert into avro_data_auth format AvroConfluent", data, settings)
+    stdout = run_query(instance, "select * from avro_data_auth")
+    assert list(map(str.split, stdout.splitlines())) == [
+        ["0"],
+        ["1"],
+        ["2"],
+    ]
+
+
+def test_select_auth_encoded(started_cluster):
+    # type: (ClickHouseCluster) -> None
+    time.sleep(5)
+
+    reg_url="http://localhost:{}".format(
+        started_cluster.schema_registry_auth_port)
+    arg={'url':reg_url,'basic.auth.credentials.source':'USER_INFO','basic.auth.user.info':'schemauser:letmein'}
+
+    schema_registry_client = CachedSchemaRegistryClient(arg)
+    serializer = MessageSerializer(schema_registry_client)
+
+    schema = avro.schema.make_avsc_object(
+        {
+            "name": "test_record_auth_encoded",
+            "type": "record",
+            "fields": [{"name": "value", "type": "long"}],
+        }
+    )
+
+    buf = io.BytesIO()
+    for x in range(0, 3):
+        message = serializer.encode_record_with_schema(
+            "test_subject_auth_encoded", schema, {"value": x}
+        )
+        buf.write(message)
+    data = buf.getvalue()
+
+    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
+    schema_registry_url = "http://{}:{}@{}:{}".format(
+        parse.quote_plus('schemauser/slash'), parse.quote_plus('letmein'),
+        started_cluster.schema_registry_auth_host, started_cluster.schema_registry_auth_port
+    )
+
+    run_query(instance, "create table avro_data_auth_encoded(value Int64) engine = Memory()")
+    settings = {"format_avro_schema_registry_url": schema_registry_url}
+    run_query(instance, "insert into avro_data_auth_encoded format AvroConfluent", data, settings)
+    stdout = run_query(instance, "select * from avro_data_auth_encoded")
+    assert list(map(str.split, stdout.splitlines())) == [
+        ["0"],
+        ["1"],
+        ["2"],
+    ]
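The point of test_select_auth_encoded is the slash in the username: embedded credentials sit in the URL authority, so reserved characters must be percent-encoded before the format_avro_schema_registry_url setting can be parsed. What the quote_plus calls produce here, checkable in a REPL:

    from urllib import parse

    print(parse.quote_plus("schemauser/slash"))  # schemauser%2Fslash
    print(parse.quote_plus("letmein"))           # letmein (nothing to escape)

    # so the URL handed to ClickHouse reads
    # http://schemauser%2Fslash:letmein@schema-registry-auth:<port>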