ClickHouse/utils/kafka/manage.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

67 lines
1.5 KiB
Python
Raw Normal View History

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
2024-09-27 10:19:39 +00:00
import argparse
# `pip install …`
import kafka # … kafka-python
def main():
parser = argparse.ArgumentParser(description="Kafka Topic manager")
parser.add_argument(
"--server",
type=str,
metavar="HOST",
default="localhost",
help="Kafka bootstrap-server address",
)
parser.add_argument(
"--port",
type=int,
metavar="PORT",
default=9092,
help="Kafka bootstrap-server port",
)
parser.add_argument(
"--client",
type=str,
default="ch-kafka-python",
help="custom client id for this producer",
)
commands = parser.add_mutually_exclusive_group()
commands.add_argument(
"--create",
type=str,
metavar="TOPIC",
nargs="+",
help="create new topic(s) in the cluster",
)
commands.add_argument(
"--delete",
type=str,
metavar="TOPIC",
nargs="+",
help="delete existing topic(s) from the cluster",
)
args = parser.parse_args()
config = {
"bootstrap_servers": f"{args.server}:{args.port}",
"client_id": args.client,
}
client = kafka.KafkaAdminClient(**config)
if args.create:
2020-10-02 16:54:07 +00:00
print((client.create_topics(args.create)))
elif args.delete:
2020-10-02 16:54:07 +00:00
print((client.delete_topics(args.delete)))
client.close()
2019-11-01 16:09:35 +00:00
return 0
if __name__ == "__main__":
exit(main())