#! /usr/bin/env python3
# -*- coding: utf-8 -*-

# `pip install …`
import kafka  # … kafka-python

import argparse


def _parse_args(argv=None):
    """Parse the command-line options of the status client.

    Args:
        argv: Argument list to parse. ``None`` lets argparse fall back to
            ``sys.argv[1:]``.

    Returns:
        The argparse namespace with ``server``, ``port`` and ``client``.
    """
    parser = argparse.ArgumentParser(
        description="Kafka client to get groups and topics status"
    )
    parser.add_argument(
        "--server",
        type=str,
        metavar="HOST",
        default="localhost",
        help="Kafka bootstrap-server address",
    )
    parser.add_argument(
        "--port",
        type=int,
        metavar="PORT",
        default=9092,
        help="Kafka bootstrap-server port",
    )
    parser.add_argument(
        "--client",
        type=str,
        default="ch-kafka-python",
        help="custom client id for this producer",
    )
    return parser.parse_args(argv)


def _print_topics(consumer):
    """Print each topic followed by the begin/end offsets of its partitions."""
    for topic in sorted(consumer.topics()):
        print(f'Topic "{topic}":', end="")
        # Guard against a missing partition set so a topic without usable
        # metadata prints an empty line instead of raising TypeError.
        partition_ids = sorted(consumer.partitions_for_topic(topic) or ())
        tps = [kafka.TopicPartition(topic, pid) for pid in partition_ids]
        if tps:
            # One batched lookup per topic instead of two calls per partition.
            begin = consumer.beginning_offsets(tps)
            end = consumer.end_offsets(tps)
            for tp in tps:
                print(
                    f" {tp.partition} (begin: {begin[tp]}, end: {end[tp]})",
                    end="",
                )
        print()


def _print_groups(client, consumer):
    """Print each consumer group with its committed offset per partition.

    Every partition line shows: beginning offset, committed offset, end offset.
    """
    for group in client.list_consumer_groups():
        group_id, protocol_type = group[0], group[1]
        print(f'Group "{group_id}" ({protocol_type}):')

        committed = client.list_consumer_group_offsets(group_id)
        if not committed:
            continue

        # Log boundaries are a property of the partition, not of the group,
        # so the shared consumer is reused rather than creating one per group.
        tps = sorted(committed)
        begin = consumer.beginning_offsets(tps)
        end = consumer.end_offsets(tps)
        for tp in tps:
            print(
                f"\t{tp.topic}[{tp.partition}]: {begin[tp]}, {committed[tp].offset}, {end[tp]}"
            )


def main(argv=None):
    """Report topic and consumer-group offsets of a Kafka cluster on stdout.

    Args:
        argv: Optional argument list (without the program name). ``None``
            means the process command line is used.

    Returns:
        ``0`` on success; errors from argparse or the Kafka client propagate.
    """
    args = _parse_args(argv)
    config = {
        "bootstrap_servers": f"{args.server}:{args.port}",
        "client_id": args.client,
    }

    client = kafka.KafkaAdminClient(**config)
    try:
        consumer = kafka.KafkaConsumer(**config)
        try:
            _print_topics(consumer)
            _print_groups(client, consumer)
        finally:
            # Close the connections even if a broker call raised.
            consumer.close()
    finally:
        client.close()
    return 0


if __name__ == "__main__":
    # The bare `exit` helper is injected by the `site` module and is not
    # guaranteed to exist (e.g. `python -S`); SystemExit is always available.
    raise SystemExit(main())