From b0887af6c6005d0edf7b3e40d2d1eff11175605a Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Wed, 21 Aug 2024 07:35:14 +0000 Subject: [PATCH 1/9] kafka_dead_letter_queue: initial, legacy tests work --- src/Common/SystemLogBase.cpp | 1 + src/Common/SystemLogBase.h | 1 + src/Core/SettingsEnums.cpp | 5 ++ src/Core/SettingsEnums.h | 11 ++- src/Interpreters/Context.cpp | 10 ++- src/Interpreters/Context.h | 2 + src/Interpreters/SystemLog.cpp | 1 + src/Interpreters/SystemLog.h | 1 + src/Storages/Kafka/KafkaSettings.h | 2 +- src/Storages/Kafka/KafkaSource.cpp | 70 ++++++++++++------- src/Storages/Kafka/KafkaSource.h | 2 +- src/Storages/Kafka/StorageKafka.cpp | 3 +- src/Storages/Kafka/StorageKafka.h | 2 +- src/Storages/Kafka/StorageKafka2.cpp | 63 +++++++++-------- src/Storages/Kafka/StorageKafka2.h | 2 +- src/Storages/Kafka/StorageKafkaUtils.cpp | 15 +++- src/Storages/Kafka/StorageKafkaUtils.h | 2 +- .../test_kafka_bad_messages/test.py | 53 ++++++-------- 18 files changed, 151 insertions(+), 95 deletions(-) diff --git a/src/Common/SystemLogBase.cpp b/src/Common/SystemLogBase.cpp index 127c8862a35..e09a061cadd 100644 --- a/src/Common/SystemLogBase.cpp +++ b/src/Common/SystemLogBase.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include diff --git a/src/Common/SystemLogBase.h b/src/Common/SystemLogBase.h index 0d7b04d5c57..4a2778432b2 100644 --- a/src/Common/SystemLogBase.h +++ b/src/Common/SystemLogBase.h @@ -32,6 +32,7 @@ M(AsynchronousInsertLogElement) \ M(BackupLogElement) \ M(BlobStorageLogElement) \ + M(DeadLetterQueueElement) \ M(ErrorLogElement) namespace Poco diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index b53a882de4e..0996dc704b7 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -141,6 +141,11 @@ IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS, {{"default", StreamingHandleErrorMode::DEFAULT}, {"stream", StreamingHandleErrorMode::STREAM}}) +IMPLEMENT_SETTING_ENUM(ExtStreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS, + {{"default", ExtStreamingHandleErrorMode::DEFAULT}, + {"stream", ExtStreamingHandleErrorMode::STREAM}, + {"dead_letter_queue", ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE}}) + IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS, {{"enable", ShortCircuitFunctionEvaluation::ENABLE}, {"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index ac3264fe041..b6551bba532 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -261,11 +261,20 @@ enum class StreamingHandleErrorMode : uint8_t { DEFAULT = 0, // Ignore errors with threshold. STREAM, // Put errors to stream in the virtual column named ``_error. +}; + +DECLARE_SETTING_ENUM(StreamingHandleErrorMode) + +enum class ExtStreamingHandleErrorMode : uint8_t +{ + DEFAULT = 0, // Ignore errors with threshold. + STREAM, // Put errors to stream in the virtual column named ``_error. + DEAD_LETTER_QUEUE /*FIXED_SYSTEM_TABLE, Put errors to in a fixed system table likely system.kafka_errors. This is not implemented now. */ /*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. 
*/ }; -DECLARE_SETTING_ENUM(StreamingHandleErrorMode) +DECLARE_SETTING_ENUM(ExtStreamingHandleErrorMode) DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 4a08fd5fe5b..fe262ba54aa 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -4143,7 +4143,6 @@ std::shared_ptr Context::getPartLog(const String & part_database) const return shared->system_logs->part_log; } - std::shared_ptr Context::getTraceLog() const { SharedLockGuard lock(shared->mutex); @@ -4313,6 +4312,15 @@ std::shared_ptr Context::getBlobStorageLog() const return shared->system_logs->blob_storage_log; } +std::shared_ptr Context::getDeadLetterQueue() const +{ + SharedLockGuard lock(shared->mutex); + if (!shared->system_logs->dead_letter_queue) + return {}; + + return shared->system_logs->dead_letter_queue; +} + SystemLogs Context::getSystemLogs() const { SharedLockGuard lock(shared->mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 3da4f124553..e94bae25205 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -114,6 +114,7 @@ class ObjectStorageQueueLog; class AsynchronousInsertLog; class BackupLog; class BlobStorageLog; +class DeadLetterQueue; class IAsynchronousReader; class IOUringReader; struct MergeTreeSettings; @@ -1151,6 +1152,7 @@ public: std::shared_ptr getAsynchronousInsertLog() const; std::shared_ptr getBackupLog() const; std::shared_ptr getBlobStorageLog() const; + std::shared_ptr getDeadLetterQueue() const; SystemLogs getSystemLogs() const; diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 832c39bfaf8..384e96fbfd7 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index 9e1af3578bd..5fae2d238d7 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -30,6 +30,7 @@ M(AsynchronousInsertLog, asynchronous_insert_log, "Contains a history for all asynchronous inserts executed on current server.") \ M(BackupLog, backup_log, "Contains logging entries with the information about BACKUP and RESTORE operations.") \ M(BlobStorageLog, blob_storage_log, "Contains logging entries with information about various blob storage operations such as uploads and deletes.") \ + M(DeadLetterQueue, dead_letter_queue, "Contains messages that came from a streaming engine (e.g. Kafka) and were parsed unsuccessfully.") \ namespace DB diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h index 9ca5e189f0e..44a91af6ef1 100644 --- a/src/Storages/Kafka/KafkaSettings.h +++ b/src/Storages/Kafka/KafkaSettings.h @@ -35,7 +35,7 @@ const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000; /* default is stream_flush_interval_ms */ \ M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \ M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \ - M(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. 
Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ + M(ExtStreamingHandleErrorMode, kafka_handle_error_mode, ExtStreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after kafka_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error), dead_letter_queue", 0) \ M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \ M(UInt64, kafka_max_rows_per_message, 1, "The maximum number of rows produced in one kafka message for row-based formats.", 0) \ M(String, kafka_keeper_path, "", "The path to the table in ClickHouse Keeper", 0) \ diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 3ddd0d1be8c..4826ef97d13 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -46,7 +47,7 @@ KafkaSource::KafkaSource( , commit_in_suffix(commit_in_suffix_) , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized()) , virtual_header(storage.getVirtualsHeader()) - , handle_error_mode(storage.getStreamingHandleErrorMode()) + , handle_error_mode(storage.getHandleErrorMode()) { } @@ -98,8 +99,6 @@ Chunk KafkaSource::generateImpl() // otherwise external iteration will reuse that and logic will became even more fuzzy MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - auto put_error_to_stream = handle_error_mode == StreamingHandleErrorMode::STREAM; - EmptyReadBuffer empty_buf; auto input_format = FormatFactory::instance().getInput( storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); @@ -108,34 +107,39 @@ Chunk KafkaSource::generateImpl() size_t total_rows = 0; size_t failed_poll_attempts = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&, this](const MutableColumns & result_columns, Exception & e) { ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed); - if (put_error_to_stream) + switch (handle_error_mode) { - exception_message = e.message(); - for (const auto & column : result_columns) + case ExtStreamingHandleErrorMode::STREAM: + case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE: { - // read_kafka_message could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + exception_message = e.message(); + for (const auto & column : result_columns) + { + // read_kafka_message could already push some rows to result_columns + // before exception, we need to fix it. 
+ auto cur_rows = column->size(); + if (cur_rows > total_rows) + column->popBack(cur_rows - total_rows); - // all data columns will get default value in case of error - column->insertDefault(); + // all data columns will get default value in case of error + column->insertDefault(); + } + break; } + case ExtStreamingHandleErrorMode::DEFAULT: + { + e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", + consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset()); + consumer->setExceptionInfo(e.message()); + throw std::move(e); + } + } + return 1; - return 1; - } - else - { - e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", - consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset()); - consumer->setExceptionInfo(e.message()); - throw std::move(e); - } }; StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error)); @@ -203,7 +207,7 @@ Chunk KafkaSource::generateImpl() } virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); - if (put_error_to_stream) + if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM) { if (exception_message) { @@ -217,6 +221,22 @@ Chunk KafkaSource::generateImpl() virtual_columns[9]->insertDefault(); } } + else if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM && exception_message) + { + const auto time_now = std::chrono::system_clock::now(); + auto storage_id = storage.getStorageID(); + + auto dead_letter_queue = context->getDeadLetterQueue(); + dead_letter_queue->add( + DeadLetterQueueElement{ + .event_time = timeInSeconds(time_now), + .event_time_microseconds = timeInMicroseconds(time_now), + .database_name = storage_id.database_name, + .table_name = storage_id.table_name, + .raw_message = consumer->currentPayload(), + .error = exception_message.value(), + }); + } } total_rows = total_rows + new_rows; @@ -232,7 +252,7 @@ Chunk KafkaSource::generateImpl() else { // We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal - // TODO: it seems like in case of put_error_to_stream=true we may need to process those differently + // TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently // currently we just skip them with note in logs. 
consumer->storeLastReadMessageOffset(); LOG_DEBUG(log, "Parsing of message (topic: {}, partition: {}, offset: {}) return no rows.", consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset()); diff --git a/src/Storages/Kafka/KafkaSource.h b/src/Storages/Kafka/KafkaSource.h index a1b94b15a19..1ce2ee82c32 100644 --- a/src/Storages/Kafka/KafkaSource.h +++ b/src/Storages/Kafka/KafkaSource.h @@ -51,7 +51,7 @@ private: const Block non_virtual_header; const Block virtual_header; - const StreamingHandleErrorMode handle_error_mode; + const ExtStreamingHandleErrorMode handle_error_mode; Poco::Timespan max_execution_time = 0; Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index f4f641d1c68..b62c39f2695 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -165,7 +165,8 @@ StorageKafka::StorageKafka( { kafka_settings->sanityCheck(); - if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) + if (kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::STREAM || + kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { kafka_settings->input_format_allow_errors_num = 0; kafka_settings->input_format_allow_errors_ratio = 0; diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 966d818d675..760105e0d07 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -80,7 +80,7 @@ public: const auto & getFormatName() const { return format_name; } - StreamingHandleErrorMode getStreamingHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; } + ExtStreamingHandleErrorMode getHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; } struct SafeConsumers { diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp index 3574b46e3b0..81e9c975af8 100644 --- a/src/Storages/Kafka/StorageKafka2.cpp +++ b/src/Storages/Kafka/StorageKafka2.cpp @@ -125,7 +125,8 @@ StorageKafka2::StorageKafka2( if (kafka_settings->kafka_num_consumers > 1 && !thread_per_consumer) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "With multiple consumers, it is required to use `kafka_thread_per_consumer` setting"); - if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) + if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM || + getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { kafka_settings->input_format_allow_errors_num = 0; kafka_settings->input_format_allow_errors_ratio = 0; @@ -134,7 +135,7 @@ StorageKafka2::StorageKafka2( storage_metadata.setColumns(columns_); storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); - setVirtuals(StorageKafkaUtils::createVirtuals(kafka_settings->kafka_handle_error_mode)); + setVirtuals(StorageKafkaUtils::createVirtuals(getHandleKafkaErrorMode())); auto task_count = thread_per_consumer ? 
num_consumers : 1; for (size_t i = 0; i < task_count; ++i) @@ -807,8 +808,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( // otherwise external iteration will reuse that and logic will became even more fuzzy MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - auto put_error_to_stream = kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM; - EmptyReadBuffer empty_buf; auto input_format = FormatFactory::instance().getInput( getFormatName(), empty_buf, non_virtual_header, modified_context, getMaxBlockSize(), std::nullopt, 1); @@ -817,36 +816,44 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( size_t total_rows = 0; size_t failed_poll_attempts = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&, this](const MutableColumns & result_columns, Exception & e) { ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed); - if (put_error_to_stream) + switch (getHandleKafkaErrorMode()) { - exception_message = e.message(); - for (const auto & column : result_columns) + case ExtStreamingHandleErrorMode::STREAM: { - // read_kafka_message could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + exception_message = e.message(); + for (const auto & column : result_columns) + { + // read_kafka_message could already push some rows to result_columns + // before exception, we need to fix it. + auto cur_rows = column->size(); + if (cur_rows > total_rows) + column->popBack(cur_rows - total_rows); - // all data columns will get default value in case of error - column->insertDefault(); + // all data columns will get default value in case of error + column->insertDefault(); + } + break; + } + case ExtStreamingHandleErrorMode::DEFAULT: + { + e.addMessage( + "while parsing Kafka message (topic: {}, partition: {}, offset: {})'", + consumer.currentTopic(), + consumer.currentPartition(), + consumer.currentOffset()); + throw std::move(e); + } + case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE: + { + LOG_DEBUG(log, "Not implemented."); + break; } - - return 1; - } - else - { - e.addMessage( - "while parsing Kafka message (topic: {}, partition: {}, offset: {})'", - consumer.currentTopic(), - consumer.currentPartition(), - consumer.currentOffset()); - throw std::move(e); } + return 1; }; StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error)); @@ -922,7 +929,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( } virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); - if (put_error_to_stream) + if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM) { if (exception_message) { @@ -947,7 +954,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( else { // We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal - // TODO: it seems like in case of put_error_to_stream=true we may need to process those differently + // TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently // currently we just skip them with note in logs. 
LOG_DEBUG( log, diff --git a/src/Storages/Kafka/StorageKafka2.h b/src/Storages/Kafka/StorageKafka2.h index f85fedb316a..cd0485c0867 100644 --- a/src/Storages/Kafka/StorageKafka2.h +++ b/src/Storages/Kafka/StorageKafka2.h @@ -89,7 +89,7 @@ public: const auto & getFormatName() const { return format_name; } - StreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; } + ExtStreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; } private: using TopicPartition = KafkaConsumer2::TopicPartition; diff --git a/src/Storages/Kafka/StorageKafkaUtils.cpp b/src/Storages/Kafka/StorageKafkaUtils.cpp index cdc32d775eb..27b59841c29 100644 --- a/src/Storages/Kafka/StorageKafkaUtils.cpp +++ b/src/Storages/Kafka/StorageKafkaUtils.cpp @@ -59,6 +59,8 @@ namespace ErrorCodes void registerStorageKafka(StorageFactory & factory) { + LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "Top of registerStorageKafka"); + auto creator_fn = [](const StorageFactory::Arguments & args) -> std::shared_ptr { ASTs & engine_args = args.engine_args; @@ -72,6 +74,9 @@ void registerStorageKafka(StorageFactory & factory) for (const auto & setting : kafka_settings->all()) { const auto & setting_name = setting.getName(); + LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka (named collection): processing {}", setting_name); + + if (named_collection->has(setting_name)) kafka_settings->set(setting_name, named_collection->get(setting_name)); } @@ -80,7 +85,9 @@ void registerStorageKafka(StorageFactory & factory) if (has_settings) { + LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before loadFromQuery"); kafka_settings->loadFromQuery(*args.storage_def); + LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after loadFromQuery"); } // Check arguments and settings @@ -154,7 +161,9 @@ void registerStorageKafka(StorageFactory & factory) CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0) CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0) CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0) + LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT"); CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0) + LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT"); CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0) CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0) } @@ -282,6 +291,8 @@ void registerStorageKafka(StorageFactory & factory) args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name); }; + + factory.registerStorage( "Kafka", creator_fn, @@ -427,7 +438,7 @@ bool checkDependencies(const StorageID & table_id, const ContextPtr& context) } -VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode) +VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode) { VirtualColumnsDescription desc; @@ -440,7 +451,7 @@ VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_m desc.addEphemeral("_headers.name", std::make_shared(std::make_shared()), ""); desc.addEphemeral("_headers.value", std::make_shared(std::make_shared()), ""); - if (handle_error_mode == StreamingHandleErrorMode::STREAM) + if (handle_error_mode == 
ExtStreamingHandleErrorMode::STREAM) { desc.addEphemeral("_raw_message", std::make_shared(), ""); desc.addEphemeral("_error", std::make_shared(), ""); diff --git a/src/Storages/Kafka/StorageKafkaUtils.h b/src/Storages/Kafka/StorageKafkaUtils.h index cc956dde78d..c7892c28306 100644 --- a/src/Storages/Kafka/StorageKafkaUtils.h +++ b/src/Storages/Kafka/StorageKafkaUtils.h @@ -47,7 +47,7 @@ SettingsChanges createSettingsAdjustments(KafkaSettings & kafka_settings, const bool checkDependencies(const StorageID & table_id, const ContextPtr& context); -VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode); +VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode); } } diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 0446ca5cb47..07264db0696 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -13,7 +13,7 @@ if is_arm(): cluster = ClickHouseCluster(__file__) instance = cluster.add_instance( "instance", - main_configs=["configs/kafka.xml"], + main_configs=["configs/kafka.xml", "configs/dead_letter_queue.xml"], with_kafka=True, ) @@ -122,8 +122,19 @@ def kafka_cluster(): finally: cluster.shutdown() +def view_test(expected_num_messages): + attempt = 0 + rows = 0 + while attempt < 500: + rows = int(instance.query("SELECT count() FROM view")) + if rows == expected_num_messages: + break + attempt += 1 -def test_bad_messages_parsing_stream(kafka_cluster): + assert rows == expected_num_messages + + +def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, check_method): admin_client = KafkaAdminClient( bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port) ) @@ -165,7 +176,7 @@ def test_bad_messages_parsing_stream(kafka_cluster): kafka_topic_list = '{format_name}_err', kafka_group_name = '{format_name}', kafka_format = '{format_name}', - kafka_handle_error_mode='stream'; + kafka_handle_error_mode= '{handle_error_mode}'; CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0 ; @@ -175,15 +186,7 @@ def test_bad_messages_parsing_stream(kafka_cluster): messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] kafka_produce(kafka_cluster, f"{format_name}_err", messages) - attempt = 0 - rows = 0 - while attempt < 500: - rows = int(instance.query("SELECT count() FROM view")) - if rows == len(messages): - break - attempt += 1 - - assert rows == len(messages) + check_method(len(messages)) kafka_delete_topic(admin_client, f"{format_name}_err") @@ -210,7 +213,7 @@ message Message { kafka_topic_list = '{format_name}_err', kafka_group_name = '{format_name}', kafka_format = '{format_name}', - kafka_handle_error_mode='stream', + kafka_handle_error_mode= '{handle_error_mode}', kafka_schema='schema_test_errors:Message'; CREATE MATERIALIZED VIEW view Engine=Log AS @@ -225,15 +228,7 @@ message Message { messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"] kafka_produce(kafka_cluster, f"{format_name}_err", messages) - attempt = 0 - rows = 0 - while attempt < 500: - rows = int(instance.query("SELECT count() FROM view")) - if rows == len(messages): - break - attempt += 1 - - assert rows == len(messages) + check_method(len(messages)) kafka_delete_topic(admin_client, f"{format_name}_err") @@ -259,7 +254,7 @@ struct Message kafka_topic_list = 'CapnProto_err', kafka_group_name = 'CapnProto', kafka_format = 'CapnProto', - kafka_handle_error_mode='stream', + kafka_handle_error_mode= 
'{handle_error_mode}', kafka_schema='schema_test_errors:Message'; CREATE MATERIALIZED VIEW view Engine=Log AS @@ -274,18 +269,12 @@ struct Message messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] kafka_produce(kafka_cluster, "CapnProto_err", messages) - attempt = 0 - rows = 0 - while attempt < 500: - rows = int(instance.query("SELECT count() FROM view")) - if rows == len(messages): - break - attempt += 1 - - assert rows == len(messages) + check_method(len(messages)) kafka_delete_topic(admin_client, "CapnProto_err") +def test_bad_messages_parsing_stream(kafka_cluster): + bad_messages_parsing_mode(kafka_cluster, 'stream', view_test) def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20): admin_client = KafkaAdminClient( From 108e358f971641cc6c0b9a59d058f5d6b96aba81 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Thu, 22 Aug 2024 16:24:23 +0000 Subject: [PATCH 2/9] kafka_dead_letter_queue: test passed --- src/Interpreters/Context.cpp | 2 +- src/Interpreters/KafkaDeadLetterQueue.cpp | 58 ++++++++++++++++ src/Interpreters/KafkaDeadLetterQueue.h | 50 ++++++++++++++ src/Storages/Kafka/KafkaSource.cpp | 42 +++++++---- src/Storages/Kafka/StorageKafka2.cpp | 3 + .../test_kafka_bad_messages/test.py | 69 +++++++++++++------ 6 files changed, 190 insertions(+), 34 deletions(-) create mode 100644 src/Interpreters/KafkaDeadLetterQueue.cpp create mode 100644 src/Interpreters/KafkaDeadLetterQueue.h diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index fe262ba54aa..d0dbba361b5 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -4315,7 +4315,7 @@ std::shared_ptr Context::getBlobStorageLog() const std::shared_ptr Context::getDeadLetterQueue() const { SharedLockGuard lock(shared->mutex); - if (!shared->system_logs->dead_letter_queue) + if (!shared->system_logs) return {}; return shared->system_logs->dead_letter_queue; diff --git a/src/Interpreters/KafkaDeadLetterQueue.cpp b/src/Interpreters/KafkaDeadLetterQueue.cpp new file mode 100644 index 00000000000..eabd4e6d2c4 --- /dev/null +++ b/src/Interpreters/KafkaDeadLetterQueue.cpp @@ -0,0 +1,58 @@ +#include + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +ColumnsDescription DeadLetterQueueElement::getColumnsDescription() +{ + auto low_cardinality_string = std::make_shared(std::make_shared()); + + return ColumnsDescription + { + {"event_date", std::make_shared(), "Message consuming date."}, + {"event_time", std::make_shared(), "Message consuming time."}, + {"event_time_microseconds", std::make_shared(6), "Query starting time with microseconds precision."}, + {"database_name", low_cardinality_string, "ClickHouse database Kafka table belongs to."}, + {"table_name", low_cardinality_string, "ClickHouse table name."}, + {"topic_name", low_cardinality_string, "Topic name."}, + {"partition", std::make_shared(), "Partition."}, + {"offset", std::make_shared(), "Offset."}, + {"raw_message", std::make_shared(), "Message body."}, + {"error", std::make_shared(), "Error text."} + }; +} + +void DeadLetterQueueElement::appendToBlock(MutableColumns & columns) const +{ + size_t i = 0; + + columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType()); + columns[i++]->insert(event_time); + columns[i++]->insert(event_time_microseconds); + + columns[i++]->insertData(database_name.data(), database_name.size()); + columns[i++]->insertData(table_name.data(), table_name.size()); + columns[i++]->insertData(topic_name.data(), topic_name.size()); + + 
columns[i++]->insert(partition); + columns[i++]->insert(offset); + + columns[i++]->insertData(raw_message.data(), raw_message.size()); + columns[i++]->insertData(error.data(), error.size()); + + +} + +NamesAndAliases DeadLetterQueueElement::getNamesAndAliases() +{ + return NamesAndAliases{}; +} +} diff --git a/src/Interpreters/KafkaDeadLetterQueue.h b/src/Interpreters/KafkaDeadLetterQueue.h new file mode 100644 index 00000000000..17ba4ef99c6 --- /dev/null +++ b/src/Interpreters/KafkaDeadLetterQueue.h @@ -0,0 +1,50 @@ +#pragma once +#include +#include +#include +#include + + +/// should be called ...Log for uniformity + +// event_time, +// database, +// table, +// topic, +// partition, +// offset, +// raw_message, +// error + +namespace DB +{ + + +struct DeadLetterQueueElement +{ + UInt64 event_time{}; + Decimal64 event_time_microseconds{}; + + String database_name; + String table_name; + String topic_name; + Int64 partition; + Int64 offset; + String raw_message; + String error; + + static std::string name() { return "DeadLetterQueue"; } + + static ColumnsDescription getColumnsDescription(); + static NamesAndAliases getNamesAndAliases(); + void appendToBlock(MutableColumns & columns) const; +}; + +class DeadLetterQueue : public SystemLog +{ + using SystemLog::SystemLog; + + +}; + +} diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 4826ef97d13..92d4c5f976c 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -49,6 +49,7 @@ KafkaSource::KafkaSource( , virtual_header(storage.getVirtualsHeader()) , handle_error_mode(storage.getHandleErrorMode()) { + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "ctor"); } KafkaSource::~KafkaSource() @@ -77,6 +78,7 @@ bool KafkaSource::checkTimeLimit() const Chunk KafkaSource::generateImpl() { + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "actual top"); if (!consumer) { auto timeout = std::chrono::milliseconds(context->getSettingsRef().kafka_max_wait_ms.totalMilliseconds()); @@ -93,6 +95,7 @@ Chunk KafkaSource::generateImpl() if (is_finished) return {}; + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top"); is_finished = true; // now it's one-time usage InputStream // one block of the needed size (or with desired flush timeout) is formed in one internal iteration @@ -146,6 +149,7 @@ Chunk KafkaSource::generateImpl() while (true) { + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of while"); size_t new_rows = 0; exception_message.reset(); if (auto buf = consumer->consume()) @@ -165,6 +169,7 @@ Chunk KafkaSource::generateImpl() ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows); + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: new_rows"); consumer->storeLastReadMessageOffset(); auto topic = consumer->currentTopic(); @@ -190,6 +195,7 @@ Chunk KafkaSource::generateImpl() for (size_t i = 0; i < new_rows; ++i) { + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of for"); virtual_columns[0]->insert(topic); virtual_columns[1]->insert(key); virtual_columns[2]->insert(offset); @@ -221,21 +227,31 @@ Chunk KafkaSource::generateImpl() virtual_columns[9]->insertDefault(); } } - else if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM && exception_message) + else if (handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { - const auto time_now = std::chrono::system_clock::now(); - auto storage_id = storage.getStorageID(); + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: 
DEAD_LETTER_QUEUE"); + if (exception_message) + { + + const auto time_now = std::chrono::system_clock::now(); + auto storage_id = storage.getStorageID(); + + auto dead_letter_queue = context->getDeadLetterQueue(); + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add"); + dead_letter_queue->add( + DeadLetterQueueElement{ + .event_time = timeInSeconds(time_now), + .event_time_microseconds = timeInMicroseconds(time_now), + .database_name = storage_id.database_name, + .table_name = storage_id.table_name, + .topic_name = consumer->currentTopic(), + .partition = consumer->currentPartition(), + .offset = consumer->currentPartition(), + .raw_message = consumer->currentPayload(), + .error = exception_message.value(), + }); + } - auto dead_letter_queue = context->getDeadLetterQueue(); - dead_letter_queue->add( - DeadLetterQueueElement{ - .event_time = timeInSeconds(time_now), - .event_time_microseconds = timeInMicroseconds(time_now), - .database_name = storage_id.database_name, - .table_name = storage_id.table_name, - .raw_message = consumer->currentPayload(), - .error = exception_message.value(), - }); } } diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp index 81e9c975af8..9e2e9bb3389 100644 --- a/src/Storages/Kafka/StorageKafka2.cpp +++ b/src/Storages/Kafka/StorageKafka2.cpp @@ -929,6 +929,9 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( } virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); + + LOG_DEBUG(&Poco::Logger::get("StorageKafka2"), "pollConsumer"); + if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM) { if (exception_message) diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 07264db0696..3a60b224d11 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -133,8 +133,28 @@ def view_test(expected_num_messages): assert rows == expected_num_messages +def dead_letter_queue_test(expected_num_messages): + # attempt = 0 + # rows = 0 + # while attempt < 500: + # rows = int(instance.query("SELECT count() FROM view")) + # if rows == expected_num_messages: + # break + # attempt += 1 -def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, check_method): + # assert rows == expected_num_messages + time.sleep(2) + + rows = instance.query("SELECT count() FROM view") + logging.debug(f"system.dead_letter_queue - views contains {rows} rows") + + instance.query("SYSTEM FLUSH LOGS") + + result = instance.query("SELECT * FROM system.dead_letter_queue") + logging.debug(f"system.dead_letter_queue contains {result}") + + +def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, check_method): admin_client = KafkaAdminClient( bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port) ) @@ -162,8 +182,9 @@ def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, check_method): "MySQLDump", ]: print(format_name) + topic_name = f"{format_name}_{handle_error_mode}_err" - kafka_create_topic(admin_client, f"{format_name}_err") + kafka_create_topic(admin_client, f"{topic_name}") instance.query( f""" @@ -173,22 +194,21 @@ def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, check_method): CREATE TABLE kafka (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = '{format_name}_err', + kafka_topic_list = '{topic_name}', kafka_group_name 
= '{format_name}', kafka_format = '{format_name}', kafka_handle_error_mode= '{handle_error_mode}'; - CREATE MATERIALIZED VIEW view Engine=Log AS - SELECT _error FROM kafka WHERE length(_error) != 0 ; + {additional_dml} """ ) messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] - kafka_produce(kafka_cluster, f"{format_name}_err", messages) + kafka_produce(kafka_cluster, f"{topic_name}", messages) check_method(len(messages)) - kafka_delete_topic(admin_client, f"{format_name}_err") + kafka_delete_topic(admin_client, f"{topic_name}") protobuf_schema = """ syntax = "proto3"; @@ -202,6 +222,7 @@ message Message { instance.create_format_schema("schema_test_errors.proto", protobuf_schema) for format_name in ["Protobuf", "ProtobufSingle", "ProtobufList"]: + topic_name = f"{format_name}_{handle_error_mode}_err" instance.query( f""" DROP TABLE IF EXISTS view; @@ -210,27 +231,26 @@ message Message { CREATE TABLE kafka (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = '{format_name}_err', + kafka_topic_list = '{topic_name}', kafka_group_name = '{format_name}', kafka_format = '{format_name}', kafka_handle_error_mode= '{handle_error_mode}', kafka_schema='schema_test_errors:Message'; - CREATE MATERIALIZED VIEW view Engine=Log AS - SELECT _error FROM kafka WHERE length(_error) != 0 ; + {additional_dml} """ ) print(format_name) - kafka_create_topic(admin_client, f"{format_name}_err") + kafka_create_topic(admin_client, f"{topic_name}") messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"] - kafka_produce(kafka_cluster, f"{format_name}_err", messages) + kafka_produce(kafka_cluster, f"{topic_name}", messages) check_method(len(messages)) - kafka_delete_topic(admin_client, f"{format_name}_err") + kafka_delete_topic(admin_client, f"{topic_name}") capn_proto_schema = """ @0xd9dd7b35452d1c4f; @@ -243,6 +263,7 @@ struct Message """ instance.create_format_schema("schema_test_errors.capnp", capn_proto_schema) + topic_name = f"CapnProto_{handle_error_mode}_err" instance.query( f""" DROP TABLE IF EXISTS view; @@ -251,30 +272,38 @@ struct Message CREATE TABLE kafka (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'CapnProto_err', + kafka_topic_list = '{topic_name}', kafka_group_name = 'CapnProto', kafka_format = 'CapnProto', kafka_handle_error_mode= '{handle_error_mode}', kafka_schema='schema_test_errors:Message'; - CREATE MATERIALIZED VIEW view Engine=Log AS - SELECT _error FROM kafka WHERE length(_error) != 0; + {additional_dml} """ ) print("CapnProto") - kafka_create_topic(admin_client, "CapnProto_err") + kafka_create_topic(admin_client, f"{topic_name}") messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] - kafka_produce(kafka_cluster, "CapnProto_err", messages) + kafka_produce(kafka_cluster, f"{topic_name}", messages) check_method(len(messages)) - kafka_delete_topic(admin_client, "CapnProto_err") + kafka_delete_topic(admin_client, f"{topic_name}") def test_bad_messages_parsing_stream(kafka_cluster): - bad_messages_parsing_mode(kafka_cluster, 'stream', view_test) + bad_messages_parsing_mode(kafka_cluster, + 'stream', + 'CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0', + view_test) + +def test_bad_messages_parsing_dead_letter_queue(kafka_cluster): + bad_messages_parsing_mode(kafka_cluster, + 'dead_letter_queue', + 'CREATE MATERIALIZED VIEW view Engine=Log AS SELECT key FROM kafka', + dead_letter_queue_test) def test_bad_messages_parsing_exception(kafka_cluster, 
max_retries=20): admin_client = KafkaAdminClient( From be842c9ff9ad146925baa09605cee94d6e13995e Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Thu, 22 Aug 2024 20:28:47 +0000 Subject: [PATCH 3/9] kafka_dead_letter_queue: stream type --- src/Interpreters/KafkaDeadLetterQueue.cpp | 8 ++++++++ src/Interpreters/KafkaDeadLetterQueue.h | 6 ++++++ src/Storages/Kafka/KafkaSource.cpp | 1 + .../integration/test_kafka_bad_messages/test.py | 17 ++--------------- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/Interpreters/KafkaDeadLetterQueue.cpp b/src/Interpreters/KafkaDeadLetterQueue.cpp index eabd4e6d2c4..731e8992cf6 100644 --- a/src/Interpreters/KafkaDeadLetterQueue.cpp +++ b/src/Interpreters/KafkaDeadLetterQueue.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -15,8 +16,14 @@ ColumnsDescription DeadLetterQueueElement::getColumnsDescription() { auto low_cardinality_string = std::make_shared(std::make_shared()); + auto stream_type = std::make_shared( + DataTypeEnum8::Values{ + {"Kafka", static_cast(StreamType::Kafka)}, + }); + return ColumnsDescription { + {"stream_type", stream_type, "Stream type. Possible values: 'Kafka'."}, {"event_date", std::make_shared(), "Message consuming date."}, {"event_time", std::make_shared(), "Message consuming time."}, {"event_time_microseconds", std::make_shared(6), "Query starting time with microseconds precision."}, @@ -34,6 +41,7 @@ void DeadLetterQueueElement::appendToBlock(MutableColumns & columns) const { size_t i = 0; + columns[i++]->insert(static_cast(stream_type)); columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType()); columns[i++]->insert(event_time); columns[i++]->insert(event_time_microseconds); diff --git a/src/Interpreters/KafkaDeadLetterQueue.h b/src/Interpreters/KafkaDeadLetterQueue.h index 17ba4ef99c6..01e9292270f 100644 --- a/src/Interpreters/KafkaDeadLetterQueue.h +++ b/src/Interpreters/KafkaDeadLetterQueue.h @@ -22,6 +22,12 @@ namespace DB struct DeadLetterQueueElement { + enum class StreamType : int8_t + { + Kafka = 1, + }; + + StreamType stream_type; UInt64 event_time{}; Decimal64 event_time_microseconds{}; diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 92d4c5f976c..62e45038b73 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -240,6 +240,7 @@ Chunk KafkaSource::generateImpl() LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add"); dead_letter_queue->add( DeadLetterQueueElement{ + .stream_type = DeadLetterQueueElement::StreamType::Kafka, .event_time = timeInSeconds(time_now), .event_time_microseconds = timeInMicroseconds(time_now), .database_name = storage_id.database_name, diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 3a60b224d11..4d354201b0f 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -126,6 +126,7 @@ def view_test(expected_num_messages): attempt = 0 rows = 0 while attempt < 500: + time.sleep(0.1) rows = int(instance.query("SELECT count() FROM view")) if rows == expected_num_messages: break @@ -134,21 +135,7 @@ def view_test(expected_num_messages): assert rows == expected_num_messages def dead_letter_queue_test(expected_num_messages): - # attempt = 0 - # rows = 0 - # while attempt < 500: - # rows = int(instance.query("SELECT count() FROM view")) - # if rows == expected_num_messages: - # break - # 
attempt += 1 - - # assert rows == expected_num_messages - time.sleep(2) - - rows = instance.query("SELECT count() FROM view") - logging.debug(f"system.dead_letter_queue - views contains {rows} rows") - - instance.query("SYSTEM FLUSH LOGS") + view_test(expected_num_messages) result = instance.query("SELECT * FROM system.dead_letter_queue") logging.debug(f"system.dead_letter_queue contains {result}") From 9b96ad1d5efb8143bdf2de06de1aab4b582f76a4 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 23 Aug 2024 21:54:34 +0000 Subject: [PATCH 4/9] kafka_dead_letter_queue: rename, small cleanup --- src/Common/SystemLogBase.cpp | 2 +- ...kaDeadLetterQueue.cpp => DeadLetterQueue.cpp} | 2 +- ...{KafkaDeadLetterQueue.h => DeadLetterQueue.h} | 0 src/Interpreters/SystemLog.cpp | 2 +- src/Storages/Kafka/KafkaSource.cpp | 2 +- .../integration/test_kafka_bad_messages/test.py | 16 ++++++++++------ 6 files changed, 14 insertions(+), 10 deletions(-) rename src/Interpreters/{KafkaDeadLetterQueue.cpp => DeadLetterQueue.cpp} (98%) rename src/Interpreters/{KafkaDeadLetterQueue.h => DeadLetterQueue.h} (100%) diff --git a/src/Common/SystemLogBase.cpp b/src/Common/SystemLogBase.cpp index e09a061cadd..8729649d9b4 100644 --- a/src/Common/SystemLogBase.cpp +++ b/src/Common/SystemLogBase.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/Interpreters/KafkaDeadLetterQueue.cpp b/src/Interpreters/DeadLetterQueue.cpp similarity index 98% rename from src/Interpreters/KafkaDeadLetterQueue.cpp rename to src/Interpreters/DeadLetterQueue.cpp index 731e8992cf6..f41127bc0bf 100644 --- a/src/Interpreters/KafkaDeadLetterQueue.cpp +++ b/src/Interpreters/DeadLetterQueue.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include diff --git a/src/Interpreters/KafkaDeadLetterQueue.h b/src/Interpreters/DeadLetterQueue.h similarity index 100% rename from src/Interpreters/KafkaDeadLetterQueue.h rename to src/Interpreters/DeadLetterQueue.h diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp index 384e96fbfd7..5a6c325ed4d 100644 --- a/src/Interpreters/SystemLog.cpp +++ b/src/Interpreters/SystemLog.cpp @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 62e45038b73..69696a04d35 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 4d354201b0f..5621e9ef753 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -122,7 +122,7 @@ def kafka_cluster(): finally: cluster.shutdown() -def view_test(expected_num_messages): +def view_test(expected_num_messages, *_): attempt = 0 rows = 0 while attempt < 500: @@ -134,12 +134,16 @@ def view_test(expected_num_messages): assert rows == expected_num_messages -def dead_letter_queue_test(expected_num_messages): +def dead_letter_queue_test(expected_num_messages, topic_name): view_test(expected_num_messages) + instance.query("SYSTEM FLUSH LOGS") - result = instance.query("SELECT * FROM system.dead_letter_queue") + result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'") logging.debug(f"system.dead_letter_queue contains {result}") + rows = int(instance.query(f"SELECT 
count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'")) + assert rows == expected_num_messages + def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, check_method): admin_client = KafkaAdminClient( @@ -193,7 +197,7 @@ def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] kafka_produce(kafka_cluster, f"{topic_name}", messages) - check_method(len(messages)) + check_method(len(messages), topic_name) kafka_delete_topic(admin_client, f"{topic_name}") @@ -235,7 +239,7 @@ message Message { messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"] kafka_produce(kafka_cluster, f"{topic_name}", messages) - check_method(len(messages)) + check_method(len(messages), topic_name) kafka_delete_topic(admin_client, f"{topic_name}") @@ -276,7 +280,7 @@ struct Message messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"] kafka_produce(kafka_cluster, f"{topic_name}", messages) - check_method(len(messages)) + check_method(len(messages), topic_name) kafka_delete_topic(admin_client, f"{topic_name}") From ff49aef8ac5478fb7a7406c17007489e8614e00c Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 23 Aug 2024 23:23:02 +0000 Subject: [PATCH 5/9] kafka_dead_letter_queue: StorageKafka2 covered --- src/Storages/Kafka/StorageKafka2.cpp | 33 +++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp index 9e2e9bb3389..e918158d7c4 100644 --- a/src/Storages/Kafka/StorageKafka2.cpp +++ b/src/Storages/Kafka/StorageKafka2.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -823,6 +824,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( switch (getHandleKafkaErrorMode()) { case ExtStreamingHandleErrorMode::STREAM: + case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE: { exception_message = e.message(); for (const auto & column : result_columns) @@ -847,11 +849,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( consumer.currentOffset()); throw std::move(e); } - case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE: - { - LOG_DEBUG(log, "Not implemented."); - break; - } } return 1; }; @@ -945,6 +942,32 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( virtual_columns[9]->insertDefault(); } } + else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) + { + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE"); + if (exception_message) + { + + const auto time_now = std::chrono::system_clock::now(); + + auto dead_letter_queue = getContext()->getDeadLetterQueue(); + LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add"); + dead_letter_queue->add( + DeadLetterQueueElement{ + .stream_type = DeadLetterQueueElement::StreamType::Kafka, + .event_time = timeInSeconds(time_now), + .event_time_microseconds = timeInMicroseconds(time_now), + .database_name = getStorageID().database_name, + .table_name = getStorageID().table_name, + .topic_name = consumer.currentTopic(), + .partition = consumer.currentPartition(), + .offset = consumer.currentPartition(), + .raw_message = consumer.currentPayload(), + .error = exception_message.value(), + }); + } + + } } total_rows = total_rows + new_rows; From 7fc729321474d0d5f76efc6fa45b0e29228ab3af Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Mon, 26 Aug 2024 08:15:33 +0000 Subject: [PATCH 6/9] kafka_dead_letter_queue: 
docs --- docs/en/engines/table-engines/integrations/kafka.md | 2 +- docs/ru/engines/table-engines/integrations/kafka.md | 2 +- src/Interpreters/DeadLetterQueue.cpp | 2 +- .../test_kafka_bad_messages/configs/dead_letter_queue.xml | 8 ++++++++ tests/integration/test_kafka_bad_messages/test.py | 2 +- 5 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 tests/integration/test_kafka_bad_messages/configs/dead_letter_queue.xml diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md index de6492e8ea7..687e9e19f8a 100644 --- a/docs/en/engines/table-engines/integrations/kafka.md +++ b/docs/en/engines/table-engines/integrations/kafka.md @@ -62,7 +62,7 @@ Optional parameters: - `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size). - `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms). - `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block). Default: `0`. -- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`). +- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`), dead_letter_queue (error related data will be saved in system.dead_letter_queue) . - `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`. - `kafka_max_rows_per_message` — The maximum number of rows written in one kafka message for row-based formats. Default : `1`. diff --git a/docs/ru/engines/table-engines/integrations/kafka.md b/docs/ru/engines/table-engines/integrations/kafka.md index fb62f30ef9a..e09bde6a4a4 100644 --- a/docs/ru/engines/table-engines/integrations/kafka.md +++ b/docs/ru/engines/table-engines/integrations/kafka.md @@ -64,7 +64,7 @@ SETTINGS - `kafka_poll_max_batch_size` - Максимальное количество сообщений в одном poll Kafka. По умолчанию: (../../../operations/settings/settings.md#setting-max_block_size) - `kafka_flush_interval_ms` - Таймаут для сброса данных из Kafka. По умолчанию: (../../../operations/settings/settings.md#stream-flush-interval-ms) - `kafka_thread_per_consumer` — включает или отключает предоставление отдельного потока каждому потребителю (по умолчанию `0`). При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки с данными от нескольких потребителей собираются в один блок. -- `kafka_handle_error_mode` - Способ обработки ошибок для Kafka. Возможные значения: default, stream. +- `kafka_handle_error_mode` - Способ обработки ошибок для Kafka. Возможные значения: default, stream, dead_letter_queue. - `kafka_commit_on_select` - Сообщение о commit при запросе select. По умолчанию: `false`. - `kafka_max_rows_per_message` - Максимальное количество строк записанных в одно сообщение Kafka для формата row-based. По умолчанию: `1`. 
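
Illustrative sketch (not part of the patch): minimal user-side usage of the mode documented above. The broker address, topic, group and format names are placeholders; the setting value `dead_letter_queue` is the one introduced by this series.

    CREATE TABLE kafka_dlq_example (key UInt64, value UInt64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'events',
             kafka_group_name = 'events',
             kafka_format = 'JSONEachRow',
             kafka_handle_error_mode = 'dead_letter_queue';

With this mode a malformed message neither throws nor shows up in the `_raw_message`/`_error` virtual columns (those are only created for the `stream` mode); the error is recorded in `system.dead_letter_queue` instead.
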
diff --git a/src/Interpreters/DeadLetterQueue.cpp b/src/Interpreters/DeadLetterQueue.cpp
index f41127bc0bf..ff25538a888 100644
--- a/src/Interpreters/DeadLetterQueue.cpp
+++ b/src/Interpreters/DeadLetterQueue.cpp
@@ -25,7 +25,7 @@ ColumnsDescription DeadLetterQueueElement::getColumnsDescription()
     {
         {"stream_type", stream_type, "Stream type. Possible values: 'Kafka'."},
         {"event_date", std::make_shared(), "Message consuming date."},
-        {"event_time", std::make_shared(), "Message consuming time."},
+        {"event_time", std::make_shared(), "Message consuming date and time."},
         {"event_time_microseconds", std::make_shared(6), "Query starting time with microseconds precision."},
         {"database_name", low_cardinality_string, "ClickHouse database Kafka table belongs to."},
         {"table_name", low_cardinality_string, "ClickHouse table name."},
diff --git a/tests/integration/test_kafka_bad_messages/configs/dead_letter_queue.xml b/tests/integration/test_kafka_bad_messages/configs/dead_letter_queue.xml
new file mode 100644
index 00000000000..cdd1b807173
--- /dev/null
+++ b/tests/integration/test_kafka_bad_messages/configs/dead_letter_queue.xml
@@ -0,0 +1,8 @@
+<clickhouse>
+    <dead_letter_queue>
+        <database>system</database>
+        <table>dead_letter_queue</table>
+        <partition_by>toYYYYMM(event_date)</partition_by>
+        <flush_interval_milliseconds>1000</flush_interval_milliseconds>
+    </dead_letter_queue>
+</clickhouse>
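
Illustrative sketch (not part of the patch): with a server configuration section like the one above, the log table lives in the `system` database and is flushed roughly every `flush_interval_milliseconds`, so entries are inspected after forcing a flush (as the integration test below does). Column names come from DeadLetterQueueElement; the `table_name` filter value is a placeholder.

    SYSTEM FLUSH LOGS;

    SELECT event_time, topic_name, partition, offset, error, raw_message
    FROM system.dead_letter_queue
    WHERE table_name = 'kafka_dlq_example';
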
diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 5621e9ef753..81085896229 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -138,7 +138,7 @@ def dead_letter_queue_test(expected_num_messages, topic_name): view_test(expected_num_messages) instance.query("SYSTEM FLUSH LOGS") - result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'") + result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical") logging.debug(f"system.dead_letter_queue contains {result}") rows = int(instance.query(f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'")) From e411d7f50fdf8920d25e95daaab65400451165e6 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Mon, 26 Aug 2024 13:55:18 +0000 Subject: [PATCH 7/9] kafka_dead_letter_queue: debug logging removed --- src/Storages/Kafka/KafkaSource.cpp | 8 -------- src/Storages/Kafka/StorageKafka2.cpp | 4 ---- src/Storages/Kafka/StorageKafkaUtils.cpp | 11 ----------- 3 files changed, 23 deletions(-) diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 69696a04d35..071f901bb5e 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -49,7 +49,6 @@ KafkaSource::KafkaSource( , virtual_header(storage.getVirtualsHeader()) , handle_error_mode(storage.getHandleErrorMode()) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "ctor"); } KafkaSource::~KafkaSource() @@ -78,7 +77,6 @@ bool KafkaSource::checkTimeLimit() const Chunk KafkaSource::generateImpl() { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "actual top"); if (!consumer) { auto timeout = std::chrono::milliseconds(context->getSettingsRef().kafka_max_wait_ms.totalMilliseconds()); @@ -95,7 +93,6 @@ Chunk KafkaSource::generateImpl() if (is_finished) return {}; - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top"); is_finished = true; // now it's one-time usage InputStream // one block of the needed size (or with desired flush timeout) is formed in one internal iteration @@ -149,7 +146,6 @@ Chunk KafkaSource::generateImpl() while (true) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of while"); size_t new_rows = 0; exception_message.reset(); if (auto buf = consumer->consume()) @@ -169,7 +165,6 @@ Chunk KafkaSource::generateImpl() ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows); - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: new_rows"); consumer->storeLastReadMessageOffset(); auto topic = consumer->currentTopic(); @@ -195,7 +190,6 @@ Chunk KafkaSource::generateImpl() for (size_t i = 0; i < new_rows; ++i) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of for"); virtual_columns[0]->insert(topic); virtual_columns[1]->insert(key); virtual_columns[2]->insert(offset); @@ -229,7 +223,6 @@ Chunk KafkaSource::generateImpl() } else if (handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE"); if (exception_message) { @@ -237,7 +230,6 @@ Chunk KafkaSource::generateImpl() auto storage_id = storage.getStorageID(); auto dead_letter_queue = context->getDeadLetterQueue(); - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add"); dead_letter_queue->add( DeadLetterQueueElement{ .stream_type = 
DeadLetterQueueElement::StreamType::Kafka, diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp index e918158d7c4..cf3e1ac65d7 100644 --- a/src/Storages/Kafka/StorageKafka2.cpp +++ b/src/Storages/Kafka/StorageKafka2.cpp @@ -927,8 +927,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); - LOG_DEBUG(&Poco::Logger::get("StorageKafka2"), "pollConsumer"); - if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM) { if (exception_message) @@ -944,14 +942,12 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( } else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE) { - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE"); if (exception_message) { const auto time_now = std::chrono::system_clock::now(); auto dead_letter_queue = getContext()->getDeadLetterQueue(); - LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add"); dead_letter_queue->add( DeadLetterQueueElement{ .stream_type = DeadLetterQueueElement::StreamType::Kafka, diff --git a/src/Storages/Kafka/StorageKafkaUtils.cpp b/src/Storages/Kafka/StorageKafkaUtils.cpp index 27b59841c29..327d812e843 100644 --- a/src/Storages/Kafka/StorageKafkaUtils.cpp +++ b/src/Storages/Kafka/StorageKafkaUtils.cpp @@ -59,8 +59,6 @@ namespace ErrorCodes void registerStorageKafka(StorageFactory & factory) { - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "Top of registerStorageKafka"); - auto creator_fn = [](const StorageFactory::Arguments & args) -> std::shared_ptr { ASTs & engine_args = args.engine_args; @@ -74,9 +72,6 @@ void registerStorageKafka(StorageFactory & factory) for (const auto & setting : kafka_settings->all()) { const auto & setting_name = setting.getName(); - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka (named collection): processing {}", setting_name); - - if (named_collection->has(setting_name)) kafka_settings->set(setting_name, named_collection->get(setting_name)); } @@ -85,9 +80,7 @@ void registerStorageKafka(StorageFactory & factory) if (has_settings) { - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before loadFromQuery"); kafka_settings->loadFromQuery(*args.storage_def); - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after loadFromQuery"); } // Check arguments and settings @@ -161,9 +154,7 @@ void registerStorageKafka(StorageFactory & factory) CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0) CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0) CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0) - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT"); CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0) - LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT"); CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0) CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0) } @@ -291,8 +282,6 @@ void registerStorageKafka(StorageFactory & factory) args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name); }; - - factory.registerStorage( "Kafka", creator_fn, From d598278b898f0389c6ffe32a2aaff329e076d3ac Mon Sep 17 00:00:00 2001 From: Ilya 
Golshtein Date: Wed, 28 Aug 2024 09:49:02 +0000 Subject: [PATCH 8/9] kafka_dead_letter_queue: black formatter --- .../test_kafka_bad_messages/test.py | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 81085896229..694bf4774e6 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -122,6 +122,7 @@ def kafka_cluster(): finally: cluster.shutdown() + def view_test(expected_num_messages, *_): attempt = 0 rows = 0 @@ -134,18 +135,27 @@ def view_test(expected_num_messages, *_): assert rows == expected_num_messages + def dead_letter_queue_test(expected_num_messages, topic_name): view_test(expected_num_messages) instance.query("SYSTEM FLUSH LOGS") - result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical") + result = instance.query( + f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical" + ) logging.debug(f"system.dead_letter_queue contains {result}") - rows = int(instance.query(f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'")) + rows = int( + instance.query( + f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'" + ) + ) assert rows == expected_num_messages -def bad_messages_parsing_mode(kafka_cluster, handle_error_mode, additional_dml, check_method): +def bad_messages_parsing_mode( + kafka_cluster, handle_error_mode, additional_dml, check_method +): admin_client = KafkaAdminClient( bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port) ) @@ -284,17 +294,24 @@ struct Message kafka_delete_topic(admin_client, f"{topic_name}") + def test_bad_messages_parsing_stream(kafka_cluster): - bad_messages_parsing_mode(kafka_cluster, - 'stream', - 'CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0', - view_test) + bad_messages_parsing_mode( + kafka_cluster, + "stream", + "CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0", + view_test, + ) + def test_bad_messages_parsing_dead_letter_queue(kafka_cluster): - bad_messages_parsing_mode(kafka_cluster, - 'dead_letter_queue', - 'CREATE MATERIALIZED VIEW view Engine=Log AS SELECT key FROM kafka', - dead_letter_queue_test) + bad_messages_parsing_mode( + kafka_cluster, + "dead_letter_queue", + "CREATE MATERIALIZED VIEW view Engine=Log AS SELECT key FROM kafka", + dead_letter_queue_test, + ) + def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20): admin_client = KafkaAdminClient( From f39a5e19261ccd3315eae9cd6a6d508b7099d16f Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Wed, 28 Aug 2024 11:44:59 +0000 Subject: [PATCH 9/9] kafka_dead_letter_queue: Nullable, forgotten doc added --- .../system-tables/dead_letter_queue.md | 87 +++++++++++++++++++ src/Interpreters/DeadLetterQueue.cpp | 8 +- 2 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 docs/en/operations/system-tables/dead_letter_queue.md diff --git a/docs/en/operations/system-tables/dead_letter_queue.md b/docs/en/operations/system-tables/dead_letter_queue.md new file mode 100644 index 00000000000..47b7b971b75 --- /dev/null +++ b/docs/en/operations/system-tables/dead_letter_queue.md @@ -0,0 +1,87 @@ +--- +slug: /en/operations/system-tables/dead_letter_queue +--- +# dead_letter_queue + +Contains information about 
messages received via a streaming engine and parsed with errors. Currently implemented for Kafka.
+
+Logging is controlled by the `dead_letter_queue` value of the `kafka_handle_error_mode` setting.
+
+The flushing period of data is set in the `flush_interval_milliseconds` parameter of the [dead_letter_queue](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-dead_letter_queue) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.
+
+ClickHouse does not delete data from the table automatically. See [Introduction](../../operations/system-tables/index.md#system-tables-introduction) for more details.
+
+Columns:
+
+- `stream_type` ([Enum8](../../sql-reference/data-types/enum.md)) - Stream type. Possible values: 'Kafka'.
+- `event_date` ([Date](../../sql-reference/data-types/date.md)) - Message consuming date.
+- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) - Message consuming date and time.
+- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) - Message consuming time with microseconds precision.
+- `database_name` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) - ClickHouse database the Kafka table belongs to.
+- `table_name` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) - ClickHouse table name.
+- `topic_name` ([Nullable(String)](../../sql-reference/data-types/nullable.md)) - Topic name.
+- `partition` ([Nullable(UInt64)](../../sql-reference/data-types/nullable.md)) - Partition.
+- `offset` ([Nullable(UInt64)](../../sql-reference/data-types/nullable.md)) - Offset.
+- `raw_message` ([String](../../sql-reference/data-types/string.md)) - Message body.
+- `error` ([String](../../sql-reference/data-types/string.md)) - Error text.
+
+**Example**
+
+Query:
+
+``` sql
+SELECT * FROM system.dead_letter_queue LIMIT 3 \G;
+```
+
+Result:
+
+``` text
+Row 1:
+──────
+stream_type: Kafka
+event_date: 2024-08-26
+event_time: 2024-08-26 07:49:20
+event_time_microseconds: 2024-08-26 07:49:20.268091
+database_name: default
+table_name: kafka
+topic_name: CapnProto_dead_letter_queue_err
+partition: 0
+offset: 0
+raw_message: qwertyuiop
+error: Message has too many segments. Most likely, data was corrupted: (at row 1)
+
+
+Row 2:
+──────
+stream_type: Kafka
+event_date: 2024-08-26
+event_time: 2024-08-26 07:49:20
+event_time_microseconds: 2024-08-26 07:49:20.268361
+database_name: default
+table_name: kafka
+topic_name: CapnProto_dead_letter_queue_err
+partition: 0
+offset: 0
+raw_message: asdfghjkl
+error: Message has too many segments. Most likely, data was corrupted: (at row 1)
+
+
+Row 3:
+──────
+stream_type: Kafka
+event_date: 2024-08-26
+event_time: 2024-08-26 07:49:20
+event_time_microseconds: 2024-08-26 07:49:20.268604
+database_name: default
+table_name: kafka
+topic_name: CapnProto_dead_letter_queue_err
+partition: 0
+offset: 0
+raw_message: zxcvbnm
+error: Message has too many segments. Most likely, data was corrupted: (at row 1)
+```
+
+**See Also**
+
+- [Kafka](../../engines/table-engines/integrations/kafka) - Kafka Engine
+- [system.kafka_consumers](../../operations/system-tables/kafka_consumers.md#system_tables-kafka_consumers) — Description of the `kafka_consumers` system table which contains information like statistics and errors about Kafka consumers.
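For illustration, a minimal sketch of a Kafka table that opts into this mode follows; the broker address, topic, consumer group, and format below are placeholders, and only `kafka_handle_error_mode = 'dead_letter_queue'` plus the `system.dead_letter_queue` columns come from this change:

``` sql
-- Hypothetical table definition: broker, topic, group and format are illustrative only.
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
         kafka_topic_list = 'dead_letter_queue_err',
         kafka_group_name = 'dead_letter_queue_err',
         kafka_format = 'JSONEachRow',
         kafka_handle_error_mode = 'dead_letter_queue';

-- Messages that fail to parse do not stop the stream; they are recorded
-- in system.dead_letter_queue and can be inspected afterwards.
SELECT topic_name, partition, offset, raw_message, error
FROM system.dead_letter_queue
WHERE database_name = 'default' AND table_name = 'kafka';
```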
diff --git a/src/Interpreters/DeadLetterQueue.cpp b/src/Interpreters/DeadLetterQueue.cpp
index ff25538a888..39387ad795e 100644
--- a/src/Interpreters/DeadLetterQueue.cpp
+++ b/src/Interpreters/DeadLetterQueue.cpp
@@ -7,7 +7,7 @@
 #include
 #include
 #include
-
+#include <DataTypes/DataTypeNullable.h>
 
 namespace DB
 {
@@ -29,9 +29,9 @@ ColumnsDescription DeadLetterQueueElement::getColumnsDescription()
         {"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6), "Query starting time with microseconds precision."},
         {"database_name", low_cardinality_string, "ClickHouse database Kafka table belongs to."},
         {"table_name", low_cardinality_string, "ClickHouse table name."},
-        {"topic_name", low_cardinality_string, "Topic name."},
-        {"partition", std::make_shared<DataTypeUInt64>(), "Partition."},
-        {"offset", std::make_shared<DataTypeUInt64>(), "Offset."},
+        {"topic_name", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Topic name."},
+        {"partition", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "Partition."},
+        {"offset", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "Offset."},
         {"raw_message", std::make_shared<DataTypeString>(), "Message body."},
         {"error", std::make_shared<DataTypeString>(), "Error text."}
     };
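Because `topic_name`, `partition`, and `offset` are now Nullable (presumably to leave room for stream types without Kafka-style coordinates), queries that aggregate on them may want to fold NULLs explicitly. A minimal sketch, assuming only the columns defined above:

``` sql
-- Group dead-lettered messages by topic, folding NULL topic names into 'unknown'.
SELECT coalesce(topic_name, 'unknown') AS topic, count() AS messages
FROM system.dead_letter_queue
GROUP BY topic
ORDER BY messages DESC;
```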