ClickHouse/utils/kafka/produce.py
2022-03-22 17:39:58 +01:00

117 lines
2.7 KiB
Python
Executable File

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# `pip install …`
import kafka # … kafka-python
import argparse
from concurrent.futures import ThreadPoolExecutor
import enum
import multiprocessing
import sys
import time
class Sync(enum.Enum):
    """Named synchronisation levels and their short string codes.

    Each member's value is its human-readable name ("none", "leader",
    "all"); ``convert`` maps a member to the codes "0", "1" or "all".

    NOTE(review): these codes look like Kafka producer ``acks`` settings,
    but nothing in the visible code passes them to the producer -- confirm
    the intended use before relying on that.
    """

    NONE = "none"
    LEAD = "leader"
    ALL = "all"

    def __str__(self):
        """Return the member's value so it prints as its plain name."""
        return self.value

    def convert(self):
        """Return the short code for this member.

        Returns:
            "0" for ``NONE``, "1" for ``LEAD`` and "all" for ``ALL``.
        """
        # Key the table on the members themselves; no need to round-trip
        # through their string form to find the entry.
        values = {
            Sync.NONE: "0",
            Sync.LEAD: "1",
            Sync.ALL: "all",
        }
        return values[self]
def main():
    """Read a message from stdin and send it to a Kafka topic.

    The bytes read from standard input are repeated ``--multiply`` times to
    form one message. That message is submitted ``--repeat`` times, either
    from ``--jobs`` worker threads or, when ``--delay`` is positive, from a
    single worker that sleeps before every send.

    Returns:
        0 when every send call completed without raising, 1 otherwise.

    Invalid command-line values terminate the process through argparse
    (exit status 2) before any connection to the broker is attempted.
    """
    parser = argparse.ArgumentParser(
        description="Produce a single message taken from input"
    )
    parser.add_argument(
        "--server",
        type=str,
        metavar="HOST",
        default="localhost",
        help="Kafka bootstrap-server address",
    )
    parser.add_argument(
        "--port",
        type=int,
        metavar="PORT",
        default=9092,
        help="Kafka bootstrap-server port",
    )
    parser.add_argument(
        "--client",
        type=str,
        default="ch-kafka-python",
        help="custom client id for this producer",
    )
    parser.add_argument(
        "--topic", type=str, required=True, help="name of Kafka topic to store in"
    )
    parser.add_argument(
        "--retries", type=int, default=0, help="number of retries to send on failure"
    )
    parser.add_argument(
        "--multiply", type=int, default=1, help="multiplies incoming string many times"
    )
    parser.add_argument(
        "--repeat",
        type=int,
        default=1,
        help="send same (multiplied) message many times",
    )

    # Concurrency and pacing are alternatives: a delay implies one worker.
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--jobs",
        type=int,
        default=multiprocessing.cpu_count(),
        help="number of concurrent jobs",
    )
    mode_group.add_argument(
        "--delay",
        type=int,
        metavar="SECONDS",
        default=0,
        help="delay before sending next message",
    )

    args = parser.parse_args()

    # Reject nonsensical values up front, before opening a broker connection,
    # instead of crashing later in ThreadPoolExecutor or silently sending
    # empty/no messages.
    if args.jobs < 1:
        parser.error("--jobs must be at least 1")
    if args.multiply < 0:
        parser.error("--multiply must not be negative")
    if args.repeat < 0:
        parser.error("--repeat must not be negative")

    # A positive delay forces strictly sequential sending.
    workers = 1 if args.delay > 0 else args.jobs

    config = {
        "bootstrap_servers": f"{args.server}:{args.port}",
        "client_id": args.client,
        "retries": args.retries,
    }
    client = kafka.KafkaProducer(**config)

    failures = 0
    try:
        message = sys.stdin.buffer.read() * args.multiply

        def send(num):
            """Optionally sleep, then hand one copy of the message to the producer."""
            if args.delay > 0:
                time.sleep(args.delay)
            client.send(topic=args.topic, value=message)
            print(f"iteration {num}: sent a message multiplied {args.multiply} times")

        # Leaving the `with` block waits for every submitted task to finish.
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(send, num) for num in range(args.repeat)]

        # Surface exceptions raised inside the workers; previously the
        # futures were dropped and such errors vanished without a trace.
        # NOTE(review): this only covers errors raised by the send() call
        # itself; failures the producer reports later are not inspected.
        for num, future in enumerate(futures):
            error = future.exception()
            if error is not None:
                failures += 1
                print(f"iteration {num}: failed to send: {error!r}", file=sys.stderr)

        client.flush()
    finally:
        # Always release the producer, even if reading stdin or sending failed.
        client.close()

    return 1 if failures else 0
if __name__ == "__main__":
    # Use sys.exit rather than the `exit` helper: the latter is injected by
    # the `site` module for interactive use and is absent under `python -S`.
    sys.exit(main())