Try to handle Python Kafka client disconnects more robustly

This commit is contained in:
Mikhail Filimonov 2021-02-26 07:17:31 +01:00
parent 60ede36068
commit 9e3547681b
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE

View File

@ -18,10 +18,8 @@ from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
"""
protoc --version
@ -83,12 +81,16 @@ def wait_kafka_is_available(max_retries=50):
def producer_serializer(x):
    """Serialize a message value for the Kafka producer.

    ``str`` values are encoded with ``str.encode()`` (UTF-8 by default);
    any other value (e.g. ``bytes`` or ``None``) is returned unchanged.
    """
    return x.encode() if isinstance(x, str) else x
def kafka_produce(topic, messages, timestamp=None, retries=2):
    """Send ``messages`` to ``topic`` on the local Kafka broker.

    Args:
        topic: Name of the destination topic.
        messages: Iterable of values; each is passed through
            ``producer_serializer`` by the producer.
        timestamp: Optional value forwarded as ``timestamp_ms`` for every
            message.
        retries: Forwarded to ``KafkaProducer(retries=...)``.
    """
    # max_in_flight_requests_per_connection=1 is paired with retries so that
    # a retried request cannot be reordered behind a later one.
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=producer_serializer,
        retries=retries,
        max_in_flight_requests_per_connection=1,
    )
    try:
        for message in messages:
            producer.send(topic=topic, value=message, timestamp_ms=timestamp)
            # NOTE(review): indentation was lost in this view, so it is not
            # certain whether flush() ran per message or once after the loop;
            # per-message flush is used here - confirm against the real file.
            producer.flush()
    finally:
        # Release the producer's sockets and background thread; the original
        # never closed the producer it created on every call.
        producer.close()
# Just to ensure the Python Kafka client / producer is working properly.
def kafka_producer_send_heartbeat_msg(max_retries=50):
    """Produce one ``'test'`` message to ``'test_heartbeat_topic'``.

    Args:
        max_retries: Passed to ``kafka_produce`` as its ``retries`` argument.
    """
    kafka_produce('test_heartbeat_topic', ['test'], retries=max_retries)
def kafka_consume(topic):
consumer = KafkaConsumer(bootstrap_servers="localhost:9092", auto_offset_reset="earliest")
consumer.subscribe(topics=(topic))
@ -166,35 +168,22 @@ def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'
else:
return TSV(result) == TSV(reference)
# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(name):
    """Describe the members of Kafka consumer group ``name``.

    Queries the broker at ``localhost:9092`` through ``KafkaAdminClient``.

    Returns:
        A list with one dict per group member, holding the keys
        ``'member_id'``, ``'client_id'``, ``'client_host'`` and
        ``'assignment'``; ``'assignment'`` is a list of
        ``{'topic': ..., 'partitions': ...}`` dicts.
    """
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    try:
        consumer_groups = admin_client.describe_consumer_groups([name])
    finally:
        # The response is already materialised; close the client so its
        # broker connections are not leaked on every call.
        admin_client.close()

    # Exactly one group was requested, so only the first description is read.
    return [
        {
            'member_id': member.member_id,
            'client_id': member.client_id,
            'client_host': member.client_host,
            'assignment': [
                {'topic': topic, 'partitions': partitions}
                for (topic, partitions) in member.member_assignment.assignment
            ],
        }
        for member in consumer_groups[0].members
    ]
# Fixtures
@pytest.fixture(scope="module")
@ -209,15 +198,14 @@ def kafka_cluster():
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    """Autouse fixture run around every test.

    Recreates the ``test`` database, then checks both the Kafka broker and
    the Python Kafka producer before handing control to the test.
    """
    instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
    wait_kafka_is_available()  # ensure kafka is alive
    kafka_producer_send_heartbeat_msg()  # ensure python kafka client is ok
    # print("kafka is available - running test")
    yield  # run test
# Tests
@pytest.mark.timeout(180)