add failing test with kafka

This commit is contained in:
Anton Popov 2020-07-14 12:22:25 +03:00
parent 3314a97b99
commit 890bb2f582

View File

@ -2075,6 +2075,40 @@ def test_premature_flush_on_eof(kafka_cluster):
DROP TABLE test.destination;
''')
@pytest.mark.timeout(120)
def test_kafka_unavailable(kafka_cluster):
messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(20000)]
kafka_produce('test_bad_reschedule', messages)
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:11111',
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
CREATE MATERIALIZED VIEW test.destination Engine=Log AS
SELECT
key,
now() as consume_ts,
value,
_topic,
_key,
_offset,
_partition,
_timestamp
FROM test.kafka;
''')
instance.query("SELECT * FROM test.kafka")
while int(instance.query("SELECT count() FROM test.destination")) < 20000:
print("Waiting for consume")
time.sleep(1)
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")