Merge remote-tracking branch 'upstream/pr/12487' into kafka-unavail-broker-SIGSEGV

* upstream/pr/12487:
  add failing test with kafka
This commit is contained in:
Azat Khuzhin 2020-07-22 11:07:06 +03:00
commit 58ba72c7b6

View File

@ -2075,6 +2075,40 @@ def test_premature_flush_on_eof(kafka_cluster):
DROP TABLE test.destination; DROP TABLE test.destination;
''') ''')
@pytest.mark.timeout(120)
def test_kafka_unavailable(kafka_cluster):
messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(20000)]
kafka_produce('test_bad_reschedule', messages)
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:11111',
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
CREATE MATERIALIZED VIEW test.destination Engine=Log AS
SELECT
key,
now() as consume_ts,
value,
_topic,
_key,
_offset,
_partition,
_timestamp
FROM test.kafka;
''')
instance.query("SELECT * FROM test.kafka")
while int(instance.query("SELECT count() FROM test.destination")) < 20000:
print("Waiting for consume")
time.sleep(1)
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
raw_input("Cluster created, press any key to destroy...") raw_input("Cluster created, press any key to destroy...")