Remove most of the sleeps in tests

Mikhail Filimonov 2021-02-25 17:01:35 +01:00
parent 6c6eaf2a60
commit 2bcfff257c
2 changed files with 91 additions and 82 deletions


@@ -1089,13 +1089,13 @@ class ClickHouseInstance:
# if repetitions>1 grep will return success even if not enough lines were collected,
if repetitions>1 and len(result.splitlines()) < repetitions:
print("wait_for_log_line: those lines were founded during {} sec.".format(timeout))
print("wait_for_log_line: those lines were found during {} seconds:".format(timeout))
print(result)
raise Exception("wait_for_log_line: Not enough repetitions: {} found, while {} expected".format(len(result.splitlines()), repetitions))
wait_duration = time.time() - start_time
print('Log line matching "{}" appeared in a {} seconds'.format(regexp, wait_duration))
print('{} log line(s) matching "{}" appeared in {} seconds'.format(repetitions, regexp, wait_duration))
return wait_duration
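
The repetitions parameter is what lets callers wait for the Nth occurrence of a pattern instead of sleeping a fixed time. As a rough illustration of the approach (not the repo's exact implementation, which greps the log inside the container), a file-based sketch:

    import re
    import time

    def wait_for_log_line_sketch(log_path, regexp, timeout=30, repetitions=1):
        # Poll the log until `regexp` has matched at least `repetitions`
        # lines, returning how long that took; fail loudly on timeout
        # instead of letting the test hang.
        start_time = time.time()
        pattern = re.compile(regexp)
        matches = []
        while time.time() - start_time < timeout:
            with open(log_path) as f:
                matches = [line for line in f if pattern.search(line)]
            if len(matches) >= repetitions:
                wait_duration = time.time() - start_time
                print('{} log line(s) matching "{}" appeared in {} seconds'.format(repetitions, regexp, wait_duration))
                return wait_duration
            time.sleep(0.5)
        raise Exception("wait_for_log_line: Not enough repetitions: {} found, while {} expected".format(len(matches), repetitions))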


@@ -796,6 +796,12 @@ def test_kafka_issue4116(kafka_cluster):
@pytest.mark.timeout(180)
def test_kafka_consumer_hang(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
topic_list = []
topic_list.append(NewTopic(name="consumer_hang", num_partitions=8, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
instance.query('''
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.view;
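
Pre-creating the topic with an explicit partition count, instead of relying on broker auto-creation, removes another source of timing flakiness. The same setup as a standalone sketch with kafka-python (the broker address mirrors the test fixture; the retry handling is an assumption, not shown in the diff):

    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    try:
        admin_client.create_topics(
            new_topics=[NewTopic(name="consumer_hang", num_partitions=8, replication_factor=1)],
            validate_only=False)
    except TopicAlreadyExistsError:
        pass  # harmless on reruns: the topic survives from a previous attempt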
@@ -807,20 +813,18 @@ def test_kafka_consumer_hang(kafka_cluster):
kafka_topic_list = 'consumer_hang',
kafka_group_name = 'consumer_hang',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 8,
kafka_row_delimiter = '\\n';
kafka_num_consumers = 8;
CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory();
CREATE MATERIALIZED VIEW test.consumer TO test.view AS SELECT * FROM test.kafka;
''')
time.sleep(10)
instance.query('SELECT * FROM test.view')
instance.wait_for_log_line('kafka.*Stalled', repetitions=20)
# This should trigger heartbeat fail,
# which will trigger REBALANCE_IN_PROGRESS,
# and which can lead to consumer hang.
kafka_cluster.pause_container('kafka1')
time.sleep(0.5)
instance.wait_for_log_line('heartbeat error')
kafka_cluster.unpause_container('kafka1')
# print("Attempt to drop")
@@ -844,6 +848,12 @@ def test_kafka_consumer_hang(kafka_cluster):
@pytest.mark.timeout(180)
def test_kafka_consumer_hang2(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
topic_list = []
topic_list.append(NewTopic(name="consumer_hang2", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
instance.query('''
DROP TABLE IF EXISTS test.kafka;
@@ -884,23 +894,22 @@ def test_kafka_consumer_hang2(kafka_cluster):
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
@pytest.mark.timeout(180)
@pytest.mark.timeout(120)
def test_kafka_csv_with_delimiter(kafka_cluster):
messages = []
for i in range(50):
messages.append('{i}, {i}'.format(i=i))
kafka_produce('csv', messages)
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
kafka_row_delimiter = '\\n';
kafka_format = 'CSV';
''')
messages = []
for i in range(50):
messages.append('{i}, {i}'.format(i=i))
kafka_produce('csv', messages)
result = ''
while True:
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
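
Producing the messages before the table even exists is safe here, since the consumer starts from the earliest offset and picks them up once the table is created; the producer only needs to flush, not sleep. A hedged sketch of what a kafka_produce helper can look like with kafka-python (broker address assumed):

    from kafka import KafkaProducer

    def kafka_produce_sketch(topic, messages, timeout=10):
        # Send each message and block until the broker has acknowledged
        # all of them, so the test never races against an in-flight batch.
        producer = KafkaProducer(bootstrap_servers="localhost:9092")
        for message in messages:
            producer.send(topic=topic, value=message.encode())
        producer.flush(timeout=timeout)
        producer.close()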
@@ -910,23 +919,22 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(120)
def test_kafka_tsv_with_delimiter(kafka_cluster):
messages = []
for i in range(50):
messages.append('{i}\t{i}'.format(i=i))
kafka_produce('tsv', messages)
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_group_name = 'tsv',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
kafka_format = 'TSV';
''')
messages = []
for i in range(50):
messages.append('{i}\t{i}'.format(i=i))
kafka_produce('tsv', messages)
result = ''
while True:
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
@@ -936,8 +944,13 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(120)
def test_kafka_select_empty(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
topic_list = []
topic_list.append(NewTopic(name="empty", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
instance.query('''
CREATE TABLE test.kafka (key UInt64)
ENGINE = Kafka
@@ -953,15 +966,6 @@ def test_kafka_select_empty(kafka_cluster):
@pytest.mark.timeout(180)
def test_kafka_json_without_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow';
''')
messages = ''
for i in range(25):
messages += json.dumps({'key': i, 'value': i}) + '\n'
@@ -972,6 +976,15 @@ def test_kafka_json_without_delimiter(kafka_cluster):
messages += json.dumps({'key': i, 'value': i}) + '\n'
kafka_produce('json', [messages])
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow';
''')
result = ''
while True:
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
@@ -983,6 +996,10 @@ def test_kafka_json_without_delimiter(kafka_cluster):
@pytest.mark.timeout(180)
def test_kafka_protobuf(kafka_cluster):
kafka_produce_protobuf_messages('pb', 0, 20)
kafka_produce_protobuf_messages('pb', 20, 1)
kafka_produce_protobuf_messages('pb', 21, 29)
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
@@ -993,10 +1010,6 @@ def test_kafka_protobuf(kafka_cluster):
kafka_schema = 'kafka.proto:KeyValuePair';
''')
kafka_produce_protobuf_messages('pb', 0, 20)
kafka_produce_protobuf_messages('pb', 20, 1)
kafka_produce_protobuf_messages('pb', 21, 29)
result = ''
while True:
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
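
The protobuf messages are likewise produced before the table is created. Their payload is a single Kafka message containing several length-delimited protobuf records; a sketch under the assumption that a kafka_pb2 module was generated from kafka.proto with a KeyValuePair message (key, value fields):

    from google.protobuf.internal.encoder import _VarintBytes
    from kafka import KafkaProducer
    import kafka_pb2  # assumed: generated from kafka.proto (message KeyValuePair)

    def produce_protobuf_sketch(topic, start_index, num_messages):
        # Concatenate varint-length-prefixed records, as ClickHouse's
        # Protobuf row format expects, and ship them as one Kafka message.
        data = b''
        for i in range(start_index, start_index + num_messages):
            msg = kafka_pb2.KeyValuePair()
            msg.key = i
            msg.value = str(i)
            serialized = msg.SerializeToString()
            data += _VarintBytes(len(serialized)) + serialized
        producer = KafkaProducer(bootstrap_servers="localhost:9092")
        producer.send(topic=topic, value=data)
        producer.flush()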
@@ -1009,6 +1022,9 @@ def test_kafka_protobuf(kafka_cluster):
@pytest.mark.timeout(180)
def test_kafka_string_field_on_first_position_in_protobuf(kafka_cluster):
# https://github.com/ClickHouse/ClickHouse/issues/12615
kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 0, 20)
kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 20, 1)
kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 21, 29)
instance.query('''
CREATE TABLE test.kafka (
@@ -1021,14 +1037,8 @@ SETTINGS
kafka_group_name = 'string_field_on_first_position_in_protobuf',
kafka_format = 'Protobuf',
kafka_schema = 'social:User';
SELECT * FROM test.kafka;
''')
kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 0, 20)
kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 20, 1)
kafka_produce_protobuf_social('string_field_on_first_position_in_protobuf', 21, 29)
result = instance.query('SELECT * FROM test.kafka', ignore_error=True)
expected = '''\
John Doe 0 1000000
@@ -1898,7 +1908,7 @@ def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
messages.append("\n".join(rows))
kafka_produce('topic_with_multiple_partitions2', messages)
instance.wait_for_log_line('kafka.*Stalled', repetitions=20)
instance.wait_for_log_line('kafka.*Stalled', repetitions=5)
result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
print(result)
@@ -2136,7 +2146,7 @@ def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
instance.query('''
DROP TABLE IF EXISTS test.destination;
DROP TABLE IF EXISTS test.destination SYNC;
CREATE TABLE test.destination (
key UInt64,
value UInt64,
@@ -2156,7 +2166,8 @@ def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
kafka_max_block_size = 1000,
kafka_flush_interval_ms = 1000;
CREATE MATERIALIZED VIEW test.kafka_consumer TO test.destination AS
SELECT
@@ -2170,9 +2181,8 @@ def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
FROM test.kafka;
''')
while int(instance.query("SELECT count() FROM test.destination")) == 0:
print("Waiting for test.kafka_consumer to start consume")
time.sleep(1)
# Waiting for test.kafka_consumer to start consuming
instance.wait_for_log_line('Committed offset [0-9]+')
cancel = threading.Event()
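
Waiting on the 'Committed offset' log line pins the test to the event it actually depends on. Where no log line is available, the same idea can be expressed as a bounded poll instead of an open-ended sleep loop; a small illustrative helper (not from this repo):

    import time

    def wait_until(condition, timeout=30, poll_interval=0.5, description="condition"):
        # Re-check a cheap predicate until it holds or the deadline passes,
        # failing loudly instead of hanging like `while ...: time.sleep(1)`.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if condition():
                return
            time.sleep(poll_interval)
        raise TimeoutError("Timed out after {}s waiting for {}".format(timeout, description))

    # usage sketch:
    # wait_until(lambda: int(instance.query("SELECT count() FROM test.destination")) > 0,
    #            description="first rows in test.destination")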
@@ -2185,14 +2195,14 @@ def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
time.sleep(1)
time.sleep(0.5)
kafka_thread = threading.Thread(target=produce)
kafka_thread.start()
time.sleep(12)
time.sleep(4)
instance.query('''
DROP TABLE test.kafka;
DROP TABLE test.kafka SYNC;
''')
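
The producer runs on a background thread with a threading.Event for cooperative shutdown. The diff shows the pattern only in fragments, so here it is in one self-contained sketch (a plain list stands in for kafka_produce):

    import json
    import threading
    import time

    produced = []          # stand-in for kafka_produce in this sketch
    cancel = threading.Event()
    counter = [0]          # mutable cell so the closure can advance it

    def produce():
        while not cancel.is_set():
            batch = []
            for _ in range(101):
                batch.append(json.dumps({'key': counter[0], 'value': counter[0]}))
                counter[0] += 1
            produced.append(batch)
            time.sleep(0.5)

    kafka_thread = threading.Thread(target=produce)
    kafka_thread.start()
    time.sleep(2)          # let it produce while the tables are recreated
    cancel.set()           # ask the producer loop to stop...
    kafka_thread.join()    # ...and wait until it actually has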
instance.query('''
@@ -2202,11 +2212,12 @@ def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 10000;
kafka_max_block_size = 10000,
kafka_flush_interval_ms = 1000;
''')
cancel.set()
time.sleep(15)
instance.wait_for_log_line('kafka.*Stalled', repetitions=5)
# kafka_cluster.open_bash_shell('instance')
# SELECT key, _timestamp, _offset FROM test.destination where runningDifference(key) <> 1 ORDER BY key;
@@ -2215,8 +2226,8 @@ def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
print(result)
instance.query('''
DROP TABLE test.kafka_consumer;
DROP TABLE test.destination;
DROP TABLE test.kafka_consumer SYNC;
DROP TABLE test.destination SYNC;
''')
kafka_thread.join()
@@ -2235,7 +2246,8 @@ def test_bad_reschedule(kafka_cluster):
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000;
kafka_max_block_size = 1000,
kafka_flush_interval_ms = 1000;
CREATE MATERIALIZED VIEW test.destination Engine=Log AS
SELECT
@@ -2250,9 +2262,7 @@ def test_bad_reschedule(kafka_cluster):
FROM test.kafka;
''')
while int(instance.query("SELECT count() FROM test.destination")) < 20000:
print("Waiting for consume")
time.sleep(1)
instance.wait_for_log_line("Committed offset 20000")
assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8
@@ -2263,8 +2273,8 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
kafka_produce('duplicates_when_commit_failed', messages)
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
DROP TABLE IF EXISTS test.view SYNC;
DROP TABLE IF EXISTS test.consumer SYNC;
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
@@ -2309,8 +2319,8 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
print(result)
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
DROP TABLE test.consumer SYNC;
DROP TABLE test.view SYNC;
''')
# After https://github.com/edenhill/librdkafka/issues/2631
@@ -2373,9 +2383,8 @@ def test_premature_flush_on_eof(kafka_cluster):
# all subscriptions/assignments are done during the select, so it starts sending data to test.destination
# immediately after creation of MV
time.sleep(1.5) # that sleep is needed to ensure that first poll finished, and at least one 'empty' polls happened.
# Empty poll before the fix were leading to premature flush.
# TODO: wait for messages in log: "Polled batch of 1 messages", followed by "Stalled"
instance.wait_for_log_line("Polled batch of 1 messages")
instance.wait_for_log_line("Stalled")
# produce more messages after delay
kafka_produce('premature_flush_on_eof', messages)
@@ -2383,7 +2392,7 @@ def test_premature_flush_on_eof(kafka_cluster):
# data was not flushed yet (it will be flushed 7.5 sec after creating MV)
assert int(instance.query("SELECT count() FROM test.destination")) == 0
time.sleep(9) # TODO: wait for messages in log: "Committed offset ..."
instance.wait_for_log_line("Committed offset 2")
# it should be single part, i.e. single insert
result = instance.query('SELECT _part, count() FROM test.destination group by _part')
@@ -2395,10 +2404,10 @@ def test_premature_flush_on_eof(kafka_cluster):
''')
@pytest.mark.timeout(180)
@pytest.mark.timeout(120)
def test_kafka_unavailable(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(20000)]
kafka_produce('test_bad_reschedule', messages)
messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(2000)]
kafka_produce('test_kafka_unavailable', messages)
kafka_cluster.pause_container('kafka1')
@@ -2406,8 +2415,8 @@ def test_kafka_unavailable(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_topic_list = 'test_kafka_unavailable',
kafka_group_name = 'test_kafka_unavailable',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1000,
kafka_flush_interval_ms = 1000;
@@ -2426,19 +2435,21 @@ def test_kafka_unavailable(kafka_cluster):
''')
instance.query("SELECT * FROM test.kafka")
instance.query("SELECT count() FROM test.destination")
# enough to trigger issue
time.sleep(30)
instance.wait_for_log_line('brokers are down')
instance.wait_for_log_line('stalled. Reschedule', repetitions=2)
kafka_cluster.unpause_container('kafka1')
while int(instance.query("SELECT count() FROM test.destination")) < 20000:
print("Waiting for consume")
time.sleep(1)
instance.wait_for_log_line("Committed offset 2000")
assert int(instance.query("SELECT count() FROM test.destination")) == 2000
@pytest.mark.timeout(180)
def test_kafka_issue14202(kafka_cluster):
"""
INSERT INTO a Kafka engine table from an empty SELECT subquery was leading to a failure
"""
instance.query('''
CREATE TABLE test.empty_table (
dt Date,
@@ -2456,8 +2467,6 @@ def test_kafka_issue14202(kafka_cluster):
kafka_format = 'JSONEachRow';
''')
time.sleep(3)
instance.query(
'INSERT INTO test.kafka_q SELECT t, some_string FROM ( SELECT dt AS t, some_string FROM test.empty_table )')
# check instance is alive