diff --git a/src/Common/SystemLogBase.cpp b/src/Common/SystemLogBase.cpp index e09a061cadd..8729649d9b4 100644 --- a/src/Common/SystemLogBase.cpp +++ b/src/Common/SystemLogBase.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/Interpreters/KafkaDeadLetterQueue.cpp b/src/Interpreters/DeadLetterQueue.cpp similarity index 98% rename from src/Interpreters/KafkaDeadLetterQueue.cpp rename to src/Interpreters/DeadLetterQueue.cpp index 731e8992cf6..f41127bc0bf 100644 --- a/src/Interpreters/KafkaDeadLetterQueue.cpp +++ b/src/Interpreters/DeadLetterQueue.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include diff --git a/src/Interpreters/KafkaDeadLetterQueue.h b/src/Interpreters/DeadLetterQueue.h similarity index 100% rename from src/Interpreters/KafkaDeadLetterQueue.h rename to src/Interpreters/DeadLetterQueue.h diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 384e96fbfd7..5a6c325ed4d 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 62e45038b73..69696a04d35 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 4d354201b0f..5621e9ef753 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -122,7 +122,7 @@ def kafka_cluster(): finally: cluster.shutdown() -def view_test(expected_num_messages): +def view_test(expected_num_messages, *_): attempt = 0 rows = 0 while attempt < 500: @@ -134,12 +134,16 @@ def view_test(expected_num_messages): assert rows == expected_num_messages -def dead_letter_queue_test(expected_num_messages): +def dead_letter_queue_test(expected_num_messages, topic_name): view_test(expected_num_messages) + instance.query("SYSTEM FLUSH LOGS") - result = instance.query("SELECT * FROM system.dead_letter_queue") + result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'") logging.debug(f"system.dead_letter_queue contains {result}") + rows = int(instance.query(f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'")) + assert rows == expected_num_messages + def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, check_method): admin_client = KafkaAdminClient( @@ -193,7 +197,7 @@ def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] kafka_produce(kafka_cluster, f"{topic_name}", messages) - check_method(len(messages)) + check_method(len(messages), topic_name) kafka_delete_topic(admin_client, f"{topic_name}") @@ -235,7 +239,7 @@ message Message { messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"] kafka_produce(kafka_cluster, f"{topic_name}", messages) - check_method(len(messages)) + check_method(len(messages), topic_name) kafka_delete_topic(admin_client, f"{topic_name}") @@ -276,7 +280,7 @@ struct Message messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] kafka_produce(kafka_cluster, f"{topic_name}", messages) - check_method(len(messages)) + check_method(len(messages), topic_name) kafka_delete_topic(admin_client, f"{topic_name}")