system_kafka_consumers: test with rebalance

This commit is contained in:
Ilya Golshtein 2023-06-12 21:29:23 +00:00
parent 957787d96a
commit 5e1c72a0d0

View File

@ -1186,6 +1186,7 @@ def test_kafka_consumer_hang2(kafka_cluster):
instance.query( instance.query(
""" """
DROP TABLE IF EXISTS test.kafka; DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka2;
CREATE TABLE test.kafka (key UInt64, value UInt64) CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka ENGINE = Kafka
@ -4580,22 +4581,89 @@ def test_system_kafka_consumers(kafka_cluster):
result = instance.query("SELECT * FROM test.kafka ORDER BY a;") result = instance.query("SELECT * FROM test.kafka ORDER BY a;")
expected = """\
1 foo
2 bar
42 answer
100 multi
101 row
103 message
"""
assert TSV(result) == TSV(expected)
result_system_kafka_consumers = instance.query("SELECT * FROM system.kafka_consumers format Vertical") result_system_kafka_consumers = instance.query("SELECT * FROM system.kafka_consumers WHERE database='test' and table='kafka' format Vertical")
logging.debug(f"result_system_kafka_consumers: {result_system_kafka_consumers}") logging.debug(f"result_system_kafka_consumers: {result_system_kafka_consumers}")
assert result_system_kafka_consumers == 'empty' assert result_system_kafka_consumers == 'empty'
kafka_delete_topic(admin_client, topic)
def test_system_kafka_consumers_rebalance(kafka_cluster):
# based on test_kafka_consumer_hang2
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
topic = "system_kafka_cons2"
kafka_create_topic(admin_client, topic)
# # Check that format_csv_delimiter parameter works now - as part of all available format settings.
# kafka_produce(
# kafka_cluster,
# topic,
# ["1|foo", "2|bar", "42|answer", "100|multi\n101|row\n103|message"],
# )
instance.query(
f"""
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka2;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic}',
kafka_group_name = '{topic}',
kafka_commit_on_select = 1,
kafka_format = 'JSONEachRow';
CREATE TABLE test.kafka2 (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic}',
kafka_commit_on_select = 1,
kafka_group_name = '{topic}',
kafka_format = 'JSONEachRow';
"""
)
# first consumer subscribe the topic, try to poll some data, and go to rest
instance.query("SELECT * FROM test.kafka")
# second consumer do the same leading to rebalance in the first
# consumer, try to poll some data
instance.query("SELECT * FROM test.kafka2")
instance.query("SELECT * FROM test.kafka")
# second consumer do the same leading to rebalance in the first
# consumer, try to poll some data
instance.query("SELECT * FROM test.kafka2")
instance.query("SELECT * FROM test.kafka")
# second consumer do the same leading to rebalance in the first
# consumer, try to poll some data
instance.query("SELECT * FROM test.kafka2")
result_system_kafka_consumers = instance.query("SELECT * FROM system.kafka_consumers WHERE database='test' and table IN ('kafka', 'kafka2') format Vertical")
logging.debug(f"result_system_kafka_consumers (1): {result_system_kafka_consumers}")
# assert result_system_kafka_consumers == 'empty'
instance.query("DROP TABLE test.kafka")
instance.query("DROP TABLE test.kafka2")
kafka_delete_topic(admin_client, topic) kafka_delete_topic(admin_client, topic)
if __name__ == "__main__": if __name__ == "__main__":