kafka_dead_letter_queue: black formatter

This commit is contained in:
Ilya Golshtein 2024-08-28 09:49:02 +00:00
parent e411d7f50f
commit d598278b89

View File

@ -122,6 +122,7 @@ def kafka_cluster():
finally:
cluster.shutdown()
def view_test(expected_num_messages, *_):
attempt = 0
rows = 0
@ -134,18 +135,27 @@ def view_test(expected_num_messages, *_):
assert rows == expected_num_messages
def dead_letter_queue_test(expected_num_messages, topic_name):
view_test(expected_num_messages)
instance.query("SYSTEM FLUSH LOGS")
result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical")
result = instance.query(
f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical"
)
logging.debug(f"system.dead_letter_queue contains {result}")
rows = int(instance.query(f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'"))
rows = int(
instance.query(
f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'"
)
)
assert rows == expected_num_messages
def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, check_method):
def bad_messages_parsing_mode(
kafka_cluster, handle_error_mode, additional_dml, check_method
):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
@ -284,17 +294,24 @@ struct Message
kafka_delete_topic(admin_client, f"{topic_name}")
def test_bad_messages_parsing_stream(kafka_cluster):
bad_messages_parsing_mode(kafka_cluster,
'stream',
'CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0',
view_test)
bad_messages_parsing_mode(
kafka_cluster,
"stream",
"CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0",
view_test,
)
def test_bad_messages_parsing_dead_letter_queue(kafka_cluster):
bad_messages_parsing_mode(kafka_cluster,
'dead_letter_queue',
'CREATE MATERIALIZED VIEW view Engine=Log AS SELECT key FROM kafka',
dead_letter_queue_test)
bad_messages_parsing_mode(
kafka_cluster,
"dead_letter_queue",
"CREATE MATERIALIZED VIEW view Engine=Log AS SELECT key FROM kafka",
dead_letter_queue_test,
)
def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):
admin_client = KafkaAdminClient(