ClickHouse/utils/kafka/status.py

53 lines
1.8 KiB
Python
Raw Normal View History

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# `pip install …`
import kafka # … kafka-python
import argparse
from pprint import pprint
def main():
parser = argparse.ArgumentParser(description='Kafka client to get groups and topics status')
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
help='Kafka bootstrap-server address')
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
help='Kafka bootstrap-server port')
parser.add_argument('--client', type=str, default='ch-kafka-python',
help='custom client id for this producer')
args = parser.parse_args()
config = {
'bootstrap_servers': f'{args.server}:{args.port}',
'client_id': args.client,
}
client = kafka.KafkaAdminClient(**config)
consumer = kafka.KafkaConsumer(**config)
cluster = client._client.cluster
topics = cluster.topics()
for topic in topics:
print(f'Topic "{topic}":', end='')
for partition in cluster.partitions_for_topic(topic):
tp = kafka.TopicPartition(topic, partition)
print(f' {partition} (begin: {consumer.beginning_offsets([tp])[tp]}, end: {consumer.end_offsets([tp])[tp]})', end='')
print()
groups = client.list_consumer_groups()
for group in groups:
print(f'Group "{group[0]}" ({group[1]}):')
consumer = kafka.KafkaConsumer(**config, group_id=group[0])
offsets = client.list_consumer_group_offsets(group[0])
for topic, offset in offsets.items():
print(f'\t{topic.topic}[{topic.partition}]: {consumer.beginning_offsets([topic])[topic]}, {offset.offset}, {consumer.end_offsets([topic])[topic]}')
consumer.close()
client.close()
if __name__ == "__main__":
exit(main())