Merge pull request #25111 from ClickHouse/less_timeouts

Less timeouts in integration tests
This commit is contained in:
alesapin 2021-06-10 19:09:27 +03:00 committed by GitHub
commit 9dd74f8b72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 0 additions and 46 deletions

View File

@ -231,7 +231,6 @@ def kafka_setup_teardown():
# Tests # Tests
@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster): def test_kafka_settings_old_syntax(kafka_cluster):
assert TSV(instance.query("SELECT * FROM system.macros WHERE macro like 'kafka%' ORDER BY macro", assert TSV(instance.query("SELECT * FROM system.macros WHERE macro like 'kafka%' ORDER BY macro",
ignore_error=True)) == TSV('''kafka_broker kafka1 ignore_error=True)) == TSV('''kafka_broker kafka1
@ -268,7 +267,6 @@ kafka_topic_old old
# text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'),"kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose")) # text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'),"kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose"))
@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster): def test_kafka_settings_new_syntax(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64) CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -308,7 +306,6 @@ def test_kafka_settings_new_syntax(kafka_cluster):
assert members[0]['client_id'] == 'instance test 1234' assert members[0]['client_id'] == 'instance test 1234'
@pytest.mark.timeout(180)
def test_kafka_json_as_string(kafka_cluster): def test_kafka_json_as_string(kafka_cluster):
kafka_produce(kafka_cluster, 'kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', kafka_produce(kafka_cluster, 'kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }',
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}']) '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
@ -334,7 +331,6 @@ def test_kafka_json_as_string(kafka_cluster):
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: [0-9]*) return no rows") "Parsing of message (topic: kafka_json_as_string, partition: 0, offset: [0-9]*) return no rows")
@pytest.mark.timeout(120)
def test_kafka_formats(kafka_cluster): def test_kafka_formats(kafka_cluster):
schema_registry_client = CachedSchemaRegistryClient('http://localhost:{}'.format(kafka_cluster.schema_registry_port)) schema_registry_client = CachedSchemaRegistryClient('http://localhost:{}'.format(kafka_cluster.schema_registry_port))
@ -739,7 +735,6 @@ def kafka_setup_teardown():
# Tests # Tests
@pytest.mark.timeout(180)
def test_kafka_settings_old_syntax(kafka_cluster): def test_kafka_settings_old_syntax(kafka_cluster):
assert TSV(instance.query("SELECT * FROM system.macros WHERE macro like 'kafka%' ORDER BY macro", assert TSV(instance.query("SELECT * FROM system.macros WHERE macro like 'kafka%' ORDER BY macro",
ignore_error=True)) == TSV('''kafka_broker kafka1 ignore_error=True)) == TSV('''kafka_broker kafka1
@ -776,7 +771,6 @@ kafka_topic_old old
# text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'),"kafka-consumer-groups --bootstrap-server localhost:{} --describe --members --group old --verbose".format(cluster.kafka_port))) # text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'),"kafka-consumer-groups --bootstrap-server localhost:{} --describe --members --group old --verbose".format(cluster.kafka_port)))
@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster): def test_kafka_settings_new_syntax(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64) CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -816,7 +810,6 @@ def test_kafka_settings_new_syntax(kafka_cluster):
assert members[0]['client_id'] == 'instance test 1234' assert members[0]['client_id'] == 'instance test 1234'
@pytest.mark.timeout(180)
def test_kafka_issue11308(kafka_cluster): def test_kafka_issue11308(kafka_cluster):
# Check that matview does respect Kafka SETTINGS # Check that matview does respect Kafka SETTINGS
kafka_produce(kafka_cluster, 'issue11308', ['{"t": 123, "e": {"x": "woof"} }', '{"t": 123, "e": {"x": "woof"} }', kafka_produce(kafka_cluster, 'issue11308', ['{"t": 123, "e": {"x": "woof"} }', '{"t": 123, "e": {"x": "woof"} }',
@ -865,7 +858,6 @@ def test_kafka_issue11308(kafka_cluster):
assert TSV(result) == TSV(expected) assert TSV(result) == TSV(expected)
@pytest.mark.timeout(180)
def test_kafka_issue4116(kafka_cluster): def test_kafka_issue4116(kafka_cluster):
# Check that format_csv_delimiter parameter works now - as part of all available format settings. # Check that format_csv_delimiter parameter works now - as part of all available format settings.
kafka_produce(kafka_cluster, 'issue4116', ['1|foo', '2|bar', '42|answer', '100|multi\n101|row\n103|message']) kafka_produce(kafka_cluster, 'issue4116', ['1|foo', '2|bar', '42|answer', '100|multi\n101|row\n103|message'])
@ -894,7 +886,6 @@ def test_kafka_issue4116(kafka_cluster):
assert TSV(result) == TSV(expected) assert TSV(result) == TSV(expected)
@pytest.mark.timeout(180)
def test_kafka_consumer_hang(kafka_cluster): def test_kafka_consumer_hang(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
@ -946,7 +937,6 @@ def test_kafka_consumer_hang(kafka_cluster):
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0 assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
@pytest.mark.timeout(180)
def test_kafka_consumer_hang2(kafka_cluster): def test_kafka_consumer_hang2(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
@ -994,7 +984,6 @@ def test_kafka_consumer_hang2(kafka_cluster):
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0 assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
@pytest.mark.timeout(120)
def test_kafka_csv_with_delimiter(kafka_cluster): def test_kafka_csv_with_delimiter(kafka_cluster):
messages = [] messages = []
for i in range(50): for i in range(50):
@ -1019,7 +1008,6 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(120)
def test_kafka_tsv_with_delimiter(kafka_cluster): def test_kafka_tsv_with_delimiter(kafka_cluster):
messages = [] messages = []
for i in range(50): for i in range(50):
@ -1044,7 +1032,6 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(120)
def test_kafka_select_empty(kafka_cluster): def test_kafka_select_empty(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
topic_list = [] topic_list = []
@ -1064,7 +1051,6 @@ def test_kafka_select_empty(kafka_cluster):
assert int(instance.query('SELECT count() FROM test.kafka')) == 0 assert int(instance.query('SELECT count() FROM test.kafka')) == 0
@pytest.mark.timeout(180)
def test_kafka_json_without_delimiter(kafka_cluster): def test_kafka_json_without_delimiter(kafka_cluster):
messages = '' messages = ''
for i in range(25): for i in range(25):
@ -1094,7 +1080,6 @@ def test_kafka_json_without_delimiter(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_protobuf(kafka_cluster): def test_kafka_protobuf(kafka_cluster):
kafka_produce_protobuf_messages(kafka_cluster, 'pb', 0, 20) kafka_produce_protobuf_messages(kafka_cluster, 'pb', 0, 20)
kafka_produce_protobuf_messages(kafka_cluster, 'pb', 20, 1) kafka_produce_protobuf_messages(kafka_cluster, 'pb', 20, 1)
@ -1119,7 +1104,6 @@ def test_kafka_protobuf(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_string_field_on_first_position_in_protobuf(kafka_cluster): def test_kafka_string_field_on_first_position_in_protobuf(kafka_cluster):
# https://github.com/ClickHouse/ClickHouse/issues/12615 # https://github.com/ClickHouse/ClickHouse/issues/12615
kafka_produce_protobuf_social(kafka_cluster, 'string_field_on_first_position_in_protobuf', 0, 20) kafka_produce_protobuf_social(kafka_cluster, 'string_field_on_first_position_in_protobuf', 0, 20)
@ -1194,7 +1178,6 @@ John Doe 49 1000049
''' '''
assert TSV(result) == TSV(expected) assert TSV(result) == TSV(expected)
@pytest.mark.timeout(30)
def test_kafka_protobuf_no_delimiter(kafka_cluster): def test_kafka_protobuf_no_delimiter(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value String) CREATE TABLE test.kafka (key UInt64, value String)
@ -1243,7 +1226,6 @@ def test_kafka_protobuf_no_delimiter(kafka_cluster):
@pytest.mark.timeout(180)
def test_kafka_materialized_view(kafka_cluster): def test_kafka_materialized_view(kafka_cluster):
instance.query(''' instance.query('''
DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.view;
@ -1280,7 +1262,6 @@ def test_kafka_materialized_view(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_recreate_kafka_table(kafka_cluster): def test_kafka_recreate_kafka_table(kafka_cluster):
''' '''
Checks that materialized view work properly after dropping and recreating the Kafka table. Checks that materialized view work properly after dropping and recreating the Kafka table.
@ -1349,7 +1330,6 @@ def test_kafka_recreate_kafka_table(kafka_cluster):
''') ''')
@pytest.mark.timeout(180)
def test_librdkafka_compression(kafka_cluster): def test_librdkafka_compression(kafka_cluster):
""" """
Regression for UB in snappy-c (that is used in librdkafka), Regression for UB in snappy-c (that is used in librdkafka),
@ -1419,7 +1399,6 @@ def test_librdkafka_compression(kafka_cluster):
instance.query('DROP TABLE test.kafka SYNC') instance.query('DROP TABLE test.kafka SYNC')
instance.query('DROP TABLE test.consumer SYNC') instance.query('DROP TABLE test.consumer SYNC')
@pytest.mark.timeout(180)
def test_kafka_materialized_view_with_subquery(kafka_cluster): def test_kafka_materialized_view_with_subquery(kafka_cluster):
instance.query(''' instance.query('''
DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.view;
@ -1456,7 +1435,6 @@ def test_kafka_materialized_view_with_subquery(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_many_materialized_views(kafka_cluster): def test_kafka_many_materialized_views(kafka_cluster):
instance.query(''' instance.query('''
DROP TABLE IF EXISTS test.view1; DROP TABLE IF EXISTS test.view1;
@ -1504,7 +1482,6 @@ def test_kafka_many_materialized_views(kafka_cluster):
kafka_check_result(result2, True) kafka_check_result(result2, True)
@pytest.mark.timeout(300)
def test_kafka_flush_on_big_message(kafka_cluster): def test_kafka_flush_on_big_message(kafka_cluster):
# Create batchs of messages of size ~100Kb # Create batchs of messages of size ~100Kb
kafka_messages = 1000 kafka_messages = 1000
@ -1554,7 +1531,6 @@ def test_kafka_flush_on_big_message(kafka_cluster):
assert int(result) == kafka_messages * batch_messages, 'ClickHouse lost some messages: {}'.format(result) assert int(result) == kafka_messages * batch_messages, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(180)
def test_kafka_virtual_columns(kafka_cluster): def test_kafka_virtual_columns(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64) CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -1586,7 +1562,6 @@ def test_kafka_virtual_columns(kafka_cluster):
kafka_check_result(result, True, 'test_kafka_virtual1.reference') kafka_check_result(result, True, 'test_kafka_virtual1.reference')
@pytest.mark.timeout(180)
def test_kafka_virtual_columns_with_materialized_view(kafka_cluster): def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
instance.query(''' instance.query('''
DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.view;
@ -1623,7 +1598,6 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
kafka_check_result(result, True, 'test_kafka_virtual2.reference') kafka_check_result(result, True, 'test_kafka_virtual2.reference')
@pytest.mark.timeout(180)
def test_kafka_insert(kafka_cluster): def test_kafka_insert(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64) CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -1660,7 +1634,6 @@ def test_kafka_insert(kafka_cluster):
kafka_check_result(result, True) kafka_check_result(result, True)
@pytest.mark.timeout(240)
def test_kafka_produce_consume(kafka_cluster): def test_kafka_produce_consume(kafka_cluster):
instance.query(''' instance.query('''
DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.view;
@ -1722,7 +1695,6 @@ def test_kafka_produce_consume(kafka_cluster):
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(300)
def test_kafka_commit_on_block_write(kafka_cluster): def test_kafka_commit_on_block_write(kafka_cluster):
instance.query(''' instance.query('''
DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.view;
@ -1792,7 +1764,6 @@ def test_kafka_commit_on_block_write(kafka_cluster):
assert result == 1, 'Messages from kafka get duplicated!' assert result == 1, 'Messages from kafka get duplicated!'
@pytest.mark.timeout(180)
def test_kafka_virtual_columns2(kafka_cluster): def test_kafka_virtual_columns2(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
topic_list = [] topic_list = []
@ -1859,7 +1830,6 @@ def test_kafka_virtual_columns2(kafka_cluster):
assert TSV(result) == TSV(expected) assert TSV(result) == TSV(expected)
@pytest.mark.timeout(120)
def test_kafka_produce_key_timestamp(kafka_cluster): def test_kafka_produce_key_timestamp(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
@ -1919,7 +1889,6 @@ def test_kafka_produce_key_timestamp(kafka_cluster):
assert TSV(result) == TSV(expected) assert TSV(result) == TSV(expected)
@pytest.mark.timeout(600)
def test_kafka_flush_by_time(kafka_cluster): def test_kafka_flush_by_time(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
topic_list = [] topic_list = []
@ -1980,7 +1949,6 @@ def test_kafka_flush_by_time(kafka_cluster):
assert TSV(result) == TSV('1 1') assert TSV(result) == TSV('1 1')
@pytest.mark.timeout(90)
def test_kafka_flush_by_block_size(kafka_cluster): def test_kafka_flush_by_block_size(kafka_cluster):
cancel = threading.Event() cancel = threading.Event()
@ -2039,7 +2007,6 @@ def test_kafka_flush_by_block_size(kafka_cluster):
result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!' result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!'
@pytest.mark.timeout(600)
def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster): def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
@ -2087,7 +2054,6 @@ def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
''') ''')
@pytest.mark.timeout(1200)
def test_kafka_rebalance(kafka_cluster): def test_kafka_rebalance(kafka_cluster):
NUMBER_OF_CONSURRENT_CONSUMERS = 11 NUMBER_OF_CONSURRENT_CONSUMERS = 11
@ -2220,7 +2186,6 @@ def test_kafka_rebalance(kafka_cluster):
assert result == 1, 'Messages from kafka get duplicated!' assert result == 1, 'Messages from kafka get duplicated!'
@pytest.mark.timeout(120)
def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster): def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)] messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce(kafka_cluster, 'no_holes_when_write_suffix_failed', messages) kafka_produce(kafka_cluster, 'no_holes_when_write_suffix_failed', messages)
@ -2274,7 +2239,6 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
assert TSV(result) == TSV('22\t22\t22') assert TSV(result) == TSV('22\t22\t22')
@pytest.mark.timeout(120)
def test_exception_from_destructor(kafka_cluster): def test_exception_from_destructor(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value String) CREATE TABLE test.kafka (key UInt64, value String)
@ -2307,7 +2271,6 @@ def test_exception_from_destructor(kafka_cluster):
assert TSV(instance.query('SELECT 1')) == TSV('1') assert TSV(instance.query('SELECT 1')) == TSV('1')
@pytest.mark.timeout(120)
def test_commits_of_unprocessed_messages_on_drop(kafka_cluster): def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(1)] messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(1)]
kafka_produce(kafka_cluster, 'commits_of_unprocessed_messages_on_drop', messages) kafka_produce(kafka_cluster, 'commits_of_unprocessed_messages_on_drop', messages)
@ -2401,7 +2364,6 @@ def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(i[0] - 1)), 'Missing data!' assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(i[0] - 1)), 'Missing data!'
@pytest.mark.timeout(300)
def test_bad_reschedule(kafka_cluster): def test_bad_reschedule(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(20000)] messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(20000)]
kafka_produce(kafka_cluster, 'test_bad_reschedule', messages) kafka_produce(kafka_cluster, 'test_bad_reschedule', messages)
@ -2434,7 +2396,6 @@ def test_bad_reschedule(kafka_cluster):
assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8 assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8
@pytest.mark.timeout(300)
def test_kafka_duplicates_when_commit_failed(kafka_cluster): def test_kafka_duplicates_when_commit_failed(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)] messages = [json.dumps({'key': j + 1, 'value': 'x' * 300}) for j in range(22)]
kafka_produce(kafka_cluster, 'duplicates_when_commit_failed', messages) kafka_produce(kafka_cluster, 'duplicates_when_commit_failed', messages)
@ -2502,7 +2463,6 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
# But in cases of some peaky loads in kafka topic the current contract sounds more predictable and # But in cases of some peaky loads in kafka topic the current contract sounds more predictable and
# easier to understand, so let's keep it as is for now. # easier to understand, so let's keep it as is for now.
# also we can came to eof because we drained librdkafka internal queue too fast # also we can came to eof because we drained librdkafka internal queue too fast
@pytest.mark.timeout(120)
def test_premature_flush_on_eof(kafka_cluster): def test_premature_flush_on_eof(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64) CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -2571,7 +2531,6 @@ def test_premature_flush_on_eof(kafka_cluster):
''') ''')
@pytest.mark.timeout(180)
def test_kafka_unavailable(kafka_cluster): def test_kafka_unavailable(kafka_cluster):
messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(20000)] messages = [json.dumps({'key': j + 1, 'value': j + 1}) for j in range(20000)]
kafka_produce(kafka_cluster, 'test_bad_reschedule', messages) kafka_produce(kafka_cluster, 'test_bad_reschedule', messages)
@ -2612,7 +2571,6 @@ def test_kafka_unavailable(kafka_cluster):
time.sleep(1) time.sleep(1)
@pytest.mark.timeout(180)
def test_kafka_issue14202(kafka_cluster): def test_kafka_issue14202(kafka_cluster):
""" """
INSERT INTO Kafka Engine from an empty SELECT sub query was leading to failure INSERT INTO Kafka Engine from an empty SELECT sub query was leading to failure
@ -2645,7 +2603,6 @@ def test_kafka_issue14202(kafka_cluster):
''') ''')
@pytest.mark.timeout(180)
def test_kafka_csv_with_thread_per_consumer(kafka_cluster): def test_kafka_csv_with_thread_per_consumer(kafka_cluster):
instance.query(''' instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64) CREATE TABLE test.kafka (key UInt64, value UInt64)
@ -2675,7 +2632,6 @@ def test_kafka_csv_with_thread_per_consumer(kafka_cluster):
def random_string(size=8): def random_string(size=8):
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=size)) return ''.join(random.choices(string.ascii_uppercase + string.digits, k=size))
@pytest.mark.timeout(180)
def test_kafka_engine_put_errors_to_stream(kafka_cluster): def test_kafka_engine_put_errors_to_stream(kafka_cluster):
instance.query(''' instance.query('''
DROP TABLE IF EXISTS test.kafka; DROP TABLE IF EXISTS test.kafka;
@ -2800,7 +2756,6 @@ def test_kafka_engine_put_errors_to_stream_with_random_malformed_json(kafka_clus
DROP TABLE test.kafka_errors; DROP TABLE test.kafka_errors;
''') ''')
@pytest.mark.timeout(120)
def test_kafka_formats_with_broken_message(kafka_cluster): def test_kafka_formats_with_broken_message(kafka_cluster):
# data was dumped from clickhouse itself in a following manner # data was dumped from clickhouse itself in a following manner
# clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g' # clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g'

View File

@ -68,7 +68,6 @@ def kafka_setup_teardown():
# Tests # Tests
@pytest.mark.timeout(180) # wait to build containers
def test_kafka_json_as_string(kafka_cluster): def test_kafka_json_as_string(kafka_cluster):
kafka_produce(kafka_cluster, 'kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}']) kafka_produce(kafka_cluster, 'kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])