#! /usr/bin/env python3 # -*- coding: utf-8 -*- # `pip install …` import kafka # … kafka-python import argparse from concurrent.futures import ThreadPoolExecutor import enum import multiprocessing import sys import time class Sync(enum.Enum): NONE = 'none' LEAD = 'leader' ALL = 'all' def __str__(self): return self.value def convert(self): values = { str(Sync.NONE): '0', str(Sync.LEAD): '1', str(Sync.ALL): 'all', } return values[self.value] def main(): parser = argparse.ArgumentParser(description='Produce a single message taken from input') parser.add_argument('--server', type=str, metavar='HOST', default='localhost', help='Kafka bootstrap-server address') parser.add_argument('--port', type=int, metavar='PORT', default=9092, help='Kafka bootstrap-server port') parser.add_argument('--client', type=str, default='ch-kafka-python', help='custom client id for this producer') parser.add_argument('--topic', type=str, required=True, help='name of Kafka topic to store in') parser.add_argument('--retries', type=int, default=0, help='number of retries to send on failure') parser.add_argument('--multiply', type=int, default=1, help='multiplies incoming string many times') parser.add_argument('--repeat', type=int, default=1, help='send same (multiplied) message many times') mode_group = parser.add_mutually_exclusive_group() mode_group.add_argument('--jobs', type=int, default=multiprocessing.cpu_count(), help='number of concurrent jobs') mode_group.add_argument('--delay', type=int, metavar='SECONDS', default=0, help='delay before sending next message') args = parser.parse_args() config = { 'bootstrap_servers': f'{args.server}:{args.port}', 'client_id': args.client, 'retries': args.retries, } client = kafka.KafkaProducer(**config) message = sys.stdin.buffer.read() * args.multiply def send(num): if args.delay > 0: time.sleep(args.delay) client.send(topic=args.topic, value=message) print(f'iteration {num}: sent a message multiplied {args.multiply} times') if args.delay > 0: args.jobs = 1 pool = ThreadPoolExecutor(max_workers=args.jobs) for num in range(args.repeat): pool.submit(send, num) pool.shutdown() client.flush() client.close() return 0 if __name__ == "__main__": exit(main())