diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 440b7c526b9..bef90e1b9d3 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -10,8 +10,6 @@ import string import ast import math -from multiprocessing.dummy import Pool - import avro.schema import avro.io import avro.datafile @@ -1000,11 +998,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator): } topic_postfix = str(hash(create_query_generator)) - - p = Pool(10) - results = [] - - def run_for_format(format_name, format_opts): + for format_name, format_opts in list(all_formats.items()): logging.debug(f"Set up {format_name}") topic_name = f"format_tests_{format_name}-{topic_postfix}" data_sample = format_opts["data_sample"] @@ -1040,13 +1034,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator): ), ) ) - - for format_name, format_opts in list(all_formats.items()): - results.append(p.apply_async(run_for_format, args=(format_name, format_opts))) - - for result in results: - result.get() - raw_expected = """\ 0 0 AM 0.5 1 {topic_name} 0 {offset_0} 1 0 AM 0.5 1 {topic_name} 0 {offset_1} @@ -1077,9 +1064,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator): ) assert result_checker(res) - results = [] - - def run_for_format2(format_name, format_opts): + for format_name, format_opts in list(all_formats.items()): logging.debug(("Checking {}".format(format_name))) topic_name = f"format_tests_{format_name}-{topic_postfix}" # shift offsets by 1 if format supports empty value @@ -1103,12 +1088,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator): ) kafka_delete_topic(get_admin_client(kafka_cluster), topic_name) - for format_name, format_opts in list(all_formats.items()): - results.append(p.apply_async(run_for_format2, args=(format_name, format_opts))) - - for result in results: - result.get() - # Since everything is async and shaky when receiving messages from Kafka, # we may want to try and check results multiple times in a loop. @@ -4258,11 +4237,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator topic_name_prefix = "format_tests_4_stream_" topic_name_postfix = get_topic_postfix(create_query_generator) - - p = Pool(10) - results = [] - - def run_for_format(format_name, format_opts): + for format_name, format_opts in list(all_formats.items()): logging.debug(f"Set up {format_name}") topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}" data_sample = format_opts["data_sample"] @@ -4303,12 +4278,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator """ ) - for format_name, format_opts in list(all_formats.items()): - results.append(p.apply_async(run_for_format, args=(format_name, format_opts))) - - for result in results: - result.get() - raw_expected = """\ 0 0 AM 0.5 1 {topic_name} 0 {offset_0} 1 0 AM 0.5 1 {topic_name} 0 {offset_1} @@ -4339,9 +4308,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator ) assert result_checker(res) - results = [] - - def run_for_format2(format_name, format_opts): + for format_name, format_opts in list(all_formats.items()): logging.debug(f"Checking {format_name}") topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}" # shift offsets by 1 if format supports empty value @@ -4381,12 +4348,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator ), "Proper error for format: {}".format(format_name) kafka_delete_topic(admin_client, topic_name) - for format_name, format_opts in list(all_formats.items()): - results.append(p.apply_async(run_for_format2, args=(format_name, format_opts))) - - for result in results: - result.get() - @pytest.mark.parametrize( "create_query_generator", @@ -4870,63 +4831,6 @@ def test_max_rows_per_message(kafka_cluster, create_query_generator): def test_row_based_formats(kafka_cluster, create_query_generator): admin_client = get_admin_client(kafka_cluster) - p = Pool(10) - - def run_for_format_name(format_name): - logging.debug("Checking {format_name}") - - topic_name = format_name + get_topic_postfix(create_query_generator) - view_name = f"kafka_view_{format_name}" - table_name = f"kafka_{format_name}" - - with kafka_topic(admin_client, topic_name): - num_rows = 10 - max_rows_per_message = 5 - message_count = num_rows / max_rows_per_message - - create_query = create_query_generator( - table_name, - "key UInt64, value UInt64", - topic_list=topic_name, - consumer_group=topic_name, - format=format_name, - settings={"kafka_max_rows_per_message": max_rows_per_message}, - ) - - instance.query( - f""" - DROP TABLE IF EXISTS test.{view_name}; - DROP TABLE IF EXISTS test.{table_name}; - - {create_query}; - - CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS - SELECT key, value FROM test.{table_name}; - - INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}); - """ - ) - - messages = kafka_consume_with_retry( - kafka_cluster, topic_name, message_count, need_decode=False - ) - - assert ( - len(messages) == message_count - ), f"Invalid message count for {format_name}" - - instance.query_with_retry( - f"SELECT count() FROM test.{view_name}", - check_callback=lambda res: int(res) == num_rows, - ) - - result = instance.query(f"SELECT * FROM test.{view_name}") - expected = "" - for i in range(num_rows): - expected += str(i * 10) + "\t" + str(i * 100) + "\n" - assert result == expected, f"Invalid result for {format_name}" - - results = [] for format_name in [ "TSV", "TSVWithNamesAndTypes", @@ -4945,10 +4849,55 @@ def test_row_based_formats(kafka_cluster, create_query_generator): "RowBinaryWithNamesAndTypes", "MsgPack", ]: - results.append(p.apply_async(run_for_format_name, args=(format_name,))) + logging.debug("Checking {format_name}") - for result in results: - result.get() + topic_name = format_name + get_topic_postfix(create_query_generator) + table_name = f"kafka_{format_name}" + + with kafka_topic(admin_client, topic_name): + num_rows = 10 + max_rows_per_message = 5 + message_count = num_rows / max_rows_per_message + + create_query = create_query_generator( + table_name, + "key UInt64, value UInt64", + topic_list=topic_name, + consumer_group=topic_name, + format=format_name, + settings={"kafka_max_rows_per_message": max_rows_per_message}, + ) + + instance.query( + f""" + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.{table_name}; + + {create_query}; + + CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS + SELECT key, value FROM test.{table_name}; + + INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}); + """ + ) + + messages = kafka_consume_with_retry( + kafka_cluster, topic_name, message_count, need_decode=False + ) + + assert len(messages) == message_count + + instance.query_with_retry( + "SELECT count() FROM test.view", + check_callback=lambda res: int(res) == num_rows, + ) + + result = instance.query("SELECT * FROM test.view") + expected = "" + for i in range(num_rows): + expected += str(i * 10) + "\t" + str(i * 100) + "\n" + assert result == expected @pytest.mark.parametrize( @@ -5006,12 +4955,16 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator): num_rows = 100 message_count = 9 - p = Pool(10) - - def run_for_format_name(format_name): + for format_name in [ + "JSONColumns", + "Native", + "Arrow", + "Parquet", + "ORC", + "JSONCompactColumns", + ]: topic_name = format_name + get_topic_postfix(create_query_generator) table_name = f"kafka_{format_name}" - view_name = f"kafka_view_{format_name}" logging.debug(f"Checking format {format_name}") with kafka_topic(admin_client, topic_name): create_query = create_query_generator( @@ -5024,12 +4977,12 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator): instance.query( f""" - DROP TABLE IF EXISTS test.{view_name}; + DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.{table_name}; {create_query}; - CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS + CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS SELECT key, value FROM test.{table_name}; INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0; @@ -5038,38 +4991,22 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator): messages = kafka_consume_with_retry( kafka_cluster, topic_name, message_count, need_decode=False ) - assert ( - len(messages) == message_count - ), f"Invalid message count for {format_name}" + assert len(messages) == message_count rows = int( instance.query_with_retry( - f"SELECT count() FROM test.{view_name}", + "SELECT count() FROM test.view", check_callback=lambda res: int(res) == num_rows, ) ) - assert rows == num_rows, f"Invalid row count for {format_name}" + assert rows == num_rows - result = instance.query(f"SELECT * FROM test.{view_name} ORDER by key") + result = instance.query("SELECT * FROM test.view ORDER by key") expected = "" for i in range(num_rows): expected += str(i * 10) + "\t" + str(i * 100) + "\n" - assert result == expected, f"Invalid result for {format_name}" - - results = [] - for format_name in [ - "JSONColumns", - "Native", - "Arrow", - "Parquet", - "ORC", - "JSONCompactColumns", - ]: - results.append(p.apply_async(run_for_format_name, args=(format_name,))) - - for result in results: - result.get() + assert result == expected def test_system_kafka_consumers(kafka_cluster): @@ -5363,54 +5300,6 @@ def test_formats_errors(kafka_cluster): bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port) ) - p = Pool(10) - - def run_for_format_name(format_name): - with kafka_topic(admin_client, format_name): - table_name = f"kafka_{format_name}" - view_name = f"kafka_view_{format_name}" - - instance.query( - f""" - DROP TABLE IF EXISTS test.{view_name}; - DROP TABLE IF EXISTS test.{table_name}; - - CREATE TABLE test.{table_name} (key UInt64, value UInt64) - ENGINE = Kafka - SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = '{format_name}', - kafka_group_name = '{format_name}', - kafka_format = '{format_name}', - kafka_max_rows_per_message = 5, - format_template_row='template_row.format', - format_regexp='id: (.+?)', - input_format_with_names_use_header=0, - format_schema='key_value_message:Message'; - - CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS - SELECT key, value FROM test.{table_name}; - """ - ) - - kafka_produce( - kafka_cluster, - format_name, - ["Broken message\nBroken message\nBroken message\n"], - ) - - num_errors = int( - instance.query_with_retry( - f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'", - check_callback=lambda res: int(res) > 0, - ) - ) - - assert num_errors > 0, f"No errors for {format_name}" - - instance.query(f"DROP TABLE test.{table_name}") - instance.query(f"DROP TABLE test.{view_name}") - - results = [] for format_name in [ "Template", "Regexp", @@ -5453,10 +5342,48 @@ def test_formats_errors(kafka_cluster): "HiveText", "MySQLDump", ]: - results.append(p.apply_async(run_for_format_name, args=(format_name,))) + with kafka_topic(admin_client, format_name): + table_name = f"kafka_{format_name}" - for result in results: - result.get() + instance.query( + f""" + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.{table_name}; + + CREATE TABLE test.{table_name} (key UInt64, value UInt64) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = '{format_name}', + kafka_group_name = '{format_name}', + kafka_format = '{format_name}', + kafka_max_rows_per_message = 5, + format_template_row='template_row.format', + format_regexp='id: (.+?)', + input_format_with_names_use_header=0, + format_schema='key_value_message:Message'; + + CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS + SELECT key, value FROM test.{table_name}; + """ + ) + + kafka_produce( + kafka_cluster, + format_name, + ["Broken message\nBroken message\nBroken message\n"], + ) + + num_errors = int( + instance.query_with_retry( + f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'", + check_callback=lambda res: int(res) > 0, + ) + ) + + assert num_errors > 0 + + instance.query(f"DROP TABLE test.{table_name}") + instance.query("DROP TABLE test.view") @pytest.mark.parametrize(