Fix for test_kafka_flush_by_block_size after rdkafka 1.5
Since rdkafka 1.5 (https://github.com/edenhill/librdkafka/releases/tag/v1.5.0), topics are no longer auto-created when subscribing, so the test started failing: it was intentionally time-limited to check whether flushing by block size worked. Since 20.5 it is possible to increase kafka_flush_interval_ms per table, so I've adjusted the test to use that feature and verify that data is flushed by block size, not by time.
commit 991c673870 (parent cf49a83980)
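Since rdkafka no longer auto-creates topics on subscribe, the topic must exist before the consumer attaches; the commit handles this by producing to the topic from a background thread before the Kafka table is created. As an alternative illustration only (not part of this commit; the broker address and the kafka-python admin client are assumptions), the topic could also be pre-created explicitly:

    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    # Assumed broker address; the real test harness wires up its own cluster.
    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    try:
        # Create the topic up front instead of relying on broker-side
        # auto-creation, which rdkafka >= 1.5 no longer triggers on subscribe.
        admin.create_topics([NewTopic(name='flush_by_block_size',
                                      num_partitions=1,
                                      replication_factor=1)])
    except TopicAlreadyExistsError:
        pass  # topic left over from an earlier run
    finally:
        admin.close()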
@@ -1507,8 +1507,19 @@ def test_kafka_flush_by_time(kafka_cluster):
     assert TSV(result) == TSV('1 1')
 
 
-@pytest.mark.timeout(600)
+@pytest.mark.timeout(90)
 def test_kafka_flush_by_block_size(kafka_cluster):
+    cancel = threading.Event()
+
+    def produce():
+        while not cancel.is_set():
+            messages = []
+            messages.append(json.dumps({'key': 0, 'value': 0}))
+            kafka_produce('flush_by_block_size', messages)
+
+    kafka_thread = threading.Thread(target=produce)
+    kafka_thread.start()
+
     instance.query('''
         DROP TABLE IF EXISTS test.view;
         DROP TABLE IF EXISTS test.consumer;
@@ -1520,10 +1531,10 @@ def test_kafka_flush_by_block_size(kafka_cluster):
                      kafka_group_name = 'flush_by_block_size',
                      kafka_format = 'JSONEachRow',
                      kafka_max_block_size = 100,
+                     kafka_poll_max_batch_size = 1,
+                     kafka_flush_interval_ms = 120000, /* should not flush by time during test */
                      kafka_row_delimiter = '\\n';
 
-        SELECT * FROM test.kafka;
-
         CREATE TABLE test.view (key UInt64, value UInt64)
             ENGINE = MergeTree()
             ORDER BY key;
@@ -1532,31 +1543,26 @@ def test_kafka_flush_by_block_size(kafka_cluster):
             SELECT * FROM test.kafka;
     ''')
 
-    messages = []
-    for _ in range(101):
-        messages.append(json.dumps({'key': 0, 'value': 0}))
-    kafka_produce('flush_by_block_size', messages)
-
     # Wait for Kafka engine to consume this data
     while 1 != int(instance.query("SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'")):
-        time.sleep(1)
+        time.sleep(0.5)
 
-    # TODO: due to https://github.com/ClickHouse/ClickHouse/issues/11216
-    # second flush happens earlier than expected, so we have 2 parts here instead of one
-    # flush by block size works correctly, so the feature checked by the test is working correctly
+    cancel.set()
+    kafka_thread.join()
 
+    # more flushes can happen during the test; check only the result of the first flush (part named all_1_1_0)
     result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
     # print(result)
 
     # kafka_cluster.open_bash_shell('instance')
 
     instance.query('''
         DROP TABLE test.consumer;
         DROP TABLE test.view;
     ''')
 
     # 100 = first poll should return 100 messages (and rows)
-    # not waiting for stream_flush_interval_ms
-    assert int(result) == 100, 'Messages from kafka should be flushed at least every stream_flush_interval_ms!'
+    assert int(result) == 100, 'Messages from kafka should be flushed when a block of size kafka_max_block_size is formed!'
 
 
 @pytest.mark.timeout(600)
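Taken together, the two new settings make the flush deterministic: kafka_poll_max_batch_size = 1 delivers rows one per poll, and kafka_flush_interval_ms = 120000 pushes the time-based flush well past the 90-second test timeout, so the first part can only appear through the size-based flush at kafka_max_block_size = 100 rows. A minimal sketch of the two competing flush triggers the test separates (plain Python for illustration, not ClickHouse's actual implementation):

    import time

    def collect_block(poll_one, max_block_size=100, flush_interval_ms=120000):
        # A block is flushed either when it reaches max_block_size rows or
        # when flush_interval_ms elapses, whichever happens first.
        block = []
        deadline = time.monotonic() + flush_interval_ms / 1000.0
        while len(block) < max_block_size and time.monotonic() < deadline:
            row = poll_one()  # kafka_poll_max_batch_size = 1: one row per poll
            if row is not None:
                block.append(row)
        return block  # with a 120 s interval, only the size limit fires here

With the interval at two minutes and the test capped at 90 seconds, asserting exactly 100 rows in the first part (all_1_1_0) proves that the size trigger, not the timer, produced the flush.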