kafka_dead_letter_queue: StorageKafka2 covered

This commit is contained in:
Ilya Golshtein 2024-08-23 23:23:02 +00:00
parent 9b96ad1d5e
commit ff49aef8ac

View File

@ -7,6 +7,7 @@
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/DeadLetterQueue.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
@ -823,6 +824,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
switch (getHandleKafkaErrorMode())
{
case ExtStreamingHandleErrorMode::STREAM:
case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
{
exception_message = e.message();
for (const auto & column : result_columns)
@ -847,11 +849,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
consumer.currentOffset());
throw std::move(e);
}
case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
{
LOG_DEBUG(log, "Not implemented.");
break;
}
}
return 1;
};
@ -945,6 +942,32 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
virtual_columns[9]->insertDefault();
}
}
else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
{
LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE");
if (exception_message)
{
const auto time_now = std::chrono::system_clock::now();
auto dead_letter_queue = getContext()->getDeadLetterQueue();
LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add");
dead_letter_queue->add(
DeadLetterQueueElement{
.stream_type = DeadLetterQueueElement::StreamType::Kafka,
.event_time = timeInSeconds(time_now),
.event_time_microseconds = timeInMicroseconds(time_now),
.database_name = getStorageID().database_name,
.table_name = getStorageID().table_name,
.topic_name = consumer.currentTopic(),
.partition = consumer.currentPartition(),
.offset = consumer.currentPartition(),
.raw_message = consumer.currentPayload(),
.error = exception_message.value(),
});
}
}
}
total_rows = total_rows + new_rows;