kafka_dead_letter_queue: rename, small cleanup

Ilya Golshtein 2024-08-23 21:54:34 +00:00
parent be842c9ff9
commit 9b96ad1d5e
6 changed files with 14 additions and 10 deletions


@@ -18,7 +18,7 @@
 #include <Interpreters/TransactionsInfoLog.h>
 #include <Interpreters/AsynchronousInsertLog.h>
 #include <Interpreters/BackupLog.h>
-#include <Interpreters/KafkaDeadLetterQueue.h>
+#include <Interpreters/DeadLetterQueue.h>
 #include <IO/S3/BlobStorageLogWriter.h>
 #include <Common/MemoryTrackerBlockerInThread.h>


@@ -1,4 +1,4 @@
-#include <Interpreters/KafkaDeadLetterQueue.h>
+#include <Interpreters/DeadLetterQueue.h>
 #include <Core/Settings.h>
 #include <DataTypes/DataTypeDate.h>


@@ -28,7 +28,7 @@
 #include <Interpreters/QueryThreadLog.h>
 #include <Interpreters/QueryViewsLog.h>
 #include <Interpreters/ObjectStorageQueueLog.h>
-#include <Interpreters/KafkaDeadLetterQueue.h>
+#include <Interpreters/DeadLetterQueue.h>
 #include <Interpreters/SessionLog.h>
 #include <Interpreters/TextLog.h>
 #include <Interpreters/TraceLog.h>


@@ -6,7 +6,7 @@
 #include <Processors/Executors/StreamingFormatExecutor.h>
 #include <Common/logger_useful.h>
 #include <Interpreters/Context.h>
-#include <Interpreters/KafkaDeadLetterQueue.h>
+#include <Interpreters/DeadLetterQueue.h>
 #include <Common/ProfileEvents.h>


@@ -122,7 +122,7 @@ def kafka_cluster():
     finally:
         cluster.shutdown()


-def view_test(expected_num_messages):
+def view_test(expected_num_messages, *_):
     attempt = 0
     rows = 0
     while attempt < 500:
@@ -134,12 +134,16 @@ def view_test(expected_num_messages):
     assert rows == expected_num_messages


-def dead_letter_queue_test(expected_num_messages):
+def dead_letter_queue_test(expected_num_messages, topic_name):
     view_test(expected_num_messages)
     instance.query("SYSTEM FLUSH LOGS")
-    result = instance.query("SELECT * FROM system.dead_letter_queue")
+    result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'")
     logging.debug(f"system.dead_letter_queue contains {result}")
+    rows = int(instance.query(f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'"))
+    assert rows == expected_num_messages


 def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, check_method):
     admin_client = KafkaAdminClient(
@@ -193,7 +197,7 @@ def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml,
     messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
     kafka_produce(kafka_cluster, f"{topic_name}", messages)

-    check_method(len(messages))
+    check_method(len(messages), topic_name)

     kafka_delete_topic(admin_client, f"{topic_name}")
@@ -235,7 +239,7 @@ message Message {
     messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"]
     kafka_produce(kafka_cluster, f"{topic_name}", messages)

-    check_method(len(messages))
+    check_method(len(messages), topic_name)

     kafka_delete_topic(admin_client, f"{topic_name}")
@@ -276,7 +280,7 @@ struct Message
     messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
     kafka_produce(kafka_cluster, f"{topic_name}", messages)

-    check_method(len(messages))
+    check_method(len(messages), topic_name)

     kafka_delete_topic(admin_client, f"{topic_name}")
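For orientation, a minimal sketch of why view_test gained the *_ parameter: bad_messages_parsing_mode now calls check_method(len(messages), topic_name), so any helper passed as check_method must accept the extra topic argument. view_test simply discards it, while dead_letter_queue_test uses it to filter system.dead_letter_queue. The call sites below are illustrative assumptions only; the concrete handle_error_mode values and additional_dml strings used by the real tests are not shown in this diff.

# Hypothetical usage sketch -- argument values are illustrative, not taken
# from this commit:

# view_test(expected, *_) ignores the extra topic argument, so it works
# where no per-topic dead-letter check is needed:
bad_messages_parsing_mode(kafka_cluster, "stream", "", view_test)

# dead_letter_queue_test(expected, topic_name) uses the topic to filter
# system.dead_letter_queue before asserting the row count:
bad_messages_parsing_mode(kafka_cluster, "dead_letter_queue", "", dead_letter_queue_test)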