mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Add checked waits to improve kafka tests
It might take a few minutes to receive all the messages for the last materialized view. By waiting to the number of expected results the happy path of execution takes minimal time while becoming more stable, while the erroneous path might take a bit longer.
This commit is contained in:
parent
8d10dd71f7
commit
6a104cc3f7
@ -843,24 +843,7 @@ def test_kafka_formats(kafka_cluster):
|
||||
extra_settings=format_opts.get("extra_settings") or "",
|
||||
)
|
||||
)
|
||||
|
||||
instance.wait_for_log_line(
|
||||
"kafka.*Committed offset [0-9]+.*format_tests_",
|
||||
repetitions=len(all_formats.keys()),
|
||||
look_behind_lines=12000,
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(("Checking {}".format(format_name)))
|
||||
topic_name = f"format_tests_{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_{format_name}_mv;".format(format_name=format_name)
|
||||
)
|
||||
expected = """\
|
||||
raw_expected = """\
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
|
||||
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
2 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
@ -878,7 +861,27 @@ def test_kafka_formats(kafka_cluster):
|
||||
14 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
15 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_2}
|
||||
""".format(
|
||||
"""
|
||||
|
||||
expected_rows_count = raw_expected.count("\n")
|
||||
instance.query_with_retry(
|
||||
f"SELECT * FROM test.kafka_data_{list(all_formats.keys())[-1]}_mv;",
|
||||
retry_count=30,
|
||||
sleep_time=1,
|
||||
check_callback=lambda res: res.count("\n") == expected_rows_count,
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(("Checking {}".format(format_name)))
|
||||
topic_name = f"format_tests_{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_{format_name}_mv;".format(format_name=format_name)
|
||||
)
|
||||
expected = raw_expected.format(
|
||||
topic_name=topic_name,
|
||||
offset_0=offsets[0],
|
||||
offset_1=offsets[1],
|
||||
@ -3755,19 +3758,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
)
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug("Checking {format_name}")
|
||||
topic_name = f"{topic_name_prefix}{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_data_{format_name}_mv;".format(
|
||||
format_name=format_name
|
||||
)
|
||||
)
|
||||
expected = """\
|
||||
raw_expected = """\
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
|
||||
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
2 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
@ -3785,7 +3776,29 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
14 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
15 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_2}
|
||||
""".format(
|
||||
"""
|
||||
|
||||
expected_rows_count = raw_expected.count("\n")
|
||||
instance.query_with_retry(
|
||||
f"SELECT * FROM test.kafka_data_{list(all_formats.keys())[-1]}_mv;",
|
||||
retry_count=30,
|
||||
sleep_time=1,
|
||||
check_callback=lambda res: res.count("\n") == expected_rows_count,
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(f"Checking {format_name}")
|
||||
topic_name = f"{topic_name_prefix}{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_data_{format_name}_mv;".format(
|
||||
format_name=format_name
|
||||
)
|
||||
)
|
||||
expected = pre_formatted_expected.format(
|
||||
topic_name=topic_name,
|
||||
offset_0=offsets[0],
|
||||
offset_1=offsets[1],
|
||||
@ -4339,7 +4352,7 @@ def test_row_based_formats(kafka_cluster):
|
||||
f"""
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
DROP TABLE IF EXISTS test.kafka;
|
||||
|
||||
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
ENGINE = Kafka
|
||||
SETTINGS kafka_broker_list = 'kafka1:19092',
|
||||
@ -4347,10 +4360,10 @@ def test_row_based_formats(kafka_cluster):
|
||||
kafka_group_name = '{format_name}',
|
||||
kafka_format = '{format_name}',
|
||||
kafka_max_rows_per_message = 5;
|
||||
|
||||
|
||||
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
||||
SELECT key, value FROM test.kafka;
|
||||
|
||||
|
||||
INSERT INTO test.kafka SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
|
||||
"""
|
||||
)
|
||||
@ -4459,17 +4472,17 @@ def test_block_based_formats_2(kafka_cluster):
|
||||
f"""
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
DROP TABLE IF EXISTS test.kafka;
|
||||
|
||||
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
ENGINE = Kafka
|
||||
SETTINGS kafka_broker_list = 'kafka1:19092',
|
||||
kafka_topic_list = '{format_name}',
|
||||
kafka_group_name = '{format_name}',
|
||||
kafka_format = '{format_name}';
|
||||
|
||||
|
||||
CREATE MATERIALIZED VIEW test.view Engine=Log AS
|
||||
SELECT key, value FROM test.kafka;
|
||||
|
||||
|
||||
INSERT INTO test.kafka SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;
|
||||
"""
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user