Integration tests: introduce wait_for_log_line.

It uses tail -f executed in the container, so I don't need to poll the
file repeatedly, remember positions, etc.
Mikhail Filimonov 2021-02-23 17:53:14 +01:00
parent 32fa3dbc99
commit a4c9e62d6d
2 changed files with 13 additions and 13 deletions
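
For context, the core of the new helper is a single shell pipeline: timeout N tail -Fn100 FILE | grep -Eqm 1 REGEXP. It blocks until the pattern appears (grep exits 0 after the requested number of matches) or until the timeout kills tail and grep exits non-zero at EOF. Below is a minimal local sketch of the same pattern, assuming GNU coreutils and grep on the host; the function name and everything else in it is illustrative, not part of the commit:

    import shlex
    import subprocess

    def wait_for_line(path, regexp, timeout=30, look_behind_lines=100, repetitions=1):
        # Same pipeline as the helper in the diff below, just run on the host
        # instead of inside a container via docker exec.
        cmd = 'timeout {} tail -Fn{} {} | grep -Eqm {} {}'.format(
            timeout, look_behind_lines, shlex.quote(path), repetitions, shlex.quote(regexp))
        # Exit code 0: grep saw enough matches and quit early (tail then dies
        # on SIGPIPE). Non-zero: timeout killed tail and grep hit EOF unmatched.
        return subprocess.run(['bash', '-c', cmd]).returncode == 0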


@@ -13,6 +13,7 @@ import subprocess
 import time
 import traceback
 import urllib.parse
+import shlex
 
 import cassandra.cluster
 import docker
@@ -1081,6 +1082,13 @@ class ClickHouseInstance:
             ["bash", "-c", 'grep "{}" /var/log/clickhouse-server/clickhouse-server.log || true'.format(substring)])
         return len(result) > 0
 
+    def wait_for_log_line(self, regexp, filename='/var/log/clickhouse-server/clickhouse-server.log', timeout=30, repetitions=1, look_behind_lines=100):
+        start_time = time.time()
+        result = self.exec_in_container(
+            ["bash", "-c", 'timeout {} tail -Fn{} "{}" | grep -Eqm {} {}'.format(timeout, look_behind_lines, filename, repetitions, shlex.quote(regexp))])
+        current_time = time.time()
+        print('Log line matching "{}" appeared in {} seconds'.format(regexp, current_time - start_time))
+
     def file_exists(self, path):
         return self.exec_in_container(
             ["bash", "-c", "echo $(if [ -e '{}' ]; then echo 'yes'; else echo 'no'; fi)".format(path)]) == 'yes\n'


@@ -2042,7 +2042,7 @@ def test_kafka_rebalance(kafka_cluster):
 
     assert result == 1, 'Messages from kafka get duplicated!'
 
-@pytest.mark.timeout(1200)
+@pytest.mark.timeout(120)
 def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
     messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(1)]
     kafka_produce('no_holes_when_write_suffix_failed', messages)
@@ -2076,23 +2076,15 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
 
         CREATE MATERIALIZED VIEW test.consumer TO test.view AS
             SELECT * FROM test.kafka
-            WHERE NOT sleepEachRow(1);
+            WHERE NOT sleepEachRow(0.1);
     ''')
 
     # the tricky part here is that disconnect should happen after write prefix, but before write suffix
-    # so i use sleepEachRow
-    time.sleep(3)
+    instance.wait_for_log_line("Polled batch of 20 messages")
     pm.drop_instance_zk_connections(instance)
-    time.sleep(20)
+    instance.wait_for_log_line("Coordination.*while write prefix to view")
     pm.heal_all()
-
-    # connection restored and it will take a while until next block will be flushed
-    # it takes years on CI :\
-    time.sleep(45)
-
-    # as it's a bit tricky to hit the proper moment - let's check in logs if we did it correctly
-    assert instance.contains_in_log("ZooKeeper session has been expired.: while write prefix to view")
+    instance.wait_for_log_line("Committed offset 23")
 
     result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
     print(result)
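
The effect on the test above: it now synchronizes on actual server events rather than wall-clock sleeps, finishing as soon as the events happen and failing on the grep timeout when they don't, which is what allows the pytest timeout to drop from 1200 to 120 seconds. The repetitions parameter covers events that must be seen more than once; a hypothetical example:

    # Wait until three "Committed offset" lines have appeared, scanning up to
    # 2000 lines of existing log in case some were written before this call.
    instance.wait_for_log_line("Committed offset", repetitions=3, look_behind_lines=2000)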