mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Merge pull request #52273 from ClickHouse/51844-fix-test_storage_kafkatestpytest_kafka_formats_with_broken_message-test
Try to make `test_kafka_formats_with_broken_message` and `test_kafka_formats` integration tests stable
This commit is contained in:
commit
3ce8dc5503
@ -843,24 +843,7 @@ def test_kafka_formats(kafka_cluster):
|
||||
extra_settings=format_opts.get("extra_settings") or "",
|
||||
)
|
||||
)
|
||||
|
||||
instance.wait_for_log_line(
|
||||
"kafka.*Committed offset [0-9]+.*format_tests_",
|
||||
repetitions=len(all_formats.keys()),
|
||||
look_behind_lines=12000,
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(("Checking {}".format(format_name)))
|
||||
topic_name = f"format_tests_{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_{format_name}_mv;".format(format_name=format_name)
|
||||
)
|
||||
expected = """\
|
||||
raw_expected = """\
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
|
||||
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
2 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
@ -878,7 +861,27 @@ def test_kafka_formats(kafka_cluster):
|
||||
14 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
15 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_2}
|
||||
""".format(
|
||||
"""
|
||||
|
||||
expected_rows_count = raw_expected.count("\n")
|
||||
instance.query_with_retry(
|
||||
f"SELECT * FROM test.kafka_{list(all_formats.keys())[-1]}_mv;",
|
||||
retry_count=30,
|
||||
sleep_time=1,
|
||||
check_callback=lambda res: res.count("\n") == expected_rows_count,
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(("Checking {}".format(format_name)))
|
||||
topic_name = f"format_tests_{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_{format_name}_mv;".format(format_name=format_name)
|
||||
)
|
||||
expected = raw_expected.format(
|
||||
topic_name=topic_name,
|
||||
offset_0=offsets[0],
|
||||
offset_1=offsets[1],
|
||||
@ -3755,19 +3758,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
)
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug("Checking {format_name}")
|
||||
topic_name = f"{topic_name_prefix}{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_data_{format_name}_mv;".format(
|
||||
format_name=format_name
|
||||
)
|
||||
)
|
||||
expected = """\
|
||||
raw_expected = """\
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
|
||||
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
2 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
@ -3785,7 +3776,29 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
14 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
15 0 AM 0.5 1 {topic_name} 0 {offset_1}
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_2}
|
||||
""".format(
|
||||
"""
|
||||
|
||||
expected_rows_count = raw_expected.count("\n")
|
||||
instance.query_with_retry(
|
||||
f"SELECT * FROM test.kafka_data_{list(all_formats.keys())[-1]}_mv;",
|
||||
retry_count=30,
|
||||
sleep_time=1,
|
||||
check_callback=lambda res: res.count("\n") == expected_rows_count,
|
||||
)
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(f"Checking {format_name}")
|
||||
topic_name = f"{topic_name_prefix}{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = (
|
||||
[1, 2, 3] if format_opts.get("supports_empty_value", False) else [0, 1, 2]
|
||||
)
|
||||
result = instance.query(
|
||||
"SELECT * FROM test.kafka_data_{format_name}_mv;".format(
|
||||
format_name=format_name
|
||||
)
|
||||
)
|
||||
expected = raw_expected.format(
|
||||
topic_name=topic_name,
|
||||
offset_0=offsets[0],
|
||||
offset_1=offsets[1],
|
||||
|
Loading…
Reference in New Issue
Block a user