diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index fe917775591..4c2d8e5c29d 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2075,6 +2075,40 @@ def test_premature_flush_on_eof(kafka_cluster): DROP TABLE test.destination; ''') + +@pytest.mark.timeout(120) +def test_kafka_unavailable(kafka_cluster): + messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(20000)] + kafka_produce('test_bad_reschedule', messages) + + instance.query(''' + CREATE TABLE test.kafka (key UInt64, value UInt64) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:11111', + kafka_topic_list = 'test_bad_reschedule', + kafka_group_name = 'test_bad_reschedule', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 1000; + + CREATE MATERIALIZED VIEW test.destination Engine=Log AS + SELECT + key, + now() as consume_ts, + value, + _topic, + _key, + _offset, + _partition, + _timestamp + FROM test.kafka; + ''') + + instance.query("SELECT * FROM test.kafka") + + while int(instance.query("SELECT count() FROM test.destination")) < 20000: + print("Waiting for consume") + time.sleep(1) + if __name__ == '__main__': cluster.start() raw_input("Cluster created, press any key to destroy...")