fix flap in no_holes_when_write_suffix_failed

Mikhail Filimonov 2020-10-08 15:35:44 +02:00
parent 3ac7b5aba8
commit 5fb549ca24
2 changed files with 21 additions and 8 deletions


@@ -19,6 +19,7 @@ class PartitionManager:
     def __init__(self):
         self._iptables_rules = []
+        _NetworkManager.get()
 
     def drop_instance_zk_connections(self, instance, action='DROP'):
         self._check_instance(instance)
@@ -1810,7 +1810,7 @@ def test_kafka_rebalance(kafka_cluster):
 @pytest.mark.timeout(1200)
 def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
-    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
+    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(1)]
     kafka_produce('no_holes_when_write_suffix_failed', messages)
 
     instance.query('''
@@ -1823,8 +1823,19 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
                      kafka_topic_list = 'no_holes_when_write_suffix_failed',
                      kafka_group_name = 'no_holes_when_write_suffix_failed',
                      kafka_format = 'JSONEachRow',
-                     kafka_max_block_size = 20;
+                     kafka_max_block_size = 20,
+                     kafka_flush_interval_ms = 2000;
+
+        SELECT * FROM test.kafka LIMIT 1; /* do subscription & assignment in advance (it can take a variable amount of time; the test relies on timing and can flap otherwise) */
     ''')
+
+    messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
+    kafka_produce('no_holes_when_write_suffix_failed', messages)
+
+    # init PartitionManager earlier (it starts a container)
+    pm = PartitionManager()
+
     instance.query('''
         CREATE TABLE test.view (key UInt64, value String)
             ENGINE = ReplicatedMergeTree('/clickhouse/kafkatest/tables/no_holes_when_write_suffix_failed', 'node1')
             ORDER BY key;
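Two of the additions above work together: a single message is produced up front (the `range(1)` change in the first hunk) so that the warm-up `SELECT * FROM test.kafka LIMIT 1` has something to read, forcing consumer-group subscription and partition assignment to finish before the timed part of the test begins. Group join time is inherently variable, which is exactly the flakiness being removed. A hedged sketch of measuring that variance with kafka-python; the broker address is an assumption, not something from the test:

```python
import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'no_holes_when_write_suffix_failed',
    bootstrap_servers='localhost:9092',  # assumption: a locally reachable broker
    group_id='no_holes_when_write_suffix_failed',
    auto_offset_reset='earliest',
)

t0 = time.time()
# The first poll triggers the consumer-group join and partition assignment;
# its duration varies from run to run -- the variance the warm-up SELECT absorbs.
consumer.poll(timeout_ms=10000, max_records=1)
print('subscription + assignment took %.1fs' % (time.time() - t0))
consumer.close()
```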
@@ -1833,17 +1844,18 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
         CREATE MATERIALIZED VIEW test.consumer TO test.view AS
             SELECT * FROM test.kafka
             WHERE NOT sleepEachRow(1);
     ''')
 
     # The tricky part here is that the disconnect should happen after the write prefix
     # but before the write suffix, so sleepEachRow is used to widen that window.
     with PartitionManager() as pm:
-        time.sleep(12)
+        time.sleep(3)
         pm.drop_instance_zk_connections(instance)
         time.sleep(20)
-        pm.heal_all
+        pm.heal_all()
 
     # The connection is restored, and it will take a while until the next block is flushed.
     # It takes years on CI :\
-    time.sleep(90)
+    time.sleep(45)
 
     # As it's a bit tricky to hit the proper moment, check the logs to verify we did it correctly.
     assert instance.contains_in_log("ZooKeeper session has been expired.: while write prefix to view")
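The easiest fix to miss here is `pm.heal_all` becoming `pm.heal_all()`: without parentheses the expression merely evaluates the bound method and discards it, so the old test silently never restored the network at that point. A minimal reproduction of the pitfall:

```python
class PM:
    def heal_all(self):
        print('healed')

pm = PM()
pm.heal_all    # evaluates the bound method and throws it away: a silent no-op
pm.heal_all()  # actually calls the method and prints 'healed'
```

The new timing also adds up: with `sleepEachRow(1)` and 20-row blocks, the materialized view spends roughly 20 seconds between write prefix and write suffix, so dropping ZooKeeper connections 3 seconds in (instead of 12) lands well inside that window, and the final log assertion confirms the injected failure hit the intended spot.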