mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Revert "Speed up some Kafka tests with multiprocessing"
This commit is contained in:
parent
cc165f9349
commit
9667c19b6e
@ -10,8 +10,6 @@ import string
|
||||
import ast
|
||||
import math
|
||||
|
||||
from multiprocessing.dummy import Pool
|
||||
|
||||
import avro.schema
|
||||
import avro.io
|
||||
import avro.datafile
|
||||
@ -1000,11 +998,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
|
||||
}
|
||||
|
||||
topic_postfix = str(hash(create_query_generator))
|
||||
|
||||
p = Pool(10)
|
||||
results = []
|
||||
|
||||
def run_for_format(format_name, format_opts):
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(f"Set up {format_name}")
|
||||
topic_name = f"format_tests_{format_name}-{topic_postfix}"
|
||||
data_sample = format_opts["data_sample"]
|
||||
@ -1040,13 +1034,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
|
||||
raw_expected = """\
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
|
||||
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
@ -1077,9 +1064,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
|
||||
)
|
||||
assert result_checker(res)
|
||||
|
||||
results = []
|
||||
|
||||
def run_for_format2(format_name, format_opts):
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(("Checking {}".format(format_name)))
|
||||
topic_name = f"format_tests_{format_name}-{topic_postfix}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
@ -1103,12 +1088,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
|
||||
)
|
||||
kafka_delete_topic(get_admin_client(kafka_cluster), topic_name)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
|
||||
|
||||
# Since everything is async and shaky when receiving messages from Kafka,
|
||||
# we may want to try and check results multiple times in a loop.
|
||||
@ -4258,11 +4237,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
|
||||
|
||||
topic_name_prefix = "format_tests_4_stream_"
|
||||
topic_name_postfix = get_topic_postfix(create_query_generator)
|
||||
|
||||
p = Pool(10)
|
||||
results = []
|
||||
|
||||
def run_for_format(format_name, format_opts):
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(f"Set up {format_name}")
|
||||
topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
|
||||
data_sample = format_opts["data_sample"]
|
||||
@ -4303,12 +4278,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
|
||||
"""
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
|
||||
raw_expected = """\
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
|
||||
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
@ -4339,9 +4308,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
|
||||
)
|
||||
assert result_checker(res)
|
||||
|
||||
results = []
|
||||
|
||||
def run_for_format2(format_name, format_opts):
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(f"Checking {format_name}")
|
||||
topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
@ -4381,12 +4348,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
|
||||
), "Proper error for format: {}".format(format_name)
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"create_query_generator",
|
||||
@ -4870,63 +4831,6 @@ def test_max_rows_per_message(kafka_cluster, create_query_generator):
|
||||
def test_row_based_formats(kafka_cluster, create_query_generator):
|
||||
admin_client = get_admin_client(kafka_cluster)
|
||||
|
||||
p = Pool(10)
|
||||
|
||||
def run_for_format_name(format_name):
|
||||
logging.debug("Checking {format_name}")
|
||||
|
||||
topic_name = format_name + get_topic_postfix(create_query_generator)
|
||||
view_name = f"kafka_view_{format_name}"
|
||||
table_name = f"kafka_{format_name}"
|
||||
|
||||
with kafka_topic(admin_client, topic_name):
|
||||
num_rows = 10
|
||||
max_rows_per_message = 5
|
||||
message_count = num_rows / max_rows_per_message
|
||||
|
||||
create_query = create_query_generator(
|
||||
table_name,
|
||||
"key UInt64, value UInt64",
|
||||
topic_list=topic_name,
|
||||
consumer_group=topic_name,
|
||||
format=format_name,
|
||||
settings={"kafka_max_rows_per_message": max_rows_per_message},
|
||||
)
|
||||
|
||||
instance.query(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS test.{view_name};
|
||||
DROP TABLE IF EXISTS test.{table_name};
|
||||
|
||||
{create_query};
|
||||
|
||||
CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
|
||||
SELECT key, value FROM test.{table_name};
|
||||
|
||||
INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
|
||||
"""
|
||||
)
|
||||
|
||||
messages = kafka_consume_with_retry(
|
||||
kafka_cluster, topic_name, message_count, need_decode=False
|
||||
)
|
||||
|
||||
assert (
|
||||
len(messages) == message_count
|
||||
), f"Invalid message count for {format_name}"
|
||||
|
||||
instance.query_with_retry(
|
||||
f"SELECT count() FROM test.{view_name}",
|
||||
check_callback=lambda res: int(res) == num_rows,
|
||||
)
|
||||
|
||||
result = instance.query(f"SELECT * FROM test.{view_name}")
|
||||
expected = ""
|
||||
for i in range(num_rows):
|
||||
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
|
||||
assert result == expected, f"Invalid result for {format_name}"
|
||||
|
||||
results = []
|
||||
for format_name in [
|
||||
"TSV",
|
||||
"TSVWithNamesAndTypes",
|
||||
@ -4945,10 +4849,55 @@ def test_row_based_formats(kafka_cluster, create_query_generator):
|
||||
"RowBinaryWithNamesAndTypes",
|
||||
"MsgPack",
|
||||
]:
|
||||
results.append(p.apply_async(run_for_format_name, args=(format_name,)))
|
||||
logging.debug("Checking {format_name}")
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
topic_name = format_name + get_topic_postfix(create_query_generator)
|
||||
table_name = f"kafka_{format_name}"
|
||||
|
||||
with kafka_topic(admin_client, topic_name):
|
||||
num_rows = 10
|
||||
max_rows_per_message = 5
|
||||
message_count = num_rows / max_rows_per_message
|
||||
|
||||
create_query = create_query_generator(
|
||||
table_name,
|
||||
"key UInt64, value UInt64",
|
||||
topic_list=topic_name,
|
||||
consumer_group=topic_name,
|
||||
format=format_name,
|
||||
settings={"kafka_max_rows_per_message": max_rows_per_message},
|
||||
)
|
||||
|
||||
instance.query(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
DROP TABLE IF EXISTS test.{table_name};
|
||||
|
||||
{create_query};
|
||||
|
||||
CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
|
||||
SELECT key, value FROM test.{table_name};
|
||||
|
||||
INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
|
||||
"""
|
||||
)
|
||||
|
||||
messages = kafka_consume_with_retry(
|
||||
kafka_cluster, topic_name, message_count, need_decode=False
|
||||
)
|
||||
|
||||
assert len(messages) == message_count
|
||||
|
||||
instance.query_with_retry(
|
||||
"SELECT count() FROM test.view",
|
||||
check_callback=lambda res: int(res) == num_rows,
|
||||
)
|
||||
|
||||
result = instance.query("SELECT * FROM test.view")
|
||||
expected = ""
|
||||
for i in range(num_rows):
|
||||
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
|
||||
assert result == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -5006,12 +4955,16 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
|
||||
num_rows = 100
|
||||
message_count = 9
|
||||
|
||||
p = Pool(10)
|
||||
|
||||
def run_for_format_name(format_name):
|
||||
for format_name in [
|
||||
"JSONColumns",
|
||||
"Native",
|
||||
"Arrow",
|
||||
"Parquet",
|
||||
"ORC",
|
||||
"JSONCompactColumns",
|
||||
]:
|
||||
topic_name = format_name + get_topic_postfix(create_query_generator)
|
||||
table_name = f"kafka_{format_name}"
|
||||
view_name = f"kafka_view_{format_name}"
|
||||
logging.debug(f"Checking format {format_name}")
|
||||
with kafka_topic(admin_client, topic_name):
|
||||
create_query = create_query_generator(
|
||||
@ -5024,12 +4977,12 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
|
||||
|
||||
instance.query(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS test.{view_name};
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
DROP TABLE IF EXISTS test.{table_name};
|
||||
|
||||
{create_query};
|
||||
|
||||
CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
|
||||
CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
|
||||
SELECT key, value FROM test.{table_name};
|
||||
|
||||
INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;
|
||||
@ -5038,38 +4991,22 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
|
||||
messages = kafka_consume_with_retry(
|
||||
kafka_cluster, topic_name, message_count, need_decode=False
|
||||
)
|
||||
assert (
|
||||
len(messages) == message_count
|
||||
), f"Invalid message count for {format_name}"
|
||||
assert len(messages) == message_count
|
||||
|
||||
rows = int(
|
||||
instance.query_with_retry(
|
||||
f"SELECT count() FROM test.{view_name}",
|
||||
"SELECT count() FROM test.view",
|
||||
check_callback=lambda res: int(res) == num_rows,
|
||||
)
|
||||
)
|
||||
|
||||
assert rows == num_rows, f"Invalid row count for {format_name}"
|
||||
assert rows == num_rows
|
||||
|
||||
result = instance.query(f"SELECT * FROM test.{view_name} ORDER by key")
|
||||
result = instance.query("SELECT * FROM test.view ORDER by key")
|
||||
expected = ""
|
||||
for i in range(num_rows):
|
||||
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
|
||||
assert result == expected, f"Invalid result for {format_name}"
|
||||
|
||||
results = []
|
||||
for format_name in [
|
||||
"JSONColumns",
|
||||
"Native",
|
||||
"Arrow",
|
||||
"Parquet",
|
||||
"ORC",
|
||||
"JSONCompactColumns",
|
||||
]:
|
||||
results.append(p.apply_async(run_for_format_name, args=(format_name,)))
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_system_kafka_consumers(kafka_cluster):
|
||||
@ -5363,54 +5300,6 @@ def test_formats_errors(kafka_cluster):
|
||||
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
|
||||
)
|
||||
|
||||
p = Pool(10)
|
||||
|
||||
def run_for_format_name(format_name):
|
||||
with kafka_topic(admin_client, format_name):
|
||||
table_name = f"kafka_{format_name}"
|
||||
view_name = f"kafka_view_{format_name}"
|
||||
|
||||
instance.query(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS test.{view_name};
|
||||
DROP TABLE IF EXISTS test.{table_name};
|
||||
|
||||
CREATE TABLE test.{table_name} (key UInt64, value UInt64)
|
||||
ENGINE = Kafka
|
||||
SETTINGS kafka_broker_list = 'kafka1:19092',
|
||||
kafka_topic_list = '{format_name}',
|
||||
kafka_group_name = '{format_name}',
|
||||
kafka_format = '{format_name}',
|
||||
kafka_max_rows_per_message = 5,
|
||||
format_template_row='template_row.format',
|
||||
format_regexp='id: (.+?)',
|
||||
input_format_with_names_use_header=0,
|
||||
format_schema='key_value_message:Message';
|
||||
|
||||
CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
|
||||
SELECT key, value FROM test.{table_name};
|
||||
"""
|
||||
)
|
||||
|
||||
kafka_produce(
|
||||
kafka_cluster,
|
||||
format_name,
|
||||
["Broken message\nBroken message\nBroken message\n"],
|
||||
)
|
||||
|
||||
num_errors = int(
|
||||
instance.query_with_retry(
|
||||
f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'",
|
||||
check_callback=lambda res: int(res) > 0,
|
||||
)
|
||||
)
|
||||
|
||||
assert num_errors > 0, f"No errors for {format_name}"
|
||||
|
||||
instance.query(f"DROP TABLE test.{table_name}")
|
||||
instance.query(f"DROP TABLE test.{view_name}")
|
||||
|
||||
results = []
|
||||
for format_name in [
|
||||
"Template",
|
||||
"Regexp",
|
||||
@ -5453,10 +5342,48 @@ def test_formats_errors(kafka_cluster):
|
||||
"HiveText",
|
||||
"MySQLDump",
|
||||
]:
|
||||
results.append(p.apply_async(run_for_format_name, args=(format_name,)))
|
||||
with kafka_topic(admin_client, format_name):
|
||||
table_name = f"kafka_{format_name}"
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
instance.query(
|
||||
f"""
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
DROP TABLE IF EXISTS test.{table_name};
|
||||
|
||||
CREATE TABLE test.{table_name} (key UInt64, value UInt64)
|
||||
ENGINE = Kafka
|
||||
SETTINGS kafka_broker_list = 'kafka1:19092',
|
||||
kafka_topic_list = '{format_name}',
|
||||
kafka_group_name = '{format_name}',
|
||||
kafka_format = '{format_name}',
|
||||
kafka_max_rows_per_message = 5,
|
||||
format_template_row='template_row.format',
|
||||
format_regexp='id: (.+?)',
|
||||
input_format_with_names_use_header=0,
|
||||
format_schema='key_value_message:Message';
|
||||
|
||||
CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
|
||||
SELECT key, value FROM test.{table_name};
|
||||
"""
|
||||
)
|
||||
|
||||
kafka_produce(
|
||||
kafka_cluster,
|
||||
format_name,
|
||||
["Broken message\nBroken message\nBroken message\n"],
|
||||
)
|
||||
|
||||
num_errors = int(
|
||||
instance.query_with_retry(
|
||||
f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'",
|
||||
check_callback=lambda res: int(res) > 0,
|
||||
)
|
||||
)
|
||||
|
||||
assert num_errors > 0
|
||||
|
||||
instance.query(f"DROP TABLE test.{table_name}")
|
||||
instance.query("DROP TABLE test.view")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
Loading…
Reference in New Issue
Block a user