diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp
index 9e2e9bb3389..e918158d7c4 100644
--- a/src/Storages/Kafka/StorageKafka2.cpp
+++ b/src/Storages/Kafka/StorageKafka2.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -823,6 +824,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
         switch (getHandleKafkaErrorMode())
         {
             case ExtStreamingHandleErrorMode::STREAM:
+            case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
             {
                 exception_message = e.message();
                 for (const auto & column : result_columns)
@@ -847,11 +849,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
                     consumer.currentOffset());
                 throw std::move(e);
             }
-            case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
-            {
-                LOG_DEBUG(log, "Not implemented.");
-                break;
-            }
         }
         return 1;
     };
@@ -945,6 +942,32 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
                     virtual_columns[9]->insertDefault();
                 }
             }
+            else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
+            {
+                LOG_DEBUG(log, "pollConsumer: DEAD_LETTER_QUEUE");
+                if (exception_message)
+                {
+
+                    const auto time_now = std::chrono::system_clock::now();
+
+                    auto dead_letter_queue = getContext()->getDeadLetterQueue();
+                    LOG_DEBUG(log, "pollConsumer: calling dead_letter_queue->add");
+                    dead_letter_queue->add(
+                        DeadLetterQueueElement{
+                            .stream_type = DeadLetterQueueElement::StreamType::Kafka,
+                            .event_time = timeInSeconds(time_now),
+                            .event_time_microseconds = timeInMicroseconds(time_now),
+                            .database_name = getStorageID().database_name,
+                            .table_name = getStorageID().table_name,
+                            .topic_name = consumer.currentTopic(),
+                            .partition = consumer.currentPartition(),
+                            .offset = consumer.currentOffset(),
+                            .raw_message = consumer.currentPayload(),
+                            .error = exception_message.value(),
+                        });
+                }
+
+            }
         }
 
         total_rows = total_rows + new_rows;
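
Not part of the diff above: a minimal, self-contained C++ sketch of the dead-letter-queue element and the add() call that the new hunk relies on, reconstructed only from the fields the patch populates. The names DeadLetterQueueElementSketch and DeadLetterQueueSketch are hypothetical stand-ins, not the actual ClickHouse declarations behind getContext()->getDeadLetterQueue().

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

/// Hypothetical, simplified stand-in for DeadLetterQueueElement; the field list
/// mirrors exactly what the hunk above fills in for each failed Kafka message.
struct DeadLetterQueueElementSketch
{
    enum class StreamType { Kafka };

    StreamType stream_type;
    uint64_t event_time;               /// seconds since epoch
    uint64_t event_time_microseconds;  /// microseconds since epoch
    std::string database_name;
    std::string table_name;
    std::string topic_name;
    int32_t partition;
    int64_t offset;
    std::string raw_message;
    std::string error;
};

/// Hypothetical in-memory queue standing in for getContext()->getDeadLetterQueue().
/// The real object is system-log machinery that flushes rows asynchronously to a
/// system table; here we only collect elements to show the data captured per message.
class DeadLetterQueueSketch
{
public:
    void add(DeadLetterQueueElementSketch element) { elements.push_back(std::move(element)); }
    std::size_t size() const { return elements.size(); }

private:
    std::vector<DeadLetterQueueElementSketch> elements;
};

int main()
{
    const auto time_now = std::chrono::system_clock::now();
    const auto since_epoch = time_now.time_since_epoch();

    DeadLetterQueueSketch queue;
    queue.add(DeadLetterQueueElementSketch{
        .stream_type = DeadLetterQueueElementSketch::StreamType::Kafka,
        .event_time = static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count()),
        .event_time_microseconds = static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::microseconds>(since_epoch).count()),
        .database_name = "db",
        .table_name = "kafka_table",
        .topic_name = "topic",
        .partition = 0,
        .offset = 42,
        .raw_message = "{ malformed json",
        .error = "Cannot parse input: expected '}'",
    });

    std::cout << "queued dead letters: " << queue.size() << '\n';
}

The point of the per-message metadata (topic, partition, offset, raw payload, error text) is that a row in the dead-letter table is enough to locate and replay the original Kafka message later; the sketch keeps that shape but omits the asynchronous flush that the real queue performs.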