Merge pull request #69356 from ClickHouse/revert-69324-speed-up-slightly-kafka-tests

Revert "Speed up some Kafka tests with multiprocessing"
Nikita Mikhaylov, 2024-09-07 11:27:47 +00:00 (committed by GitHub)
commit b161ce0c4c
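For orientation before the diff: the change being reverted had wrapped each per-format test body in a thread pool from multiprocessing.dummy, fanning the formats out with apply_async and then draining the results. Below is a minimal, self-contained sketch of that pattern, not the test file itself; the format names and the body of run_for_format are hypothetical stand-ins. Note that multiprocessing.dummy.Pool is backed by threads despite its name, and that result.get() re-raises any exception from a worker, which is what keeps test failures visible.

from multiprocessing.dummy import Pool  # thread pool, despite the module name

def run_for_format(format_name, format_opts):
    # stand-in for the per-format test body in the real tests
    print(f"Set up {format_name} with {format_opts}")

all_formats = {"JSONEachRow": {}, "Avro": {}}  # hypothetical sample

p = Pool(10)
results = [
    p.apply_async(run_for_format, args=(name, opts))
    for name, opts in all_formats.items()
]
for result in results:
    result.get()  # blocks until the worker finishes; re-raises its exceptions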


@@ -10,8 +10,6 @@ import string
 import ast
 import math
-from multiprocessing.dummy import Pool
-
 import avro.schema
 import avro.io
 import avro.datafile
@@ -1000,11 +998,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
     }
     topic_postfix = str(hash(create_query_generator))
 
-    p = Pool(10)
-    results = []
-
-    def run_for_format(format_name, format_opts):
+    for format_name, format_opts in list(all_formats.items()):
         logging.debug(f"Set up {format_name}")
         topic_name = f"format_tests_{format_name}-{topic_postfix}"
         data_sample = format_opts["data_sample"]
@@ -1040,13 +1034,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
                 ),
             )
         )
-
-    for format_name, format_opts in list(all_formats.items()):
-        results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
-
-    for result in results:
-        result.get()
-
     raw_expected = """\
 0 0 AM 0.5 1 {topic_name} 0 {offset_0}
 1 0 AM 0.5 1 {topic_name} 0 {offset_1}
@@ -1077,9 +1064,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
         )
         assert result_checker(res)
 
-    results = []
-
-    def run_for_format2(format_name, format_opts):
+    for format_name, format_opts in list(all_formats.items()):
         logging.debug(("Checking {}".format(format_name)))
         topic_name = f"format_tests_{format_name}-{topic_postfix}"
         # shift offsets by 1 if format supports empty value
@@ -1103,12 +1088,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
         )
         kafka_delete_topic(get_admin_client(kafka_cluster), topic_name)
 
-    for format_name, format_opts in list(all_formats.items()):
-        results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
-
-    for result in results:
-        result.get()
-
     # Since everything is async and shaky when receiving messages from Kafka,
     # we may want to try and check results multiple times in a loop.
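The comment above names the retry idiom these tests lean on; later hunks show it concretely as instance.query_with_retry(..., check_callback=...). As a rough sketch of what such a helper does (names and defaults are assumptions, not taken from the ClickHouse test framework):

import time

def query_with_retry(run_query, check_callback, retries=20, sleep=0.5):
    # poll the query until the callback accepts the result or retries run out
    res = run_query()
    for _ in range(retries):
        if check_callback(res):
            break
        time.sleep(sleep)
        res = run_query()
    return res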
@@ -4258,11 +4237,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator):
     topic_name_prefix = "format_tests_4_stream_"
     topic_name_postfix = get_topic_postfix(create_query_generator)
 
-    p = Pool(10)
-    results = []
-
-    def run_for_format(format_name, format_opts):
+    for format_name, format_opts in list(all_formats.items()):
         logging.debug(f"Set up {format_name}")
         topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
         data_sample = format_opts["data_sample"]
@@ -4303,12 +4278,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator):
             """
         )
 
-    for format_name, format_opts in list(all_formats.items()):
-        results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
-
-    for result in results:
-        result.get()
-
     raw_expected = """\
 0 0 AM 0.5 1 {topic_name} 0 {offset_0}
 1 0 AM 0.5 1 {topic_name} 0 {offset_1}
@@ -4339,9 +4308,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator):
         )
         assert result_checker(res)
 
-    results = []
-
-    def run_for_format2(format_name, format_opts):
+    for format_name, format_opts in list(all_formats.items()):
         logging.debug(f"Checking {format_name}")
         topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
         # shift offsets by 1 if format supports empty value
@@ -4381,12 +4348,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator):
         ), "Proper error for format: {}".format(format_name)
         kafka_delete_topic(admin_client, topic_name)
 
-    for format_name, format_opts in list(all_formats.items()):
-        results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
-
-    for result in results:
-        result.get()
-
 
 @pytest.mark.parametrize(
     "create_query_generator",
@@ -4870,63 +4831,6 @@ def test_max_rows_per_message(kafka_cluster, create_query_generator):
 def test_row_based_formats(kafka_cluster, create_query_generator):
     admin_client = get_admin_client(kafka_cluster)
 
-    p = Pool(10)
-
-    def run_for_format_name(format_name):
-        logging.debug("Checking {format_name}")
-        topic_name = format_name + get_topic_postfix(create_query_generator)
-        view_name = f"kafka_view_{format_name}"
-        table_name = f"kafka_{format_name}"
-        with kafka_topic(admin_client, topic_name):
-            num_rows = 10
-            max_rows_per_message = 5
-            message_count = num_rows / max_rows_per_message
-
-            create_query = create_query_generator(
-                table_name,
-                "key UInt64, value UInt64",
-                topic_list=topic_name,
-                consumer_group=topic_name,
-                format=format_name,
-                settings={"kafka_max_rows_per_message": max_rows_per_message},
-            )
-
-            instance.query(
-                f"""
-                DROP TABLE IF EXISTS test.{view_name};
-                DROP TABLE IF EXISTS test.{table_name};
-                {create_query};
-
-                CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
-                    SELECT key, value FROM test.{table_name};
-
-                INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
-                """
-            )
-
-            messages = kafka_consume_with_retry(
-                kafka_cluster, topic_name, message_count, need_decode=False
-            )
-
-            assert (
-                len(messages) == message_count
-            ), f"Invalid message count for {format_name}"
-
-            instance.query_with_retry(
-                f"SELECT count() FROM test.{view_name}",
-                check_callback=lambda res: int(res) == num_rows,
-            )
-
-            result = instance.query(f"SELECT * FROM test.{view_name}")
-
-            expected = ""
-            for i in range(num_rows):
-                expected += str(i * 10) + "\t" + str(i * 100) + "\n"
-
-            assert result == expected, f"Invalid result for {format_name}"
-
-    results = []
     for format_name in [
         "TSV",
         "TSVWithNamesAndTypes",
@@ -4945,10 +4849,55 @@ def test_row_based_formats(kafka_cluster, create_query_generator):
         "RowBinaryWithNamesAndTypes",
         "MsgPack",
     ]:
-        results.append(p.apply_async(run_for_format_name, args=(format_name,)))
-
-    for result in results:
-        result.get()
+        logging.debug("Checking {format_name}")
+        topic_name = format_name + get_topic_postfix(create_query_generator)
+        table_name = f"kafka_{format_name}"
+        with kafka_topic(admin_client, topic_name):
+            num_rows = 10
+            max_rows_per_message = 5
+            message_count = num_rows / max_rows_per_message
+
+            create_query = create_query_generator(
+                table_name,
+                "key UInt64, value UInt64",
+                topic_list=topic_name,
+                consumer_group=topic_name,
+                format=format_name,
+                settings={"kafka_max_rows_per_message": max_rows_per_message},
+            )
+
+            instance.query(
+                f"""
+                DROP TABLE IF EXISTS test.view;
+                DROP TABLE IF EXISTS test.{table_name};
+                {create_query};
+
+                CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
+                    SELECT key, value FROM test.{table_name};
+
+                INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
+                """
+            )
+
+            messages = kafka_consume_with_retry(
+                kafka_cluster, topic_name, message_count, need_decode=False
+            )
+
+            assert len(messages) == message_count
+
+            instance.query_with_retry(
+                "SELECT count() FROM test.view",
+                check_callback=lambda res: int(res) == num_rows,
+            )
+
+            result = instance.query("SELECT * FROM test.view")
+
+            expected = ""
+            for i in range(num_rows):
+                expected += str(i * 10) + "\t" + str(i * 100) + "\n"
+
+            assert result == expected
 
 
 @pytest.mark.parametrize(
@@ -5006,12 +4955,16 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
     num_rows = 100
     message_count = 9
 
-    p = Pool(10)
-
-    def run_for_format_name(format_name):
+    for format_name in [
+        "JSONColumns",
+        "Native",
+        "Arrow",
+        "Parquet",
+        "ORC",
+        "JSONCompactColumns",
+    ]:
         topic_name = format_name + get_topic_postfix(create_query_generator)
         table_name = f"kafka_{format_name}"
-        view_name = f"kafka_view_{format_name}"
         logging.debug(f"Checking format {format_name}")
         with kafka_topic(admin_client, topic_name):
             create_query = create_query_generator(
@@ -5024,12 +4977,12 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
             instance.query(
                 f"""
-                DROP TABLE IF EXISTS test.{view_name};
+                DROP TABLE IF EXISTS test.view;
                 DROP TABLE IF EXISTS test.{table_name};
                 {create_query};
 
-                CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
+                CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
                     SELECT key, value FROM test.{table_name};
 
                 INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;
@@ -5038,38 +4991,22 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
             messages = kafka_consume_with_retry(
                 kafka_cluster, topic_name, message_count, need_decode=False
             )
 
-            assert (
-                len(messages) == message_count
-            ), f"Invalid message count for {format_name}"
+            assert len(messages) == message_count
 
             rows = int(
                 instance.query_with_retry(
-                    f"SELECT count() FROM test.{view_name}",
+                    "SELECT count() FROM test.view",
                     check_callback=lambda res: int(res) == num_rows,
                 )
             )
 
-            assert rows == num_rows, f"Invalid row count for {format_name}"
+            assert rows == num_rows
 
-            result = instance.query(f"SELECT * FROM test.{view_name} ORDER by key")
+            result = instance.query("SELECT * FROM test.view ORDER by key")
 
             expected = ""
             for i in range(num_rows):
                 expected += str(i * 10) + "\t" + str(i * 100) + "\n"
-            assert result == expected, f"Invalid result for {format_name}"
-
-    results = []
-    for format_name in [
-        "JSONColumns",
-        "Native",
-        "Arrow",
-        "Parquet",
-        "ORC",
-        "JSONCompactColumns",
-    ]:
-        results.append(p.apply_async(run_for_format_name, args=(format_name,)))
-
-    for result in results:
-        result.get()
+            assert result == expected
 
 
 def test_system_kafka_consumers(kafka_cluster):
@@ -5363,54 +5300,6 @@ def test_formats_errors(kafka_cluster):
         bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
     )
 
-    p = Pool(10)
-
-    def run_for_format_name(format_name):
-        with kafka_topic(admin_client, format_name):
-            table_name = f"kafka_{format_name}"
-            view_name = f"kafka_view_{format_name}"
-
-            instance.query(
-                f"""
-                DROP TABLE IF EXISTS test.{view_name};
-                DROP TABLE IF EXISTS test.{table_name};
-
-                CREATE TABLE test.{table_name} (key UInt64, value UInt64)
-                    ENGINE = Kafka
-                    SETTINGS kafka_broker_list = 'kafka1:19092',
-                             kafka_topic_list = '{format_name}',
-                             kafka_group_name = '{format_name}',
-                             kafka_format = '{format_name}',
-                             kafka_max_rows_per_message = 5,
-                             format_template_row='template_row.format',
-                             format_regexp='id: (.+?)',
-                             input_format_with_names_use_header=0,
-                             format_schema='key_value_message:Message';
-
-                CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
-                    SELECT key, value FROM test.{table_name};
-                """
-            )
-
-            kafka_produce(
-                kafka_cluster,
-                format_name,
-                ["Broken message\nBroken message\nBroken message\n"],
-            )
-
-            num_errors = int(
-                instance.query_with_retry(
-                    f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'",
-                    check_callback=lambda res: int(res) > 0,
-                )
-            )
-
-            assert num_errors > 0, f"No errors for {format_name}"
-
-            instance.query(f"DROP TABLE test.{table_name}")
-            instance.query(f"DROP TABLE test.{view_name}")
-
-    results = []
     for format_name in [
         "Template",
         "Regexp",
@@ -5453,10 +5342,48 @@ def test_formats_errors(kafka_cluster):
         "HiveText",
         "MySQLDump",
     ]:
-        results.append(p.apply_async(run_for_format_name, args=(format_name,)))
-
-    for result in results:
-        result.get()
+        with kafka_topic(admin_client, format_name):
+            table_name = f"kafka_{format_name}"
+
+            instance.query(
+                f"""
+                DROP TABLE IF EXISTS test.view;
+                DROP TABLE IF EXISTS test.{table_name};
+
+                CREATE TABLE test.{table_name} (key UInt64, value UInt64)
+                    ENGINE = Kafka
+                    SETTINGS kafka_broker_list = 'kafka1:19092',
+                             kafka_topic_list = '{format_name}',
+                             kafka_group_name = '{format_name}',
+                             kafka_format = '{format_name}',
+                             kafka_max_rows_per_message = 5,
+                             format_template_row='template_row.format',
+                             format_regexp='id: (.+?)',
+                             input_format_with_names_use_header=0,
+                             format_schema='key_value_message:Message';
+
+                CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
+                    SELECT key, value FROM test.{table_name};
+                """
+            )
+
+            kafka_produce(
+                kafka_cluster,
+                format_name,
+                ["Broken message\nBroken message\nBroken message\n"],
+            )
+
+            num_errors = int(
+                instance.query_with_retry(
+                    f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'",
+                    check_callback=lambda res: int(res) > 0,
+                )
+            )
+
+            assert num_errors > 0
+
+            instance.query(f"DROP TABLE test.{table_name}")
+            instance.query("DROP TABLE test.view")
 
 
 @pytest.mark.parametrize(