mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
53 lines
1.8 KiB
Python
Executable File
53 lines
1.8 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# `pip install …`
|
|
import kafka # … kafka-python
|
|
|
|
import argparse
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Kafka client to get groups and topics status')
|
|
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
|
|
help='Kafka bootstrap-server address')
|
|
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
|
|
help='Kafka bootstrap-server port')
|
|
parser.add_argument('--client', type=str, default='ch-kafka-python',
|
|
help='custom client id for this producer')
|
|
|
|
args = parser.parse_args()
|
|
config = {
|
|
'bootstrap_servers': f'{args.server}:{args.port}',
|
|
'client_id': args.client,
|
|
}
|
|
|
|
client = kafka.KafkaAdminClient(**config)
|
|
consumer = kafka.KafkaConsumer(**config)
|
|
cluster = client._client.cluster
|
|
|
|
topics = cluster.topics()
|
|
for topic in topics:
|
|
print(f'Topic "{topic}":', end='')
|
|
for partition in cluster.partitions_for_topic(topic):
|
|
tp = kafka.TopicPartition(topic, partition)
|
|
print(f' {partition} (begin: {consumer.beginning_offsets([tp])[tp]}, end: {consumer.end_offsets([tp])[tp]})', end='')
|
|
print()
|
|
|
|
groups = client.list_consumer_groups()
|
|
for group in groups:
|
|
print(f'Group "{group[0]}" ({group[1]}):')
|
|
|
|
consumer = kafka.KafkaConsumer(**config, group_id=group[0])
|
|
offsets = client.list_consumer_group_offsets(group[0])
|
|
for topic, offset in offsets.items():
|
|
print(f'\t{topic.topic}[{topic.partition}]: {consumer.beginning_offsets([topic])[topic]}, {offset.offset}, {consumer.end_offsets([topic])[topic]}')
|
|
consumer.close()
|
|
|
|
client.close()
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit(main())
|