Speed up more tests

This commit is contained in:
Antonio Andelic 2024-09-06 17:07:21 +02:00
parent 06507190d4
commit 86f11e221e

View File

@ -1000,7 +1000,11 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
} }
topic_postfix = str(hash(create_query_generator)) topic_postfix = str(hash(create_query_generator))
for format_name, format_opts in list(all_formats.items()):
p = Pool(10)
results = []
def run_for_format(format_name, format_opts):
logging.debug(f"Set up {format_name}") logging.debug(f"Set up {format_name}")
topic_name = f"format_tests_{format_name}-{topic_postfix}" topic_name = f"format_tests_{format_name}-{topic_postfix}"
data_sample = format_opts["data_sample"] data_sample = format_opts["data_sample"]
@ -1036,6 +1040,13 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
), ),
) )
) )
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
for result in results:
result.get()
raw_expected = """\ raw_expected = """\
0 0 AM 0.5 1 {topic_name} 0 {offset_0} 0 0 AM 0.5 1 {topic_name} 0 {offset_0}
1 0 AM 0.5 1 {topic_name} 0 {offset_1} 1 0 AM 0.5 1 {topic_name} 0 {offset_1}
@ -1066,7 +1077,9 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
) )
assert result_checker(res) assert result_checker(res)
for format_name, format_opts in list(all_formats.items()): results = []
def run_for_format2(format_name, format_opts):
logging.debug(("Checking {}".format(format_name))) logging.debug(("Checking {}".format(format_name)))
topic_name = f"format_tests_{format_name}-{topic_postfix}" topic_name = f"format_tests_{format_name}-{topic_postfix}"
# shift offsets by 1 if format supports empty value # shift offsets by 1 if format supports empty value
@ -1090,6 +1103,12 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
) )
kafka_delete_topic(get_admin_client(kafka_cluster), topic_name) kafka_delete_topic(get_admin_client(kafka_cluster), topic_name)
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
for result in results:
result.get()
# Since everything is async and shaky when receiving messages from Kafka, # Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop. # we may want to try and check results multiple times in a loop.
@ -4239,7 +4258,11 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
topic_name_prefix = "format_tests_4_stream_" topic_name_prefix = "format_tests_4_stream_"
topic_name_postfix = get_topic_postfix(create_query_generator) topic_name_postfix = get_topic_postfix(create_query_generator)
for format_name, format_opts in list(all_formats.items()):
p = Pool(10)
results = []
def run_for_format(format_name, format_opts):
logging.debug(f"Set up {format_name}") logging.debug(f"Set up {format_name}")
topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}" topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
data_sample = format_opts["data_sample"] data_sample = format_opts["data_sample"]
@ -4280,6 +4303,12 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
""" """
) )
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
for result in results:
result.get()
raw_expected = """\ raw_expected = """\
0 0 AM 0.5 1 {topic_name} 0 {offset_0} 0 0 AM 0.5 1 {topic_name} 0 {offset_0}
1 0 AM 0.5 1 {topic_name} 0 {offset_1} 1 0 AM 0.5 1 {topic_name} 0 {offset_1}
@ -4310,7 +4339,9 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
) )
assert result_checker(res) assert result_checker(res)
for format_name, format_opts in list(all_formats.items()): results = []
def run_for_format2(format_name, format_opts):
logging.debug(f"Checking {format_name}") logging.debug(f"Checking {format_name}")
topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}" topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
# shift offsets by 1 if format supports empty value # shift offsets by 1 if format supports empty value
@ -4350,6 +4381,12 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
), "Proper error for format: {}".format(format_name) ), "Proper error for format: {}".format(format_name)
kafka_delete_topic(admin_client, topic_name) kafka_delete_topic(admin_client, topic_name)
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
for result in results:
result.get()
@pytest.mark.parametrize( @pytest.mark.parametrize(
"create_query_generator", "create_query_generator",