2019-04-18 14:36:51 +00:00
|
|
|
#! /usr/bin/env python3
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
# `pip install …`
|
|
|
|
import kafka # … kafka-python
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
2022-03-22 16:39:58 +00:00
|
|
|
parser = argparse.ArgumentParser(
|
|
|
|
description="Kafka client to get groups and topics status"
|
|
|
|
)
|
|
|
|
parser.add_argument(
|
|
|
|
"--server",
|
|
|
|
type=str,
|
|
|
|
metavar="HOST",
|
|
|
|
default="localhost",
|
|
|
|
help="Kafka bootstrap-server address",
|
|
|
|
)
|
|
|
|
parser.add_argument(
|
|
|
|
"--port",
|
|
|
|
type=int,
|
|
|
|
metavar="PORT",
|
|
|
|
default=9092,
|
|
|
|
help="Kafka bootstrap-server port",
|
|
|
|
)
|
|
|
|
parser.add_argument(
|
|
|
|
"--client",
|
|
|
|
type=str,
|
|
|
|
default="ch-kafka-python",
|
|
|
|
help="custom client id for this producer",
|
|
|
|
)
|
2019-04-18 14:36:51 +00:00
|
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
config = {
|
2022-03-22 16:39:58 +00:00
|
|
|
"bootstrap_servers": f"{args.server}:{args.port}",
|
|
|
|
"client_id": args.client,
|
2019-04-18 14:36:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
client = kafka.KafkaAdminClient(**config)
|
|
|
|
consumer = kafka.KafkaConsumer(**config)
|
|
|
|
cluster = client._client.cluster
|
|
|
|
|
|
|
|
topics = cluster.topics()
|
|
|
|
for topic in topics:
|
2022-03-22 16:39:58 +00:00
|
|
|
print(f'Topic "{topic}":', end="")
|
2019-04-18 14:36:51 +00:00
|
|
|
for partition in cluster.partitions_for_topic(topic):
|
|
|
|
tp = kafka.TopicPartition(topic, partition)
|
2022-03-22 16:39:58 +00:00
|
|
|
print(
|
|
|
|
f" {partition} (begin: {consumer.beginning_offsets([tp])[tp]}, end: {consumer.end_offsets([tp])[tp]})",
|
|
|
|
end="",
|
|
|
|
)
|
2019-04-18 14:36:51 +00:00
|
|
|
print()
|
|
|
|
|
|
|
|
groups = client.list_consumer_groups()
|
|
|
|
for group in groups:
|
|
|
|
print(f'Group "{group[0]}" ({group[1]}):')
|
|
|
|
|
|
|
|
consumer = kafka.KafkaConsumer(**config, group_id=group[0])
|
|
|
|
offsets = client.list_consumer_group_offsets(group[0])
|
|
|
|
for topic, offset in offsets.items():
|
2022-03-22 16:39:58 +00:00
|
|
|
print(
|
|
|
|
f"\t{topic.topic}[{topic.partition}]: {consumer.beginning_offsets([topic])[topic]}, {offset.offset}, {consumer.end_offsets([topic])[topic]}"
|
|
|
|
)
|
2019-04-18 14:36:51 +00:00
|
|
|
consumer.close()
|
|
|
|
|
|
|
|
client.close()
|
2019-11-01 16:10:50 +00:00
|
|
|
return 0
|
2019-04-18 14:36:51 +00:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
exit(main())
|