mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
247 lines
7.4 KiB
Python
247 lines
7.4 KiB
Python
import io
|
|
import logging
|
|
import time
|
|
from urllib import parse
|
|
|
|
import avro.schema
|
|
import pytest
|
|
from confluent_kafka.avro.cached_schema_registry_client import (
|
|
CachedSchemaRegistryClient,
|
|
)
|
|
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
|
|
|
|
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
|
|
|
|
# Skip on ARM due to Confluent/Kafka
|
|
if is_arm():
|
|
pytestmark = pytest.mark.skip
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster = ClickHouseCluster(__file__)
|
|
cluster.add_instance("dummy", with_kafka=True, with_secrets=True)
|
|
logging.info("Starting cluster...")
|
|
cluster.start()
|
|
logging.info("Cluster started")
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def run_query(instance, query, data=None, settings=None):
|
|
# type: (ClickHouseInstance, str, object, dict) -> str
|
|
|
|
logging.info("Running query '{}'...".format(query))
|
|
# use http to force parsing on server
|
|
if not data:
|
|
data = " " # make POST request
|
|
result = instance.http_query(query, data=data, params=settings)
|
|
logging.info("Query finished")
|
|
|
|
return result
|
|
|
|
|
|
def test_select(started_cluster):
|
|
# type: (ClickHouseCluster) -> None
|
|
|
|
reg_url = "http://localhost:{}".format(started_cluster.schema_registry_port)
|
|
arg = {"url": reg_url}
|
|
|
|
schema_registry_client = CachedSchemaRegistryClient(arg)
|
|
serializer = MessageSerializer(schema_registry_client)
|
|
|
|
schema = avro.schema.make_avsc_object(
|
|
{
|
|
"name": "test_record1",
|
|
"type": "record",
|
|
"fields": [{"name": "value", "type": "long"}],
|
|
}
|
|
)
|
|
|
|
buf = io.BytesIO()
|
|
for x in range(0, 3):
|
|
message = serializer.encode_record_with_schema(
|
|
"test_subject1", schema, {"value": x}
|
|
)
|
|
buf.write(message)
|
|
data = buf.getvalue()
|
|
|
|
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
|
schema_registry_url = "http://{}:{}".format(
|
|
started_cluster.schema_registry_host, started_cluster.schema_registry_port
|
|
)
|
|
|
|
run_query(instance, "create table avro_data(value Int64) engine = Memory()")
|
|
settings = {"format_avro_schema_registry_url": schema_registry_url}
|
|
run_query(instance, "insert into avro_data format AvroConfluent", data, settings)
|
|
stdout = run_query(instance, "select * from avro_data")
|
|
assert list(map(str.split, stdout.splitlines())) == [
|
|
["0"],
|
|
["1"],
|
|
["2"],
|
|
]
|
|
|
|
|
|
def test_select_auth(started_cluster):
|
|
# type: (ClickHouseCluster) -> None
|
|
|
|
reg_url = "http://localhost:{}".format(started_cluster.schema_registry_auth_port)
|
|
arg = {
|
|
"url": reg_url,
|
|
"basic.auth.credentials.source": "USER_INFO",
|
|
"basic.auth.user.info": "schemauser:letmein",
|
|
}
|
|
|
|
schema_registry_client = CachedSchemaRegistryClient(arg)
|
|
serializer = MessageSerializer(schema_registry_client)
|
|
|
|
schema = avro.schema.make_avsc_object(
|
|
{
|
|
"name": "test_record_auth",
|
|
"type": "record",
|
|
"fields": [{"name": "value", "type": "long"}],
|
|
}
|
|
)
|
|
|
|
buf = io.BytesIO()
|
|
for x in range(0, 3):
|
|
message = serializer.encode_record_with_schema(
|
|
"test_subject_auth", schema, {"value": x}
|
|
)
|
|
buf.write(message)
|
|
data = buf.getvalue()
|
|
|
|
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
|
schema_registry_url = "http://{}:{}@{}:{}".format(
|
|
"schemauser",
|
|
"letmein",
|
|
started_cluster.schema_registry_auth_host,
|
|
started_cluster.schema_registry_auth_port,
|
|
)
|
|
|
|
run_query(instance, "create table avro_data_auth(value Int64) engine = Memory()")
|
|
settings = {"format_avro_schema_registry_url": schema_registry_url}
|
|
run_query(
|
|
instance, "insert into avro_data_auth format AvroConfluent", data, settings
|
|
)
|
|
stdout = run_query(instance, "select * from avro_data_auth")
|
|
assert list(map(str.split, stdout.splitlines())) == [
|
|
["0"],
|
|
["1"],
|
|
["2"],
|
|
]
|
|
|
|
|
|
def test_select_auth_encoded(started_cluster):
|
|
# type: (ClickHouseCluster) -> None
|
|
|
|
reg_url = "http://localhost:{}".format(started_cluster.schema_registry_auth_port)
|
|
arg = {
|
|
"url": reg_url,
|
|
"basic.auth.credentials.source": "USER_INFO",
|
|
"basic.auth.user.info": "schemauser:letmein",
|
|
}
|
|
|
|
schema_registry_client = CachedSchemaRegistryClient(arg)
|
|
serializer = MessageSerializer(schema_registry_client)
|
|
|
|
schema = avro.schema.make_avsc_object(
|
|
{
|
|
"name": "test_record_auth_encoded",
|
|
"type": "record",
|
|
"fields": [{"name": "value", "type": "long"}],
|
|
}
|
|
)
|
|
|
|
buf = io.BytesIO()
|
|
for x in range(0, 3):
|
|
message = serializer.encode_record_with_schema(
|
|
"test_subject_auth_encoded", schema, {"value": x}
|
|
)
|
|
buf.write(message)
|
|
data = buf.getvalue()
|
|
|
|
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
|
schema_registry_url = "http://{}:{}@{}:{}".format(
|
|
parse.quote_plus("schemauser/slash"),
|
|
parse.quote_plus("letmein"),
|
|
started_cluster.schema_registry_auth_host,
|
|
started_cluster.schema_registry_auth_port,
|
|
)
|
|
|
|
run_query(
|
|
instance, "create table avro_data_auth_encoded(value Int64) engine = Memory()"
|
|
)
|
|
settings = {"format_avro_schema_registry_url": schema_registry_url}
|
|
run_query(
|
|
instance,
|
|
"insert into avro_data_auth_encoded format AvroConfluent",
|
|
data,
|
|
settings,
|
|
)
|
|
stdout = run_query(instance, "select * from avro_data_auth_encoded")
|
|
assert list(map(str.split, stdout.splitlines())) == [
|
|
["0"],
|
|
["1"],
|
|
["2"],
|
|
]
|
|
|
|
|
|
def test_select_auth_encoded_complex(started_cluster):
|
|
# type: (ClickHouseCluster) -> None
|
|
|
|
reg_url = "http://localhost:{}".format(started_cluster.schema_registry_auth_port)
|
|
arg = {
|
|
"url": reg_url,
|
|
"basic.auth.credentials.source": "USER_INFO",
|
|
"basic.auth.user.info": "schemauser:letmein",
|
|
}
|
|
|
|
schema_registry_client = CachedSchemaRegistryClient(arg)
|
|
serializer = MessageSerializer(schema_registry_client)
|
|
|
|
schema = avro.schema.make_avsc_object(
|
|
{
|
|
"name": "test_record_auth_encoded_complex",
|
|
"type": "record",
|
|
"fields": [{"name": "value", "type": "long"}],
|
|
}
|
|
)
|
|
|
|
buf = io.BytesIO()
|
|
for x in range(0, 3):
|
|
message = serializer.encode_record_with_schema(
|
|
"test_subject_auth_encoded_complex", schema, {"value": x}
|
|
)
|
|
buf.write(message)
|
|
data = buf.getvalue()
|
|
|
|
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
|
schema_registry_url = "http://{}:{}@{}:{}".format(
|
|
parse.quote_plus("complexschemauser"),
|
|
parse.quote_plus("letmein%@:/"),
|
|
started_cluster.schema_registry_auth_host,
|
|
started_cluster.schema_registry_auth_port,
|
|
)
|
|
|
|
run_query(
|
|
instance,
|
|
"create table avro_data_auth_encoded_complex(value Int64) engine = Memory()",
|
|
)
|
|
settings = {"format_avro_schema_registry_url": schema_registry_url}
|
|
run_query(
|
|
instance,
|
|
"insert into avro_data_auth_encoded_complex format AvroConfluent",
|
|
data,
|
|
settings,
|
|
)
|
|
stdout = run_query(instance, "select * from avro_data_auth_encoded_complex")
|
|
assert list(map(str.split, stdout.splitlines())) == [
|
|
["0"],
|
|
["1"],
|
|
["2"],
|
|
]
|