test_for_basic_auth_registry - black formatter happy + some doc

This commit is contained in:
Ilya Golshtein 2023-05-08 20:49:43 +00:00
parent c1c5ffa309
commit c550a532e6
3 changed files with 59 additions and 27 deletions

View File

@ -1325,6 +1325,17 @@ Default value: 0.
Sets [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/index.html) URL to use with [AvroConfluent](../../interfaces/formats.md/#data-format-avro-confluent) format.
Format:
``` text
http://[user:password@]machine[:port]"
```
Examples:
``` text
http://registry.example.com:8081
http://admin:secret@registry.example.com:8081
```
Default value: `Empty`.
### output_format_avro_codec {#output_format_avro_codec}

View File

@ -1168,15 +1168,15 @@ class ClickHouseCluster:
]
return self.base_kerberized_hdfs_cmd
def setup_kafka_cmd(
self, instance, env_variables, docker_compose_yml_dir
):
def setup_kafka_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_kafka = True
env_variables["KAFKA_HOST"] = self.kafka_host
env_variables["KAFKA_EXTERNAL_PORT"] = str(self.kafka_port)
env_variables["SCHEMA_REGISTRY_DIR"] = instance.path + "/"
env_variables["SCHEMA_REGISTRY_EXTERNAL_PORT"] = str(self.schema_registry_port)
env_variables["SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT"] = str(self.schema_registry_auth_port)
env_variables["SCHEMA_REGISTRY_AUTH_EXTERNAL_PORT"] = str(
self.schema_registry_auth_port
)
self.base_cmd.extend(
["--file", p.join(docker_compose_yml_dir, "docker_compose_kafka.yml")]
)
@ -1617,7 +1617,10 @@ class ClickHouseCluster:
with_nats=with_nats,
with_nginx=with_nginx,
with_kerberized_hdfs=with_kerberized_hdfs,
with_secrets=with_secrets or with_kerberized_hdfs or with_kerberos_kdc or with_kerberized_kafka,
with_secrets=with_secrets
or with_kerberized_hdfs
or with_kerberos_kdc
or with_kerberized_kafka,
with_mongo=with_mongo or with_mongo_secure,
with_meili=with_meili,
with_redis=with_redis,
@ -2508,8 +2511,8 @@ class ClickHouseCluster:
def wait_schema_registry_to_start(self, timeout=180):
for port in self.schema_registry_port, self.schema_registry_auth_port:
reg_url="http://localhost:{}".format(port)
arg={'url':reg_url}
reg_url = "http://localhost:{}".format(port)
arg = {"url": reg_url}
sr_client = CachedSchemaRegistryClient(arg)
start = time.time()
@ -4245,8 +4248,8 @@ class ClickHouseInstance:
base_secrets_dir = self.cluster.instances_dir
else:
base_secrets_dir = self.path
from_dir=self.secrets_dir
to_dir=p.abspath(p.join(base_secrets_dir, "secrets"))
from_dir = self.secrets_dir
to_dir = p.abspath(p.join(base_secrets_dir, "secrets"))
logging.debug(f"Copy secret from {from_dir} to {to_dir}")
shutil.copytree(
self.secrets_dir,

View File

@ -11,6 +11,7 @@ from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from urllib import parse
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -37,8 +38,6 @@ def run_query(instance, query, data=None, settings=None):
return result
# reg_url="http://localhost:{}".format(started_cluster.schema_registry_port)
# arg={'url':reg_url}
# schema_registry_client = CachedSchemaRegistryClient(arg)
@ -49,9 +48,8 @@ def test_select(started_cluster):
# input("Top of test_select, press any key")
reg_url="http://localhost:{}".format(
started_cluster.schema_registry_port)
arg={'url':reg_url}
reg_url = "http://localhost:{}".format(started_cluster.schema_registry_port)
arg = {"url": reg_url}
schema_registry_client = CachedSchemaRegistryClient(arg)
serializer = MessageSerializer(schema_registry_client)
@ -92,9 +90,12 @@ def test_select_auth(started_cluster):
# type: (ClickHouseCluster) -> None
time.sleep(5)
reg_url="http://localhost:{}".format(
started_cluster.schema_registry_auth_port)
arg={'url':reg_url,'basic.auth.credentials.source':'USER_INFO','basic.auth.user.info':'schemauser:letmein'}
reg_url = "http://localhost:{}".format(started_cluster.schema_registry_auth_port)
arg = {
"url": reg_url,
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "schemauser:letmein",
}
schema_registry_client = CachedSchemaRegistryClient(arg)
serializer = MessageSerializer(schema_registry_client)
@ -117,13 +118,17 @@ def test_select_auth(started_cluster):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
schema_registry_url = "http://{}:{}@{}:{}".format(
'schemauser', 'letmein',
started_cluster.schema_registry_auth_host, started_cluster.schema_registry_auth_port
"schemauser",
"letmein",
started_cluster.schema_registry_auth_host,
started_cluster.schema_registry_auth_port,
)
run_query(instance, "create table avro_data_auth(value Int64) engine = Memory()")
settings = {"format_avro_schema_registry_url": schema_registry_url}
run_query(instance, "insert into avro_data_auth format AvroConfluent", data, settings)
run_query(
instance, "insert into avro_data_auth format AvroConfluent", data, settings
)
stdout = run_query(instance, "select * from avro_data_auth")
assert list(map(str.split, stdout.splitlines())) == [
["0"],
@ -131,13 +136,17 @@ def test_select_auth(started_cluster):
["2"],
]
def test_select_auth_encoded(started_cluster):
# type: (ClickHouseCluster) -> None
time.sleep(5)
reg_url="http://localhost:{}".format(
started_cluster.schema_registry_auth_port)
arg={'url':reg_url,'basic.auth.credentials.source':'USER_INFO','basic.auth.user.info':'schemauser:letmein'}
reg_url = "http://localhost:{}".format(started_cluster.schema_registry_auth_port)
arg = {
"url": reg_url,
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "schemauser:letmein",
}
schema_registry_client = CachedSchemaRegistryClient(arg)
serializer = MessageSerializer(schema_registry_client)
@ -160,13 +169,22 @@ def test_select_auth_encoded(started_cluster):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
schema_registry_url = "http://{}:{}@{}:{}".format(
parse.quote_plus('schemauser/slash'), parse.quote_plus('letmein'),
started_cluster.schema_registry_auth_host, started_cluster.schema_registry_auth_port
parse.quote_plus("schemauser/slash"),
parse.quote_plus("letmein"),
started_cluster.schema_registry_auth_host,
started_cluster.schema_registry_auth_port,
)
run_query(instance, "create table avro_data_auth_encoded(value Int64) engine = Memory()")
run_query(
instance, "create table avro_data_auth_encoded(value Int64) engine = Memory()"
)
settings = {"format_avro_schema_registry_url": schema_registry_url}
run_query(instance, "insert into avro_data_auth_encoded format AvroConfluent", data, settings)
run_query(
instance,
"insert into avro_data_auth_encoded format AvroConfluent",
data,
settings,
)
stdout = run_query(instance, "select * from avro_data_auth_encoded")
assert list(map(str.split, stdout.splitlines())) == [
["0"],