mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
58 lines
1.3 KiB
Python
Executable File
58 lines
1.3 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
from pprint import pprint
|
|
|
|
# `pip install …`
|
|
import kafka # … kafka-python
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Kafka Producer client")
|
|
parser.add_argument(
|
|
"--server",
|
|
type=str,
|
|
metavar="HOST",
|
|
default="localhost",
|
|
help="Kafka bootstrap-server address",
|
|
)
|
|
parser.add_argument(
|
|
"--port",
|
|
type=int,
|
|
metavar="PORT",
|
|
default=9092,
|
|
help="Kafka bootstrap-server port",
|
|
)
|
|
parser.add_argument(
|
|
"--client",
|
|
type=str,
|
|
default="ch-kafka-python",
|
|
help="custom client id for this producer",
|
|
)
|
|
parser.add_argument(
|
|
"--topic", type=str, required=True, help="name of Kafka topic to store in"
|
|
)
|
|
parser.add_argument(
|
|
"--group", type=str, required=True, help="name of the consumer group"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
config = {
|
|
"bootstrap_servers": f"{args.server}:{args.port}",
|
|
"client_id": args.client,
|
|
"group_id": args.group,
|
|
"auto_offset_reset": "earliest",
|
|
}
|
|
client = kafka.KafkaConsumer(**config)
|
|
|
|
client.subscribe([args.topic])
|
|
pprint(client.poll(10000))
|
|
client.unsubscribe()
|
|
client.close()
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit(main())
|