mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
a502424c33
* Add write-callback on each row for RowOutputStream * Fix build of new rdkafka library * Poll messages if Kafka outgoing queue is full * Add test * Add test producer-consumer * Truncate delimiter from last row in message
41 lines
1.2 KiB
Python
Executable File
41 lines
1.2 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# `pip install …`
|
|
import kafka # … kafka-python
|
|
|
|
import argparse
|
|
from pprint import pprint
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Kafka Producer client')
|
|
parser.add_argument('--server', type=str, metavar='HOST', default='localhost',
|
|
help='Kafka bootstrap-server address')
|
|
parser.add_argument('--port', type=int, metavar='PORT', default=9092,
|
|
help='Kafka bootstrap-server port')
|
|
parser.add_argument('--client', type=str, default='ch-kafka-python',
|
|
help='custom client id for this producer')
|
|
parser.add_argument('--topic', type=str, required=True,
|
|
help='name of Kafka topic to store in')
|
|
parser.add_argument('--group', type=str, required=True,
|
|
help='name of the consumer group')
|
|
|
|
args = parser.parse_args()
|
|
config = {
|
|
'bootstrap_servers': f'{args.server}:{args.port}',
|
|
'client_id': args.client,
|
|
'group_id': args.group,
|
|
'auto_offset_reset': 'earliest',
|
|
}
|
|
client = kafka.KafkaConsumer(**config)
|
|
|
|
client.subscribe([args.topic])
|
|
pprint(client.poll(10000))
|
|
client.unsubscribe()
|
|
client.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit(main())
|