ClickHouse/utils/kafka/consume.py
Ivan a502424c33
Implement support for insertion into Kafka tables (#6012)
* Add write-callback on each row for RowOutputStream
* Fix build of new rdkafka library
* Poll messages if Kafka outgoing queue is full
* Add test
* Add test producer-consumer
* Truncate delimiter from last row in message
2019-08-20 14:17:57 +03:00

41 lines
1.2 KiB
Python
Executable File

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# Requires the kafka-python package: `pip install kafka-python`
import kafka # … kafka-python
import argparse
from pprint import pprint
def main(argv=None):
    """Poll a Kafka topic once and pretty-print whatever the poll returns.

    Subscribes to ``--topic`` as a member of the ``--group`` consumer group,
    issues a single poll with a 10-second timeout, prints the result with
    ``pprint`` and disconnects.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) lets argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: Raised by argparse for ``--help`` or invalid arguments.
    """
    parser = argparse.ArgumentParser(description='Kafka Consumer client')
    parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
                        help='Kafka bootstrap-server address')
    parser.add_argument('--port', type=int, metavar='PORT', default=9092,
                        help='Kafka bootstrap-server port')
    parser.add_argument('--client', type=str, default='ch-kafka-python',
                        help='custom client id for this consumer')
    parser.add_argument('--topic', type=str, required=True,
                        help='name of Kafka topic to read from')
    parser.add_argument('--group', type=str, required=True,
                        help='name of the consumer group')
    args = parser.parse_args(argv)

    config = {
        'bootstrap_servers': f'{args.server}:{args.port}',
        'client_id': args.client,
        'group_id': args.group,
        # Per kafka-python's `auto_offset_reset`: start from the oldest
        # available message when the group has no committed offset.
        'auto_offset_reset': 'earliest',
    }
    poll_timeout_ms = 10000

    client = kafka.KafkaConsumer(**config)
    try:
        client.subscribe([args.topic])
        pprint(client.poll(timeout_ms=poll_timeout_ms))
        client.unsubscribe()
    finally:
        # Always release the broker connection, even if subscribe/poll fails.
        client.close()
if __name__ == "__main__":
    # Raise SystemExit directly: the bare `exit` name is injected by the
    # `site` module for interactive use and is missing under `python -S`.
    raise SystemExit(main())