system_kafka_consumers: black formatter

This commit is contained in:
Ilya Golshtein 2023-08-01 15:57:06 +00:00
parent 978d8bf9a6
commit a38459b407

View File

@ -90,7 +90,9 @@ def producer_serializer(x):
return x.encode() if isinstance(x, str) else x
def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15, partition=None):
def kafka_produce(
kafka_cluster, topic, messages, timestamp=None, retries=15, partition=None
):
logging.debug(
"kafka_produce server:{}:{} topic:{}".format(
"localhost", kafka_cluster.kafka_port, topic
@ -100,7 +102,9 @@ def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15, pa
kafka_cluster.kafka_port, producer_serializer, retries
)
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp, partition=partition)
producer.send(
topic=topic, value=message, timestamp_ms=timestamp, partition=partition
)
producer.flush()
@ -232,7 +236,9 @@ message Message {
SELECT last_exception, last_exception_time, database, table FROM system.kafka_consumers
"""
)
logging.debug(f"result_system_kafka_consumers (test_bad_messages_parsing 2): {result_system_kafka_consumers}")
logging.debug(
f"result_system_kafka_consumers (test_bad_messages_parsing 2): {result_system_kafka_consumers}"
)
kafka_delete_topic(admin_client, f"{format_name}_err")
@ -285,6 +291,7 @@ struct Message
kafka_delete_topic(admin_client, "CapnProto_err")
def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
@ -333,7 +340,7 @@ Cannot parse input: expected \\'{\\' before: \\'asdfghjkl\\': while parsing Kafk
SELECT exceptions.text[1], length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1]
"""
)
result_system_kafka_consumers = result_system_kafka_consumers.replace('\t', '|')
result_system_kafka_consumers = result_system_kafka_consumers.replace("\t", "|")
if result_system_kafka_consumers == expected_result or retries > max_retries:
break
retries += 1
@ -347,6 +354,7 @@ Cannot parse input: expected \\'{\\' before: \\'asdfghjkl\\': while parsing Kafk
]:
kafka_delete_topic(admin_client, f"{format_name}_err")
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")