Merge pull request #18000 from ClickHouse/filimonov-kafka-flaky-test

kafka test_premature_flush_on_eof flap
alexey-milovidov 2020-12-12 02:29:26 +03:00 committed by GitHub
commit 656a9222ac

@@ -2295,6 +2295,11 @@ def test_premature_flush_on_eof(kafka_cluster):
         ORDER BY key;
     ''')
+    # messages created here will be consumed immediately after MV creation
+    # (reaching topic EOF).
+    # But we should not flush immediately after reaching EOF, because
+    # the next poll can return more data, and we should respect kafka_flush_interval_ms
+    # and try to form a bigger block
     messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(1)]
     kafka_produce('premature_flush_on_eof', messages)
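
For orientation, the new comment block describes the situation this hunk documents: the Kafka table reaches topic EOF right after the MV is created, and flushing at that moment would defeat kafka_flush_interval_ms. A minimal sketch of the kind of setup the elided part of the test builds (the Kafka DDL and setting values here are assumptions, not copied from the test; only test.destination and kafka_flush_interval_ms appear in the diff itself):

# Hypothetical sketch of the surrounding setup; the real test's DDL may differ.
instance.query('''
    CREATE TABLE test.kafka (key UInt64, value UInt64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'premature_flush_on_eof',
             kafka_group_name = 'premature_flush_on_eof',
             kafka_format = 'JSONEachRow',
             kafka_flush_interval_ms = 7500  -- assumed: matches the "flushed 7.5 sec" comment below
''')
instance.query('''
    CREATE MATERIALIZED VIEW test.consumer TO test.destination AS
    SELECT * FROM test.kafka
''')
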
@@ -2313,12 +2318,18 @@ def test_premature_flush_on_eof(kafka_cluster):
     # all subscriptions/assignments are done during the select, so it starts sending data to test.destination
     # immediately after creation of the MV
-    time.sleep(2)
+    time.sleep(1.5)  # this sleep is needed to ensure that the first poll finished, and at least one 'empty' poll happened.
+    # Empty polls before the fix were leading to a premature flush.
+    # TODO: wait for messages in log: "Polled batch of 1 messages", followed by "Stalled"
     # produce more messages after delay
     kafka_produce('premature_flush_on_eof', messages)
     # data was not flushed yet (it will be flushed 7.5 sec after creating MV)
     assert int(instance.query("SELECT count() FROM test.destination")) == 0
-    time.sleep(6)
+    time.sleep(9)  # TODO: wait for messages in log: "Committed offset ..."
     # it should be a single part, i.e. a single insert
     result = instance.query('SELECT _part, count() FROM test.destination GROUP BY _part')
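
Both TODOs point at replacing the fixed sleeps with waiting for specific server log lines, which is exactly what makes timing-based tests flaky. A hypothetical polling helper along those lines (wait_for_log_line and the log path are assumptions, not part of the framework at this commit; exec_in_container is used elsewhere in these integration tests):

import time

def wait_for_log_line(instance, pattern, timeout=30, poll_interval=0.5):
    # Hypothetical helper: poll the server log inside the container until
    # `pattern` appears, instead of sleeping for a fixed interval.
    # The log path is an assumption; adjust to the cluster's layout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        found = instance.exec_in_container(
            ['bash', '-c',
             'grep -c "{}" /var/log/clickhouse-server/clickhouse-server.log || true'.format(pattern)])
        if int(found) > 0:
            return
        time.sleep(poll_interval)
    raise TimeoutError('log line not found: {}'.format(pattern))

# e.g. instead of time.sleep(9):
# wait_for_log_line(instance, 'Committed offset')

Waiting on an observable side effect bounds the test by a timeout rather than a guess, so it stays fast when the server is fast and only slows down when the server does.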