diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md
index de6492e8ea7..687e9e19f8a 100644
--- a/docs/en/engines/table-engines/integrations/kafka.md
+++ b/docs/en/engines/table-engines/integrations/kafka.md
@@ -62,7 +62,7 @@ Optional parameters:
 - `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
 - `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
 - `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block). Default: `0`.
-- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`).
+- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`), dead_letter_queue (error-related data will be saved in system.dead_letter_queue).
 - `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`.
 - `kafka_max_rows_per_message` — The maximum number of rows written in one kafka message for row-based formats. Default : `1`.
diff --git a/docs/en/operations/system-tables/dead_letter_queue.md b/docs/en/operations/system-tables/dead_letter_queue.md
new file mode 100644
index 00000000000..47b7b971b75
--- /dev/null
+++ b/docs/en/operations/system-tables/dead_letter_queue.md
@@ -0,0 +1,87 @@
+---
+slug: /en/operations/system-tables/dead_letter_queue
+---
+# dead_letter_queue
+
+Contains information about messages received via a stream engine and parsed with errors. Currently implemented for Kafka.
+
+Logging is enabled by setting `kafka_handle_error_mode` to `dead_letter_queue`.
+
+The flushing period of data is set in the `flush_interval_milliseconds` parameter of the [dead_letter_queue](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-dead_letter_queue) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.
+
+ClickHouse does not delete data from the table automatically. See [Introduction](../../operations/system-tables/index.md#system-tables-introduction) for more details.
+
+Columns:
+
+- `stream_type` ([Enum8](../../sql-reference/data-types/enum.md)) - Stream type. Possible values: 'Kafka'.
+- `event_date` ([Date](../../sql-reference/data-types/date.md)) - Message consuming date.
+- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) - Message consuming date and time.
+- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) - Message consuming time with microseconds precision.
+- `database_name` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) - ClickHouse database the Kafka table belongs to.
+- `table_name` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) - ClickHouse table name.
+- `topic_name` ([Nullable(String)](../../sql-reference/data-types/nullable.md)) - Topic name.
+- `partition` ([Nullable(UInt64)](../../sql-reference/data-types/nullable.md)) - Partition.
+- `offset` ([Nullable(UInt64)](../../sql-reference/data-types/nullable.md)) - Offset.
+- `raw_message` ([String](../../sql-reference/data-types/string.md)) - Message body.
+- `error` ([String](../../sql-reference/data-types/string.md)) - Error text.
+
+**Example**
+
+Query:
+
+``` sql
+SELECT * FROM system.dead_letter_queue LIMIT 3 \G;
+```
+
+Result:
+
+``` text
+Row 1:
+──────
+stream_type: Kafka
+event_date: 2024-08-26
+event_time: 2024-08-26 07:49:20
+event_time_microseconds: 2024-08-26 07:49:20.268091
+database_name: default
+table_name: kafka
+topic_name: CapnProto_dead_letter_queue_err
+partition: 0
+offset: 0
+raw_message: qwertyuiop
+error: Message has too many segments. Most likely, data was corrupted: (at row 1)
+
+
+Row 2:
+──────
+stream_type: Kafka
+event_date: 2024-08-26
+event_time: 2024-08-26 07:49:20
+event_time_microseconds: 2024-08-26 07:49:20.268361
+database_name: default
+table_name: kafka
+topic_name: CapnProto_dead_letter_queue_err
+partition: 0
+offset: 0
+raw_message: asdfghjkl
+error: Message has too many segments. Most likely, data was corrupted: (at row 1)
+
+
+Row 3:
+──────
+stream_type: Kafka
+event_date: 2024-08-26
+event_time: 2024-08-26 07:49:20
+event_time_microseconds: 2024-08-26 07:49:20.268604
+database_name: default
+table_name: kafka
+topic_name: CapnProto_dead_letter_queue_err
+partition: 0
+offset: 0
+raw_message: zxcvbnm
+error: Message has too many segments. Most likely, data was corrupted: (at row 1)
+```
+
+**See Also**
+
+- [Kafka](../../engines/table-engines/integrations/kafka.md) - Kafka Engine
+- [system.kafka_consumers](../../operations/system-tables/kafka_consumers.md#system_tables-kafka_consumers) — Description of the `kafka_consumers` system table which contains information like statistics and errors about Kafka consumers.
diff --git a/docs/ru/engines/table-engines/integrations/kafka.md b/docs/ru/engines/table-engines/integrations/kafka.md
index fb62f30ef9a..e09bde6a4a4 100644
--- a/docs/ru/engines/table-engines/integrations/kafka.md
+++ b/docs/ru/engines/table-engines/integrations/kafka.md
@@ -64,7 +64,7 @@ SETTINGS
 - `kafka_poll_max_batch_size` - Максимальное количество сообщений в одном poll Kafka. По умолчанию: (../../../operations/settings/settings.md#setting-max_block_size)
 - `kafka_flush_interval_ms` - Таймаут для сброса данных из Kafka. По умолчанию: (../../../operations/settings/settings.md#stream-flush-interval-ms)
 - `kafka_thread_per_consumer` — включает или отключает предоставление отдельного потока каждому потребителю (по умолчанию `0`). При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки с данными от нескольких потребителей собираются в один блок.
-- `kafka_handle_error_mode` - Способ обработки ошибок для Kafka. Возможные значения: default, stream.
+- `kafka_handle_error_mode` - Способ обработки ошибок для Kafka. Возможные значения: default, stream, dead_letter_queue.
 - `kafka_commit_on_select` - Сообщение о commit при запросе select. По умолчанию: `false`.
 - `kafka_max_rows_per_message` - Максимальное количество строк записанных в одно сообщение Kafka для формата row-based. По умолчанию: `1`.
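The documentation above introduces the new `dead_letter_queue` value but does not show the mode end to end, so here is a minimal SQL sketch (not part of the patch). The broker address mirrors the one used in the integration test further down; the table, topic, and consumer-group names are hypothetical, and `JSONEachRow` is only an example format.

``` sql
-- A Kafka engine table that routes unparseable messages to system.dead_letter_queue.
CREATE TABLE default.kafka_demo (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
         kafka_topic_list = 'demo_topic',      -- hypothetical topic
         kafka_group_name = 'demo_group',      -- hypothetical consumer group
         kafka_format = 'JSONEachRow',
         kafka_handle_error_mode = 'dead_letter_queue';

-- Consumption starts once something reads from the table, e.g. a materialized view;
-- messages that fail to parse are then recorded in system.dead_letter_queue.
CREATE MATERIALIZED VIEW default.kafka_demo_mv ENGINE = Log AS
SELECT key, value FROM default.kafka_demo;
```

As with the `stream` mode, parse failures do not stop consumption: the failed row is replaced with default values in the resulting block, while the raw message and error text are collected in the system table.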
diff --git a/src/Common/SystemLogBase.cpp b/src/Common/SystemLogBase.cpp index 45f4eb1c5a6..46a13988985 100644 --- a/src/Common/SystemLogBase.cpp +++ b/src/Common/SystemLogBase.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include diff --git a/src/Common/SystemLogBase.h b/src/Common/SystemLogBase.h index 0942e920a42..3ac6b199fb3 100644 --- a/src/Common/SystemLogBase.h +++ b/src/Common/SystemLogBase.h @@ -32,6 +32,7 @@ M(AsynchronousInsertLogElement) \ M(BackupLogElement) \ M(BlobStorageLogElement) \ + M(DeadLetterQueueElement) \ M(ErrorLogElement) namespace Poco diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 0c43b5256d6..17cb4262b21 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -141,6 +141,11 @@ IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS, {{"default", StreamingHandleErrorMode::DEFAULT}, {"stream", StreamingHandleErrorMode::STREAM}}) +IMPLEMENT_SETTING_ENUM(ExtStreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS, + {{"default", ExtStreamingHandleErrorMode::DEFAULT}, + {"stream", ExtStreamingHandleErrorMode::STREAM}, + {"dead_letter_queue", ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE}}) + IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS, {{"enable", ShortCircuitFunctionEvaluation::ENABLE}, {"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index d358a2d44a0..8da3a74720a 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -262,11 +262,20 @@ enum class StreamingHandleErrorMode : uint8_t { DEFAULT = 0, // Ignore errors with threshold. STREAM, // Put errors to stream in the virtual column named ``_error. +}; + +DECLARE_SETTING_ENUM(StreamingHandleErrorMode) + +enum class ExtStreamingHandleErrorMode : uint8_t +{ + DEFAULT = 0, // Ignore errors with threshold. + STREAM, // Put errors to stream in the virtual column named ``_error. + DEAD_LETTER_QUEUE /*FIXED_SYSTEM_TABLE, Put errors to in a fixed system table likely system.kafka_errors. This is not implemented now. */ /*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. 
*/ }; -DECLARE_SETTING_ENUM(StreamingHandleErrorMode) +DECLARE_SETTING_ENUM(ExtStreamingHandleErrorMode) DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 311fd094706..0248a8a3f5d 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -4174,7 +4174,6 @@ std::shared_ptr Context::getPartLog(const String & part_database) const return shared->system_logs->part_log; } - std::shared_ptr Context::getTraceLog() const { SharedLockGuard lock(shared->mutex); @@ -4344,6 +4343,15 @@ std::shared_ptr Context::getBlobStorageLog() const return shared->system_logs->blob_storage_log; } +std::shared_ptr Context::getDeadLetterQueue() const +{ + SharedLockGuard lock(shared->mutex); + if (!shared->system_logs) + return {}; + + return shared->system_logs->dead_letter_queue; +} + SystemLogs Context::getSystemLogs() const { SharedLockGuard lock(shared->mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 0daef2243aa..30570f4f1e0 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -114,6 +114,7 @@ class ObjectStorageQueueLog; class AsynchronousInsertLog; class BackupLog; class BlobStorageLog; +class DeadLetterQueue; class IAsynchronousReader; class IOUringReader; struct MergeTreeSettings; @@ -1160,6 +1161,7 @@ public: std::shared_ptr getAsynchronousInsertLog() const; std::shared_ptr getBackupLog() const; std::shared_ptr getBlobStorageLog() const; + std::shared_ptr getDeadLetterQueue() const; SystemLogs getSystemLogs() const; diff --git a/src/Interpreters/DeadLetterQueue.cpp b/src/Interpreters/DeadLetterQueue.cpp new file mode 100644 index 00000000000..39387ad795e --- /dev/null +++ b/src/Interpreters/DeadLetterQueue.cpp @@ -0,0 +1,66 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +ColumnsDescription DeadLetterQueueElement::getColumnsDescription() +{ + auto low_cardinality_string = std::make_shared(std::make_shared()); + + auto stream_type = std::make_shared( + DataTypeEnum8::Values{ + {"Kafka", static_cast(StreamType::Kafka)}, + }); + + return ColumnsDescription + { + {"stream_type", stream_type, "Stream type. 
Possible values: 'Kafka'."}, + {"event_date", std::make_shared(), "Message consuming date."}, + {"event_time", std::make_shared(), "Message consuming date and time."}, + {"event_time_microseconds", std::make_shared(6), "Query starting time with microseconds precision."}, + {"database_name", low_cardinality_string, "ClickHouse database Kafka table belongs to."}, + {"table_name", low_cardinality_string, "ClickHouse table name."}, + {"topic_name", std::make_shared(std::make_shared()), "Topic name."}, + {"partition", std::make_shared(std::make_shared()), "Partition."}, + {"offset", std::make_shared(std::make_shared()), "Offset."}, + {"raw_message", std::make_shared(), "Message body."}, + {"error", std::make_shared(), "Error text."} + }; +} + +void DeadLetterQueueElement::appendToBlock(MutableColumns & columns) const +{ + size_t i = 0; + + columns[i++]->insert(static_cast(stream_type)); + columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType()); + columns[i++]->insert(event_time); + columns[i++]->insert(event_time_microseconds); + + columns[i++]->insertData(database_name.data(), database_name.size()); + columns[i++]->insertData(table_name.data(), table_name.size()); + columns[i++]->insertData(topic_name.data(), topic_name.size()); + + columns[i++]->insert(partition); + columns[i++]->insert(offset); + + columns[i++]->insertData(raw_message.data(), raw_message.size()); + columns[i++]->insertData(error.data(), error.size()); + + +} + +NamesAndAliases DeadLetterQueueElement::getNamesAndAliases() +{ + return NamesAndAliases{}; +} +} diff --git a/src/Interpreters/DeadLetterQueue.h b/src/Interpreters/DeadLetterQueue.h new file mode 100644 index 00000000000..01e9292270f --- /dev/null +++ b/src/Interpreters/DeadLetterQueue.h @@ -0,0 +1,56 @@ +#pragma once +#include +#include +#include +#include + + +/// should be called ...Log for uniformity + +// event_time, +// database, +// table, +// topic, +// partition, +// offset, +// raw_message, +// error + +namespace DB +{ + + +struct DeadLetterQueueElement +{ + enum class StreamType : int8_t + { + Kafka = 1, + }; + + StreamType stream_type; + UInt64 event_time{}; + Decimal64 event_time_microseconds{}; + + String database_name; + String table_name; + String topic_name; + Int64 partition; + Int64 offset; + String raw_message; + String error; + + static std::string name() { return "DeadLetterQueue"; } + + static ColumnsDescription getColumnsDescription(); + static NamesAndAliases getNamesAndAliases(); + void appendToBlock(MutableColumns & columns) const; +}; + +class DeadLetterQueue : public SystemLog +{ + using SystemLog::SystemLog; + + +}; + +} diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 6a3ec197c6e..4ca2457e7a7 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index c03f9370068..e66c71a5602 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -30,6 +30,7 @@ M(AsynchronousInsertLog, asynchronous_insert_log, "Contains a history for all asynchronous inserts executed on current server.") \ M(BackupLog, backup_log, "Contains logging entries with the information about BACKUP and RESTORE operations.") \ M(BlobStorageLog, blob_storage_log, "Contains logging entries with information about various blob storage operations such as uploads and deletes.") \ + M(DeadLetterQueue, dead_letter_queue, "Contains 
messages that came from a streaming engine (e.g. Kafka) and were parsed unsuccessfully.") \ namespace DB diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h index 9ca5e189f0e..44a91af6ef1 100644 --- a/src/Storages/Kafka/KafkaSettings.h +++ b/src/Storages/Kafka/KafkaSettings.h @@ -35,7 +35,7 @@ const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000; /* default is stream_flush_interval_ms */ \ M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \ M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \ - M(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ + M(ExtStreamingHandleErrorMode, kafka_handle_error_mode, ExtStreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after kafka_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error), dead_letter_queue", 0) \ M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \ M(UInt64, kafka_max_rows_per_message, 1, "The maximum number of rows produced in one kafka message for row-based formats.", 0) \ M(String, kafka_keeper_path, "", "The path to the table in ClickHouse Keeper", 0) \ diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 3ddd0d1be8c..071f901bb5e 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -46,7 +47,7 @@ KafkaSource::KafkaSource( , commit_in_suffix(commit_in_suffix_) , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized()) , virtual_header(storage.getVirtualsHeader()) - , handle_error_mode(storage.getStreamingHandleErrorMode()) + , handle_error_mode(storage.getHandleErrorMode()) { } @@ -98,8 +99,6 @@ Chunk KafkaSource::generateImpl() // otherwise external iteration will reuse that and logic will became even more fuzzy MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - auto put_error_to_stream = handle_error_mode == StreamingHandleErrorMode::STREAM; - EmptyReadBuffer empty_buf; auto input_format = FormatFactory::instance().getInput( storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); @@ -108,34 +107,39 @@ Chunk KafkaSource::generateImpl() size_t total_rows = 0; size_t failed_poll_attempts = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&, this](const MutableColumns & result_columns, Exception & e) { ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed); - if (put_error_to_stream) + switch (handle_error_mode) { - exception_message = e.message(); - for (const auto & column : result_columns) + case ExtStreamingHandleErrorMode::STREAM: + case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE: { - // read_kafka_message could already push some rows to result_columns - // before exception, we need to fix it. 
- auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + exception_message = e.message(); + for (const auto & column : result_columns) + { + // read_kafka_message could already push some rows to result_columns + // before exception, we need to fix it. + auto cur_rows = column->size(); + if (cur_rows > total_rows) + column->popBack(cur_rows - total_rows); - // all data columns will get default value in case of error - column->insertDefault(); + // all data columns will get default value in case of error + column->insertDefault(); + } + break; } + case ExtStreamingHandleErrorMode::DEFAULT: + { + e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", + consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset()); + consumer->setExceptionInfo(e.message()); + throw std::move(e); + } + } + return 1; - return 1; - } - else - { - e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", - consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset()); - consumer->setExceptionInfo(e.message()); - throw std::move(e); - } }; StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error)); @@ -203,7 +207,7 @@ Chunk KafkaSource::generateImpl() } virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); - if (put_error_to_stream) + if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM) { if (exception_message) { @@ -217,6 +221,31 @@ Chunk KafkaSource::generateImpl() virtual_columns[9]->insertDefault(); } } + else if (handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) + { + if (exception_message) + { + + const auto time_now = std::chrono::system_clock::now(); + auto storage_id = storage.getStorageID(); + + auto dead_letter_queue = context->getDeadLetterQueue(); + dead_letter_queue->add( + DeadLetterQueueElement{ + .stream_type = DeadLetterQueueElement::StreamType::Kafka, + .event_time = timeInSeconds(time_now), + .event_time_microseconds = timeInMicroseconds(time_now), + .database_name = storage_id.database_name, + .table_name = storage_id.table_name, + .topic_name = consumer->currentTopic(), + .partition = consumer->currentPartition(), + .offset = consumer->currentPartition(), + .raw_message = consumer->currentPayload(), + .error = exception_message.value(), + }); + } + + } } total_rows = total_rows + new_rows; @@ -232,7 +261,7 @@ Chunk KafkaSource::generateImpl() else { // We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal - // TODO: it seems like in case of put_error_to_stream=true we may need to process those differently + // TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently // currently we just skip them with note in logs. 
consumer->storeLastReadMessageOffset(); LOG_DEBUG(log, "Parsing of message (topic: {}, partition: {}, offset: {}) return no rows.", consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset()); diff --git a/src/Storages/Kafka/KafkaSource.h b/src/Storages/Kafka/KafkaSource.h index a1b94b15a19..1ce2ee82c32 100644 --- a/src/Storages/Kafka/KafkaSource.h +++ b/src/Storages/Kafka/KafkaSource.h @@ -51,7 +51,7 @@ private: const Block non_virtual_header; const Block virtual_header; - const StreamingHandleErrorMode handle_error_mode; + const ExtStreamingHandleErrorMode handle_error_mode; Poco::Timespan max_execution_time = 0; Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index f4f641d1c68..b62c39f2695 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -165,7 +165,8 @@ StorageKafka::StorageKafka( { kafka_settings->sanityCheck(); - if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) + if (kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::STREAM || + kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { kafka_settings->input_format_allow_errors_num = 0; kafka_settings->input_format_allow_errors_ratio = 0; diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 966d818d675..760105e0d07 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -80,7 +80,7 @@ public: const auto & getFormatName() const { return format_name; } - StreamingHandleErrorMode getStreamingHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; } + ExtStreamingHandleErrorMode getHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; } struct SafeConsumers { diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp index 3574b46e3b0..cf3e1ac65d7 100644 --- a/src/Storages/Kafka/StorageKafka2.cpp +++ b/src/Storages/Kafka/StorageKafka2.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -125,7 +126,8 @@ StorageKafka2::StorageKafka2( if (kafka_settings->kafka_num_consumers > 1 && !thread_per_consumer) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "With multiple consumers, it is required to use `kafka_thread_per_consumer` setting"); - if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) + if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM || + getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { kafka_settings->input_format_allow_errors_num = 0; kafka_settings->input_format_allow_errors_ratio = 0; @@ -134,7 +136,7 @@ StorageKafka2::StorageKafka2( storage_metadata.setColumns(columns_); storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); - setVirtuals(StorageKafkaUtils::createVirtuals(kafka_settings->kafka_handle_error_mode)); + setVirtuals(StorageKafkaUtils::createVirtuals(getHandleKafkaErrorMode())); auto task_count = thread_per_consumer ? 
num_consumers : 1; for (size_t i = 0; i < task_count; ++i) @@ -807,8 +809,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( // otherwise external iteration will reuse that and logic will became even more fuzzy MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - auto put_error_to_stream = kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM; - EmptyReadBuffer empty_buf; auto input_format = FormatFactory::instance().getInput( getFormatName(), empty_buf, non_virtual_header, modified_context, getMaxBlockSize(), std::nullopt, 1); @@ -817,36 +817,40 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( size_t total_rows = 0; size_t failed_poll_attempts = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&, this](const MutableColumns & result_columns, Exception & e) { ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed); - if (put_error_to_stream) + switch (getHandleKafkaErrorMode()) { - exception_message = e.message(); - for (const auto & column : result_columns) + case ExtStreamingHandleErrorMode::STREAM: + case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE: { - // read_kafka_message could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + exception_message = e.message(); + for (const auto & column : result_columns) + { + // read_kafka_message could already push some rows to result_columns + // before exception, we need to fix it. + auto cur_rows = column->size(); + if (cur_rows > total_rows) + column->popBack(cur_rows - total_rows); - // all data columns will get default value in case of error - column->insertDefault(); + // all data columns will get default value in case of error + column->insertDefault(); + } + break; + } + case ExtStreamingHandleErrorMode::DEFAULT: + { + e.addMessage( + "while parsing Kafka message (topic: {}, partition: {}, offset: {})'", + consumer.currentTopic(), + consumer.currentPartition(), + consumer.currentOffset()); + throw std::move(e); } - - return 1; - } - else - { - e.addMessage( - "while parsing Kafka message (topic: {}, partition: {}, offset: {})'", - consumer.currentTopic(), - consumer.currentPartition(), - consumer.currentOffset()); - throw std::move(e); } + return 1; }; StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error)); @@ -922,7 +926,8 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( } virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); - if (put_error_to_stream) + + if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM) { if (exception_message) { @@ -935,6 +940,30 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( virtual_columns[9]->insertDefault(); } } + else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) + { + if (exception_message) + { + + const auto time_now = std::chrono::system_clock::now(); + + auto dead_letter_queue = getContext()->getDeadLetterQueue(); + dead_letter_queue->add( + DeadLetterQueueElement{ + .stream_type = DeadLetterQueueElement::StreamType::Kafka, + .event_time = timeInSeconds(time_now), + .event_time_microseconds = timeInMicroseconds(time_now), + .database_name = getStorageID().database_name, + .table_name = getStorageID().table_name, + .topic_name = consumer.currentTopic(), + .partition = consumer.currentPartition(), + 
.offset = consumer.currentPartition(), + .raw_message = consumer.currentPayload(), + .error = exception_message.value(), + }); + } + + } } total_rows = total_rows + new_rows; @@ -947,7 +976,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( else { // We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal - // TODO: it seems like in case of put_error_to_stream=true we may need to process those differently + // TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently // currently we just skip them with note in logs. LOG_DEBUG( log, diff --git a/src/Storages/Kafka/StorageKafka2.h b/src/Storages/Kafka/StorageKafka2.h index f85fedb316a..cd0485c0867 100644 --- a/src/Storages/Kafka/StorageKafka2.h +++ b/src/Storages/Kafka/StorageKafka2.h @@ -89,7 +89,7 @@ public: const auto & getFormatName() const { return format_name; } - StreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; } + ExtStreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; } private: using TopicPartition = KafkaConsumer2::TopicPartition; diff --git a/src/Storages/Kafka/StorageKafkaUtils.cpp b/src/Storages/Kafka/StorageKafkaUtils.cpp index cdc32d775eb..327d812e843 100644 --- a/src/Storages/Kafka/StorageKafkaUtils.cpp +++ b/src/Storages/Kafka/StorageKafkaUtils.cpp @@ -427,7 +427,7 @@ bool checkDependencies(const StorageID & table_id, const ContextPtr& context) } -VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode) +VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode) { VirtualColumnsDescription desc; @@ -440,7 +440,7 @@ VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_m desc.addEphemeral("_headers.name", std::make_shared(std::make_shared()), ""); desc.addEphemeral("_headers.value", std::make_shared(std::make_shared()), ""); - if (handle_error_mode == StreamingHandleErrorMode::STREAM) + if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM) { desc.addEphemeral("_raw_message", std::make_shared(), ""); desc.addEphemeral("_error", std::make_shared(), ""); diff --git a/src/Storages/Kafka/StorageKafkaUtils.h b/src/Storages/Kafka/StorageKafkaUtils.h index cc956dde78d..c7892c28306 100644 --- a/src/Storages/Kafka/StorageKafkaUtils.h +++ b/src/Storages/Kafka/StorageKafkaUtils.h @@ -47,7 +47,7 @@ SettingsChanges createSettingsAdjustments(KafkaSettings & kafka_settings, const bool checkDependencies(const StorageID & table_id, const ContextPtr& context); -VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode); +VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode); } } diff --git a/tests/integration/test_kafka_bad_messages/configs/dead_letter_queue.xml b/tests/integration/test_kafka_bad_messages/configs/dead_letter_queue.xml new file mode 100644 index 00000000000..cdd1b807173 --- /dev/null +++ b/tests/integration/test_kafka_bad_messages/configs/dead_letter_queue.xml @@ -0,0 +1,8 @@ + + + system + dead_letter_queue
+        <partition_by>toYYYYMM(event_date)</partition_by>
+        <flush_interval_milliseconds>1000</flush_interval_milliseconds>
+    </dead_letter_queue>
+</clickhouse>
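Once the server-side log table above is configured, the entries it accumulates can be inspected like any other system table. A purely illustrative query sketch: the column names come from `DeadLetterQueueElement::getColumnsDescription()` earlier in this patch, while the time window and limit are arbitrary.

``` sql
-- Force the in-memory buffer to be written out before inspecting it.
SYSTEM FLUSH LOGS;

-- Most frequent parse errors per Kafka table over the last hour (illustrative).
SELECT
    database_name,
    table_name,
    topic_name,
    error,
    count() AS failed_messages,
    max(event_time) AS last_seen
FROM system.dead_letter_queue
WHERE event_time > now() - INTERVAL 1 HOUR
GROUP BY database_name, table_name, topic_name, error
ORDER BY failed_messages DESC
LIMIT 10;
```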
diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 0446ca5cb47..694bf4774e6 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -13,7 +13,7 @@ if is_arm(): cluster = ClickHouseCluster(__file__) instance = cluster.add_instance( "instance", - main_configs=["configs/kafka.xml"], + main_configs=["configs/kafka.xml", "configs/dead_letter_queue.xml"], with_kafka=True, ) @@ -123,7 +123,39 @@ def kafka_cluster(): cluster.shutdown() -def test_bad_messages_parsing_stream(kafka_cluster): +def view_test(expected_num_messages, *_): + attempt = 0 + rows = 0 + while attempt < 500: + time.sleep(0.1) + rows = int(instance.query("SELECT count() FROM view")) + if rows == expected_num_messages: + break + attempt += 1 + + assert rows == expected_num_messages + + +def dead_letter_queue_test(expected_num_messages, topic_name): + view_test(expected_num_messages) + instance.query("SYSTEM FLUSH LOGS") + + result = instance.query( + f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical" + ) + logging.debug(f"system.dead_letter_queue contains {result}") + + rows = int( + instance.query( + f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'" + ) + ) + assert rows == expected_num_messages + + +def bad_messages_parsing_mode( + kafka_cluster, handle_error_mode, additional_dml, check_method +): admin_client = KafkaAdminClient( bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port) ) @@ -151,8 +183,9 @@ def test_bad_messages_parsing_stream(kafka_cluster): "MySQLDump", ]: print(format_name) + topic_name = f"{format_name}_{handle_error_mode}_err" - kafka_create_topic(admin_client, f"{format_name}_err") + kafka_create_topic(admin_client, f"{topic_name}") instance.query( f""" @@ -162,30 +195,21 @@ def test_bad_messages_parsing_stream(kafka_cluster): CREATE TABLE kafka (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = '{format_name}_err', + kafka_topic_list = '{topic_name}', kafka_group_name = '{format_name}', kafka_format = '{format_name}', - kafka_handle_error_mode='stream'; + kafka_handle_error_mode= '{handle_error_mode}'; - CREATE MATERIALIZED VIEW view Engine=Log AS - SELECT _error FROM kafka WHERE length(_error) != 0 ; + {additional_dml} """ ) messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] - kafka_produce(kafka_cluster, f"{format_name}_err", messages) + kafka_produce(kafka_cluster, f"{topic_name}", messages) - attempt = 0 - rows = 0 - while attempt < 500: - rows = int(instance.query("SELECT count() FROM view")) - if rows == len(messages): - break - attempt += 1 + check_method(len(messages), topic_name) - assert rows == len(messages) - - kafka_delete_topic(admin_client, f"{format_name}_err") + kafka_delete_topic(admin_client, f"{topic_name}") protobuf_schema = """ syntax = "proto3"; @@ -199,6 +223,7 @@ message Message { instance.create_format_schema("schema_test_errors.proto", protobuf_schema) for format_name in ["Protobuf", "ProtobufSingle", "ProtobufList"]: + topic_name = f"{format_name}_{handle_error_mode}_err" instance.query( f""" DROP TABLE IF EXISTS view; @@ -207,35 +232,26 @@ message Message { CREATE TABLE kafka (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = '{format_name}_err', + kafka_topic_list = '{topic_name}', kafka_group_name = '{format_name}', kafka_format = '{format_name}', - 
kafka_handle_error_mode='stream', + kafka_handle_error_mode= '{handle_error_mode}', kafka_schema='schema_test_errors:Message'; - CREATE MATERIALIZED VIEW view Engine=Log AS - SELECT _error FROM kafka WHERE length(_error) != 0 ; + {additional_dml} """ ) print(format_name) - kafka_create_topic(admin_client, f"{format_name}_err") + kafka_create_topic(admin_client, f"{topic_name}") messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"] - kafka_produce(kafka_cluster, f"{format_name}_err", messages) + kafka_produce(kafka_cluster, f"{topic_name}", messages) - attempt = 0 - rows = 0 - while attempt < 500: - rows = int(instance.query("SELECT count() FROM view")) - if rows == len(messages): - break - attempt += 1 + check_method(len(messages), topic_name) - assert rows == len(messages) - - kafka_delete_topic(admin_client, f"{format_name}_err") + kafka_delete_topic(admin_client, f"{topic_name}") capn_proto_schema = """ @0xd9dd7b35452d1c4f; @@ -248,6 +264,7 @@ struct Message """ instance.create_format_schema("schema_test_errors.capnp", capn_proto_schema) + topic_name = f"CapnProto_{handle_error_mode}_err" instance.query( f""" DROP TABLE IF EXISTS view; @@ -256,35 +273,44 @@ struct Message CREATE TABLE kafka (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'CapnProto_err', + kafka_topic_list = '{topic_name}', kafka_group_name = 'CapnProto', kafka_format = 'CapnProto', - kafka_handle_error_mode='stream', + kafka_handle_error_mode= '{handle_error_mode}', kafka_schema='schema_test_errors:Message'; - CREATE MATERIALIZED VIEW view Engine=Log AS - SELECT _error FROM kafka WHERE length(_error) != 0; + {additional_dml} """ ) print("CapnProto") - kafka_create_topic(admin_client, "CapnProto_err") + kafka_create_topic(admin_client, f"{topic_name}") messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] - kafka_produce(kafka_cluster, "CapnProto_err", messages) + kafka_produce(kafka_cluster, f"{topic_name}", messages) - attempt = 0 - rows = 0 - while attempt < 500: - rows = int(instance.query("SELECT count() FROM view")) - if rows == len(messages): - break - attempt += 1 + check_method(len(messages), topic_name) - assert rows == len(messages) + kafka_delete_topic(admin_client, f"{topic_name}") - kafka_delete_topic(admin_client, "CapnProto_err") + +def test_bad_messages_parsing_stream(kafka_cluster): + bad_messages_parsing_mode( + kafka_cluster, + "stream", + "CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0", + view_test, + ) + + +def test_bad_messages_parsing_dead_letter_queue(kafka_cluster): + bad_messages_parsing_mode( + kafka_cluster, + "dead_letter_queue", + "CREATE MATERIALIZED VIEW view Engine=Log AS SELECT key FROM kafka", + dead_letter_queue_test, + ) def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):