ClickHouse/tests/integration/test_kafka_bad_messages/test.py

286 lines
8.0 KiB
Python
Raw Normal View History

2023-02-24 12:31:09 +00:00
import time
import logging
import pytest
from helpers.cluster import ClickHouseCluster
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
main_configs=["configs/kafka.xml"],
with_kafka=True,
)
2023-02-27 13:01:07 +00:00
2023-02-24 12:31:09 +00:00
def kafka_create_topic(
admin_client,
topic_name,
num_partitions=1,
replication_factor=1,
max_retries=50,
config=None,
):
logging.debug(
f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}"
)
topics_list = [
NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor,
topic_configs=config,
)
]
retries = 0
while True:
try:
admin_client.create_topics(new_topics=topics_list, validate_only=False)
logging.debug("Admin client succeed")
return
except Exception as e:
retries += 1
time.sleep(0.5)
if retries < max_retries:
logging.warning(f"Failed to create topic {e}")
else:
raise
def kafka_delete_topic(admin_client, topic, max_retries=50):
result = admin_client.delete_topics([topic])
2023-02-27 13:01:07 +00:00
for topic, e in result.topic_error_codes:
2023-02-24 12:31:09 +00:00
if e == 0:
logging.debug(f"Topic {topic} deleted")
else:
logging.error(f"Failed to delete topic {topic}: {e}")
retries = 0
while True:
topics_listed = admin_client.list_topics()
logging.debug(f"TOPICS LISTED: {topics_listed}")
if topic not in topics_listed:
return
else:
retries += 1
time.sleep(0.5)
if retries > max_retries:
raise Exception(f"Failed to delete topics {topic}, {result}")
def get_kafka_producer(port, serializer, retries):
errors = []
for _ in range(retries):
try:
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(port),
value_serializer=serializer,
)
logging.debug("Kafka Connection establised: localhost:{}".format(port))
return producer
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception("Connection not establised, {}".format(errors))
def producer_serializer(x):
return x.encode() if isinstance(x, str) else x
def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
logging.debug(
"kafka_produce server:{}:{} topic:{}".format(
"localhost", kafka_cluster.kafka_port, topic
)
)
producer = get_kafka_producer(
kafka_cluster.kafka_port, producer_serializer, retries
)
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
@pytest.fixture(scope="module")
def kafka_cluster():
try:
cluster.start()
kafka_id = instance.cluster.kafka_docker_id
print(("kafka_id is {}".format(kafka_id)))
yield cluster
finally:
cluster.shutdown()
def test_bad_messages_parsing(kafka_cluster):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
for format_name in [
"TSV",
"TSKV",
"CSV",
"Values",
"JSON",
"JSONEachRow",
"JSONCompactEachRow",
"JSONObjectEachRow",
"Avro",
"RowBinary",
"JSONColumns",
"JSONColumnsWithMetadata",
"Native",
"Arrow",
"ArrowStream",
"Parquet",
"ORC",
"JSONCompactColumns",
"BSONEachRow",
"MySQLDump",
]:
print(format_name)
kafka_create_topic(admin_client, f"{format_name}_err")
instance.query(
f"""
DROP TABLE IF EXISTS view;
DROP TABLE IF EXISTS kafka;
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{format_name}_err',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_handle_error_mode='stream';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0 ;
"""
)
messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
kafka_produce(kafka_cluster, f"{format_name}_err", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
assert rows == len(messages)
kafka_delete_topic(admin_client, f"{format_name}_err")
protobuf_schema = """
syntax = "proto3";
message Message {
uint64 key = 1;
uint64 value = 2;
};
"""
instance.create_format_schema("schema_test_errors.proto", protobuf_schema)
for format_name in ["Protobuf", "ProtobufSingle", "ProtobufList"]:
instance.query(
f"""
DROP TABLE IF EXISTS view;
DROP TABLE IF EXISTS kafka;
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{format_name}_err',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_handle_error_mode='stream',
kafka_schema='schema_test_errors:Message';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0 ;
"""
)
print(format_name)
kafka_create_topic(admin_client, f"{format_name}_err")
messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"]
kafka_produce(kafka_cluster, f"{format_name}_err", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
assert rows == len(messages)
kafka_delete_topic(admin_client, f"{format_name}_err")
capn_proto_schema = """
@0xd9dd7b35452d1c4f;
struct Message
{
key @0 : UInt64;
value @1 : UInt64;
}
"""
instance.create_format_schema("schema_test_errors.capnp", capn_proto_schema)
instance.query(
f"""
DROP TABLE IF EXISTS view;
DROP TABLE IF EXISTS kafka;
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'CapnProto_err',
kafka_group_name = 'CapnProto',
kafka_format = 'CapnProto',
kafka_handle_error_mode='stream',
kafka_schema='schema_test_errors:Message';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0 ;
"""
)
print("CapnProto")
kafka_create_topic(admin_client, "CapnProto_err")
messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
kafka_produce(kafka_cluster, "CapnProto_err", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
assert rows == len(messages)
kafka_delete_topic(admin_client, "CapnProto_err")
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()