ClickHouse/utils/kafka/manage.py

43 lines
1.3 KiB
Python
Raw Normal View History

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# `pip install …`
import kafka # … kafka-python
import argparse
def main():
parser = argparse.ArgumentParser(description='Kafka Topic manager')
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
help='Kafka bootstrap-server address')
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
help='Kafka bootstrap-server port')
parser.add_argument('--client', type=str, default='ch-kafka-python',
help='custom client id for this producer')
commands = parser.add_mutually_exclusive_group()
commands.add_argument('--create', type=str, metavar='TOPIC', nargs='+',
help='create new topic(s) in the cluster')
commands.add_argument('--delete', type=str, metavar='TOPIC', nargs='+',
help='delete existing topic(s) from the cluster')
args = parser.parse_args()
config = {
'bootstrap_servers': f'{args.server}:{args.port}',
'client_id': args.client,
}
client = kafka.KafkaAdminClient(**config)
if args.create:
2020-10-02 16:54:07 +00:00
print((client.create_topics(args.create)))
elif args.delete:
2020-10-02 16:54:07 +00:00
print((client.delete_topics(args.delete)))
client.close()
2019-11-01 16:09:35 +00:00
return 0
if __name__ == "__main__":
exit(main())