#! /usr/bin/env python3 # -*- coding: utf-8 -*- import argparse # `pip install …` import kafka # … kafka-python def main(): parser = argparse.ArgumentParser( description="Kafka client to get groups and topics status" ) parser.add_argument( "--server", type=str, metavar="HOST", default="localhost", help="Kafka bootstrap-server address", ) parser.add_argument( "--port", type=int, metavar="PORT", default=9092, help="Kafka bootstrap-server port", ) parser.add_argument( "--client", type=str, default="ch-kafka-python", help="custom client id for this producer", ) args = parser.parse_args() config = { "bootstrap_servers": f"{args.server}:{args.port}", "client_id": args.client, } client = kafka.KafkaAdminClient(**config) consumer = kafka.KafkaConsumer(**config) cluster = client._client.cluster topics = cluster.topics() for topic in topics: print(f'Topic "{topic}":', end="") for partition in cluster.partitions_for_topic(topic): tp = kafka.TopicPartition(topic, partition) print( f" {partition} (begin: {consumer.beginning_offsets([tp])[tp]}, end: {consumer.end_offsets([tp])[tp]})", end="", ) print() groups = client.list_consumer_groups() for group in groups: print(f'Group "{group[0]}" ({group[1]}):') consumer = kafka.KafkaConsumer(**config, group_id=group[0]) offsets = client.list_consumer_group_offsets(group[0]) for topic, offset in offsets.items(): print( f"\t{topic.topic}[{topic.partition}]: {consumer.beginning_offsets([topic])[topic]}, {offset.offset}, {consumer.end_offsets([topic])[topic]}" ) consumer.close() client.close() return 0 if __name__ == "__main__": exit(main())