ClickHouse/utils/kafka/status.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

74 lines
2.0 KiB
Python
Raw Normal View History

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
2024-09-27 10:19:39 +00:00
import argparse
# `pip install …`
import kafka # … kafka-python
def main():
parser = argparse.ArgumentParser(
description="Kafka client to get groups and topics status"
)
parser.add_argument(
"--server",
type=str,
metavar="HOST",
default="localhost",
help="Kafka bootstrap-server address",
)
parser.add_argument(
"--port",
type=int,
metavar="PORT",
default=9092,
help="Kafka bootstrap-server port",
)
parser.add_argument(
"--client",
type=str,
default="ch-kafka-python",
help="custom client id for this producer",
)
args = parser.parse_args()
config = {
"bootstrap_servers": f"{args.server}:{args.port}",
"client_id": args.client,
}
client = kafka.KafkaAdminClient(**config)
consumer = kafka.KafkaConsumer(**config)
cluster = client._client.cluster
topics = cluster.topics()
for topic in topics:
print(f'Topic "{topic}":', end="")
for partition in cluster.partitions_for_topic(topic):
tp = kafka.TopicPartition(topic, partition)
print(
f" {partition} (begin: {consumer.beginning_offsets([tp])[tp]}, end: {consumer.end_offsets([tp])[tp]})",
end="",
)
print()
groups = client.list_consumer_groups()
for group in groups:
print(f'Group "{group[0]}" ({group[1]}):')
consumer = kafka.KafkaConsumer(**config, group_id=group[0])
offsets = client.list_consumer_group_offsets(group[0])
for topic, offset in offsets.items():
print(
f"\t{topic.topic}[{topic.partition}]: {consumer.beginning_offsets([topic])[topic]}, {offset.offset}, {consumer.end_offsets([topic])[topic]}"
)
consumer.close()
client.close()
2019-11-01 16:10:50 +00:00
return 0
if __name__ == "__main__":
exit(main())