ClickHouse/tests/integration/test_format_avro_confluent/test.py

78 lines
2.3 KiB
Python
Raw Normal View History

2020-02-03 00:02:19 +00:00
import io
import logging
2020-02-03 00:02:19 +00:00
import avro.schema
import pytest
from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
)
2020-10-02 16:54:07 +00:00
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
2020-02-03 00:02:19 +00:00
2020-02-03 00:02:19 +00:00
@pytest.fixture(scope="module")
2021-02-24 11:46:58 +00:00
def started_cluster():
2020-02-03 00:02:19 +00:00
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("dummy", with_kafka=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def run_query(instance, query, data=None, settings=None):
2020-02-03 00:02:19 +00:00
# type: (ClickHouseInstance, str, object, dict) -> str
logging.info("Running query '{}'...".format(query))
# use http to force parsing on server
if not data:
data = " " # make POST request
result = instance.http_query(query, data=data, params=settings)
2020-02-03 00:02:19 +00:00
logging.info("Query finished")
return result
2021-02-24 11:46:58 +00:00
def test_select(started_cluster):
2020-02-03 00:02:19 +00:00
# type: (ClickHouseCluster) -> None
schema_registry_client = CachedSchemaRegistryClient(
"http://localhost:{}".format(started_cluster.schema_registry_port)
)
2020-02-03 00:02:19 +00:00
serializer = MessageSerializer(schema_registry_client)
schema = avro.schema.make_avsc_object(
{
"name": "test_record",
"type": "record",
"fields": [{"name": "value", "type": "long"}],
}
)
2020-02-03 00:02:19 +00:00
buf = io.BytesIO()
for x in range(0, 3):
message = serializer.encode_record_with_schema(
"test_subject", schema, {"value": x}
2020-02-03 00:02:19 +00:00
)
buf.write(message)
data = buf.getvalue()
2020-02-03 00:02:19 +00:00
2021-02-24 11:46:58 +00:00
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
2020-02-03 00:02:19 +00:00
schema_registry_url = "http://{}:{}".format(
started_cluster.schema_registry_host, 8081
2020-02-03 00:02:19 +00:00
)
2020-02-03 00:02:19 +00:00
run_query(instance, "create table avro_data(value Int64) engine = Memory()")
settings = {"format_avro_schema_registry_url": schema_registry_url}
run_query(instance, "insert into avro_data format AvroConfluent", data, settings)
2020-02-03 00:02:19 +00:00
stdout = run_query(instance, "select * from avro_data")
assert list(map(str.split, stdout.splitlines())) == [
["0"],
["1"],
["2"],
]