Ilya Golshtein 2024-09-18 23:24:07 +03:00 committed by GitHub
commit 60a09b772d
24 changed files with 444 additions and 114 deletions

View File

@ -62,7 +62,7 @@ Optional parameters:
- `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block). Default: `0`.
- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`).
- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (an exception is thrown if a message fails to parse), stream (the exception message and raw message are saved in the virtual columns `_error` and `_raw_message`), dead_letter_queue (error-related data is saved in `system.dead_letter_queue`; see the example after this list).
- `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`.
- `kafka_max_rows_per_message` — The maximum number of rows written in one kafka message for row-based formats. Default : `1`.
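For illustration, a minimal sketch of a table definition using the new mode (the broker address, topic, consumer group, format, and column names below are placeholders, not taken from this commit):

``` sql
CREATE TABLE kafka_events (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',       -- hypothetical broker
         kafka_topic_list = 'events',              -- hypothetical topic
         kafka_group_name = 'clickhouse-consumer', -- hypothetical consumer group
         kafka_format = 'JSONEachRow',
         kafka_handle_error_mode = 'dead_letter_queue';
```

With this mode, messages that fail to parse are recorded in `system.dead_letter_queue` together with the topic, partition, offset, raw message, and error text.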

View File

@ -0,0 +1,87 @@
---
slug: /en/operations/system-tables/dead_letter_queue
---
# dead_letter_queue
Contains information about messages received via a streaming engine and parsed with errors. Currently implemented for Kafka.
Logging is controlled by the `dead_letter_queue` value of the `kafka_handle_error_mode` setting.
The flushing period of data is set in the `flush_interval_milliseconds` parameter of the [dead_letter_queue](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-dead_letter_queue) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.
ClickHouse does not delete data from the table automatically. See [Introduction](../../operations/system-tables/index.md#system-tables-introduction) for more details.
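For example, to force a flush and look at the most recent entries (a sketch; the ordering and `LIMIT` are arbitrary):
``` sql
SYSTEM FLUSH LOGS;
SELECT * FROM system.dead_letter_queue ORDER BY event_time_microseconds DESC LIMIT 10;
```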
Columns:
- `stream_type` ([Enum8](../../sql-reference/data-types/enum.md)) - Stream type. Possible values: 'Kafka'.
- `event_date` ([Date](../../sql-reference/data-types/date.md)) - Message consuming date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) - Message consuming date and time.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) - Message consuming time with microseconds precision.
- `database_name` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) - ClickHouse database the Kafka table belongs to.
- `table_name` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) - ClickHouse table name.
- `topic_name` ([Nullable(String)](../../sql-reference/data-types/nullable.md)) - Topic name.
- `partition` ([Nullable(UInt64)](../../sql-reference/data-types/nullable.md)) - Partition.
- `offset` ([Nullable(UInt64)](../../sql-reference/data-types/nullable.md)) - Offset.
- `raw_message` ([String](../../sql-reference/data-types/string.md)) - Message body.
- `error` ([String](../../sql-reference/data-types/string.md)) - Error text.
**Example**
Query:
``` sql
SELECT * FROM system.dead_letter_queue LIMIT 3 \G
```
Result:
``` text
Row 1:
──────
stream_type: Kafka
event_date: 2024-08-26
event_time: 2024-08-26 07:49:20
event_time_microseconds: 2024-08-26 07:49:20.268091
database_name: default
table_name: kafka
topic_name: CapnProto_dead_letter_queue_err
partition: 0
offset: 0
raw_message: qwertyuiop
error: Message has too many segments. Most likely, data was corrupted: (at row 1)
Row 2:
──────
stream_type: Kafka
event_date: 2024-08-26
event_time: 2024-08-26 07:49:20
event_time_microseconds: 2024-08-26 07:49:20.268361
database_name: default
table_name: kafka
topic_name: CapnProto_dead_letter_queue_err
partition: 0
offset: 0
raw_message: asdfghjkl
error: Message has too many segments. Most likely, data was corrupted: (at row 1)
Row 3:
──────
stream_type: Kafka
event_date: 2024-08-26
event_time: 2024-08-26 07:49:20
event_time_microseconds: 2024-08-26 07:49:20.268604
database_name: default
table_name: kafka
topic_name: CapnProto_dead_letter_queue_err
partition: 0
offset: 0
raw_message: zxcvbnm
error: Message has too many segments. Most likely, data was corrupted: (at row 1)
```
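As a usage sketch (not part of this commit), the table can be aggregated to spot problematic topics; the one-day window below is arbitrary:
``` sql
SELECT table_name, topic_name, count() AS failed_messages, any(error) AS sample_error
FROM system.dead_letter_queue
WHERE event_date >= today() - 1
GROUP BY table_name, topic_name
ORDER BY failed_messages DESC;
```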
**See Also**
- [Kafka](../../engines/table-engines/integrations/kafka.md) - Kafka table engine.
- [system.kafka_consumers](../../operations/system-tables/kafka_consumers.md#system_tables-kafka_consumers) — Description of the `kafka_consumers` system table which contains information like statistics and errors about Kafka consumers.

View File

@ -64,7 +64,7 @@ SETTINGS
- `kafka_poll_max_batch_size` - Maximum number of messages polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` - Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` - Enables or disables a dedicated thread for each consumer (default: `0`). When enabled, each consumer flushes data independently and in parallel; when disabled, rows from several consumers are squashed into one block.
- `kafka_handle_error_mode` - How to handle errors for the Kafka engine. Possible values: default, stream.
- `kafka_handle_error_mode` - How to handle errors for the Kafka engine. Possible values: default, stream, dead_letter_queue.
- `kafka_commit_on_select` - Commit messages when a SELECT query is made. Default: `false`.
- `kafka_max_rows_per_message` - The maximum number of rows written in one Kafka message for row-based formats. Default: `1`.

View File

@ -18,6 +18,7 @@
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/BackupLog.h>
#include <Interpreters/DeadLetterQueue.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>

View File

@ -32,6 +32,7 @@
M(AsynchronousInsertLogElement) \
M(BackupLogElement) \
M(BlobStorageLogElement) \
M(DeadLetterQueueElement) \
M(ErrorLogElement)
namespace Poco

View File

@ -141,6 +141,11 @@ IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,
{{"default", StreamingHandleErrorMode::DEFAULT},
{"stream", StreamingHandleErrorMode::STREAM}})
IMPLEMENT_SETTING_ENUM(ExtStreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,
{{"default", ExtStreamingHandleErrorMode::DEFAULT},
{"stream", ExtStreamingHandleErrorMode::STREAM},
{"dead_letter_queue", ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE}})
IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS,
{{"enable", ShortCircuitFunctionEvaluation::ENABLE},
{"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE},

View File

@ -262,11 +262,20 @@ enum class StreamingHandleErrorMode : uint8_t
{
DEFAULT = 0, // Ignore errors with threshold.
STREAM, // Put errors to stream in the virtual column named `_error`.
};
DECLARE_SETTING_ENUM(StreamingHandleErrorMode)
enum class ExtStreamingHandleErrorMode : uint8_t
{
DEFAULT = 0, // Ignore errors with threshold.
STREAM, // Put errors to stream in the virtual column named `_error`.
DEAD_LETTER_QUEUE, // Put errors to the system.dead_letter_queue table.
/* FIXED_SYSTEM_TABLE, Put errors into a fixed system table, likely system.kafka_errors. This is not implemented now. */
/* CUSTOM_SYSTEM_TABLE, Put errors into a custom system table. This is not implemented now. */
};
DECLARE_SETTING_ENUM(StreamingHandleErrorMode)
DECLARE_SETTING_ENUM(ExtStreamingHandleErrorMode)
DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)

View File

@ -4174,7 +4174,6 @@ std::shared_ptr<PartLog> Context::getPartLog(const String & part_database) const
return shared->system_logs->part_log;
}
std::shared_ptr<TraceLog> Context::getTraceLog() const
{
SharedLockGuard lock(shared->mutex);
@ -4344,6 +4343,15 @@ std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
return shared->system_logs->blob_storage_log;
}
std::shared_ptr<DeadLetterQueue> Context::getDeadLetterQueue() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->dead_letter_queue;
}
SystemLogs Context::getSystemLogs() const
{
SharedLockGuard lock(shared->mutex);

View File

@ -114,6 +114,7 @@ class ObjectStorageQueueLog;
class AsynchronousInsertLog;
class BackupLog;
class BlobStorageLog;
class DeadLetterQueue;
class IAsynchronousReader;
class IOUringReader;
struct MergeTreeSettings;
@ -1160,6 +1161,7 @@ public:
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
std::shared_ptr<DeadLetterQueue> getDeadLetterQueue() const;
SystemLogs getSystemLogs() const;

View File

@ -0,0 +1,66 @@
#include <Interpreters/DeadLetterQueue.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
ColumnsDescription DeadLetterQueueElement::getColumnsDescription()
{
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
auto stream_type = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values{
{"Kafka", static_cast<Int8>(StreamType::Kafka)},
});
return ColumnsDescription
{
{"stream_type", stream_type, "Stream type. Possible values: 'Kafka'."},
{"event_date", std::make_shared<DataTypeDate>(), "Message consuming date."},
{"event_time", std::make_shared<DataTypeDateTime>(), "Message consuming date and time."},
{"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6), "Query starting time with microseconds precision."},
{"database_name", low_cardinality_string, "ClickHouse database Kafka table belongs to."},
{"table_name", low_cardinality_string, "ClickHouse table name."},
{"topic_name", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Topic name."},
{"partition", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "Partition."},
{"offset", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "Offset."},
{"raw_message", std::make_shared<DataTypeString>(), "Message body."},
{"error", std::make_shared<DataTypeString>(), "Error text."}
};
}
void DeadLetterQueueElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(static_cast<Int8>(stream_type));
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);
columns[i++]->insertData(database_name.data(), database_name.size());
columns[i++]->insertData(table_name.data(), table_name.size());
columns[i++]->insertData(topic_name.data(), topic_name.size());
columns[i++]->insert(partition);
columns[i++]->insert(offset);
columns[i++]->insertData(raw_message.data(), raw_message.size());
columns[i++]->insertData(error.data(), error.size());
}
NamesAndAliases DeadLetterQueueElement::getNamesAndAliases()
{
return NamesAndAliases{};
}
}

View File

@ -0,0 +1,56 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Storages/ColumnsDescription.h>
/// should be called ...Log for uniformity
// event_time,
// database,
// table,
// topic,
// partition,
// offset,
// raw_message,
// error
namespace DB
{
struct DeadLetterQueueElement
{
enum class StreamType : int8_t
{
Kafka = 1,
};
StreamType stream_type;
UInt64 event_time{};
Decimal64 event_time_microseconds{};
String database_name;
String table_name;
String topic_name;
Int64 partition;
Int64 offset;
String raw_message;
String error;
static std::string name() { return "DeadLetterQueue"; }
static ColumnsDescription getColumnsDescription();
static NamesAndAliases getNamesAndAliases();
void appendToBlock(MutableColumns & columns) const;
};
class DeadLetterQueue : public SystemLog<DeadLetterQueueElement>
{
using SystemLog<DeadLetterQueueElement>::SystemLog;
};
}

View File

@ -28,6 +28,7 @@
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
#include <Interpreters/DeadLetterQueue.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>

View File

@ -30,6 +30,7 @@
M(AsynchronousInsertLog, asynchronous_insert_log, "Contains a history for all asynchronous inserts executed on current server.") \
M(BackupLog, backup_log, "Contains logging entries with the information about BACKUP and RESTORE operations.") \
M(BlobStorageLog, blob_storage_log, "Contains logging entries with information about various blob storage operations such as uploads and deletes.") \
M(DeadLetterQueue, dead_letter_queue, "Contains messages that came from a streaming engine (e.g. Kafka) and were parsed unsuccessfully.") \
namespace DB

View File

@ -35,7 +35,7 @@ const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;
/* default is stream_flush_interval_ms */ \
M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
M(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
M(ExtStreamingHandleErrorMode, kafka_handle_error_mode, ExtStreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after kafka_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error), dead_letter_queue (save broken messages and errors to the system.dead_letter_queue table).", 0) \
M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \
M(UInt64, kafka_max_rows_per_message, 1, "The maximum number of rows produced in one kafka message for row-based formats.", 0) \
M(String, kafka_keeper_path, "", "The path to the table in ClickHouse Keeper", 0) \

View File

@ -6,6 +6,7 @@
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Common/logger_useful.h>
#include <Interpreters/Context.h>
#include <Interpreters/DeadLetterQueue.h>
#include <Common/ProfileEvents.h>
@ -46,7 +47,7 @@ KafkaSource::KafkaSource(
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
, virtual_header(storage.getVirtualsHeader())
, handle_error_mode(storage.getStreamingHandleErrorMode())
, handle_error_mode(storage.getHandleErrorMode())
{
}
@ -98,8 +99,6 @@ Chunk KafkaSource::generateImpl()
// otherwise external iteration will reuse that and logic will became even more fuzzy
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto put_error_to_stream = handle_error_mode == StreamingHandleErrorMode::STREAM;
EmptyReadBuffer empty_buf;
auto input_format = FormatFactory::instance().getInput(
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
@ -108,34 +107,39 @@ Chunk KafkaSource::generateImpl()
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&, this](const MutableColumns & result_columns, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
switch (handle_error_mode)
{
exception_message = e.message();
for (const auto & column : result_columns)
case ExtStreamingHandleErrorMode::STREAM:
case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
exception_message = e.message();
for (const auto & column : result_columns)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// all data columns will get default value in case of error
column->insertDefault();
// all data columns will get default value in case of error
column->insertDefault();
}
break;
}
case ExtStreamingHandleErrorMode::DEFAULT:
{
e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());
consumer->setExceptionInfo(e.message());
throw std::move(e);
}
}
return 1;
return 1;
}
else
{
e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());
consumer->setExceptionInfo(e.message());
throw std::move(e);
}
};
StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error));
@ -203,7 +207,7 @@ Chunk KafkaSource::generateImpl()
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
if (put_error_to_stream)
if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM)
{
if (exception_message)
{
@ -217,6 +221,31 @@ Chunk KafkaSource::generateImpl()
virtual_columns[9]->insertDefault();
}
}
else if (handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
{
if (exception_message)
{
const auto time_now = std::chrono::system_clock::now();
auto storage_id = storage.getStorageID();
auto dead_letter_queue = context->getDeadLetterQueue();
dead_letter_queue->add(
DeadLetterQueueElement{
.stream_type = DeadLetterQueueElement::StreamType::Kafka,
.event_time = timeInSeconds(time_now),
.event_time_microseconds = timeInMicroseconds(time_now),
.database_name = storage_id.database_name,
.table_name = storage_id.table_name,
.topic_name = consumer->currentTopic(),
.partition = consumer->currentPartition(),
.offset = consumer->currentOffset(),
.raw_message = consumer->currentPayload(),
.error = exception_message.value(),
});
}
}
}
total_rows = total_rows + new_rows;
@ -232,7 +261,7 @@ Chunk KafkaSource::generateImpl()
else
{
// We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal
// TODO: it seems like in case of put_error_to_stream=true we may need to process those differently
// TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently
// currently we just skip them with note in logs.
consumer->storeLastReadMessageOffset();
LOG_DEBUG(log, "Parsing of message (topic: {}, partition: {}, offset: {}) return no rows.", consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());

View File

@ -51,7 +51,7 @@ private:
const Block non_virtual_header;
const Block virtual_header;
const StreamingHandleErrorMode handle_error_mode;
const ExtStreamingHandleErrorMode handle_error_mode;
Poco::Timespan max_execution_time = 0;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};

View File

@ -165,7 +165,8 @@ StorageKafka::StorageKafka(
{
kafka_settings->sanityCheck();
if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
if (kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::STREAM ||
kafka_settings->kafka_handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
{
kafka_settings->input_format_allow_errors_num = 0;
kafka_settings->input_format_allow_errors_ratio = 0;

View File

@ -80,7 +80,7 @@ public:
const auto & getFormatName() const { return format_name; }
StreamingHandleErrorMode getStreamingHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
ExtStreamingHandleErrorMode getHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
struct SafeConsumers
{

View File

@ -7,6 +7,7 @@
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/DeadLetterQueue.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
@ -125,7 +126,8 @@ StorageKafka2::StorageKafka2(
if (kafka_settings->kafka_num_consumers > 1 && !thread_per_consumer)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "With multiple consumers, it is required to use `kafka_thread_per_consumer` setting");
if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM ||
getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
{
kafka_settings->input_format_allow_errors_num = 0;
kafka_settings->input_format_allow_errors_ratio = 0;
@ -134,7 +136,7 @@ StorageKafka2::StorageKafka2(
storage_metadata.setColumns(columns_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(StorageKafkaUtils::createVirtuals(kafka_settings->kafka_handle_error_mode));
setVirtuals(StorageKafkaUtils::createVirtuals(getHandleKafkaErrorMode()));
auto task_count = thread_per_consumer ? num_consumers : 1;
for (size_t i = 0; i < task_count; ++i)
@ -807,8 +809,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
// otherwise external iteration will reuse that and logic will became even more fuzzy
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto put_error_to_stream = kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM;
EmptyReadBuffer empty_buf;
auto input_format = FormatFactory::instance().getInput(
getFormatName(), empty_buf, non_virtual_header, modified_context, getMaxBlockSize(), std::nullopt, 1);
@ -817,36 +817,40 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&, this](const MutableColumns & result_columns, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
switch (getHandleKafkaErrorMode())
{
exception_message = e.message();
for (const auto & column : result_columns)
case ExtStreamingHandleErrorMode::STREAM:
case ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE:
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
exception_message = e.message();
for (const auto & column : result_columns)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// all data columns will get default value in case of error
column->insertDefault();
// all data columns will get default value in case of error
column->insertDefault();
}
break;
}
case ExtStreamingHandleErrorMode::DEFAULT:
{
e.addMessage(
"while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer.currentTopic(),
consumer.currentPartition(),
consumer.currentOffset());
throw std::move(e);
}
return 1;
}
else
{
e.addMessage(
"while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer.currentTopic(),
consumer.currentPartition(),
consumer.currentOffset());
throw std::move(e);
}
return 1;
};
StreamingFormatExecutor executor(non_virtual_header, input_format, std::move(on_error));
@ -922,7 +926,8 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
}
virtual_columns[6]->insert(headers_names);
virtual_columns[7]->insert(headers_values);
if (put_error_to_stream)
if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM)
{
if (exception_message)
{
@ -935,6 +940,30 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
virtual_columns[9]->insertDefault();
}
}
else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
{
if (exception_message)
{
const auto time_now = std::chrono::system_clock::now();
auto dead_letter_queue = getContext()->getDeadLetterQueue();
dead_letter_queue->add(
DeadLetterQueueElement{
.stream_type = DeadLetterQueueElement::StreamType::Kafka,
.event_time = timeInSeconds(time_now),
.event_time_microseconds = timeInMicroseconds(time_now),
.database_name = getStorageID().database_name,
.table_name = getStorageID().table_name,
.topic_name = consumer.currentTopic(),
.partition = consumer.currentPartition(),
.offset = consumer.currentOffset(),
.raw_message = consumer.currentPayload(),
.error = exception_message.value(),
});
}
}
}
total_rows = total_rows + new_rows;
@ -947,7 +976,7 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
else
{
// We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal
// TODO: it seems like in case of put_error_to_stream=true we may need to process those differently
// TODO: it seems like in case of ExtStreamingHandleErrorMode::STREAM we may need to process those differently
// currently we just skip them with note in logs.
LOG_DEBUG(
log,

View File

@ -89,7 +89,7 @@ public:
const auto & getFormatName() const { return format_name; }
StreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
ExtStreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
private:
using TopicPartition = KafkaConsumer2::TopicPartition;

View File

@ -427,7 +427,7 @@ bool checkDependencies(const StorageID & table_id, const ContextPtr& context)
}
VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode)
VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode)
{
VirtualColumnsDescription desc;
@ -440,7 +440,7 @@ VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_m
desc.addEphemeral("_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
desc.addEphemeral("_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
if (handle_error_mode == ExtStreamingHandleErrorMode::STREAM)
{
desc.addEphemeral("_raw_message", std::make_shared<DataTypeString>(), "");
desc.addEphemeral("_error", std::make_shared<DataTypeString>(), "");

View File

@ -47,7 +47,7 @@ SettingsChanges createSettingsAdjustments(KafkaSettings & kafka_settings, const
bool checkDependencies(const StorageID & table_id, const ContextPtr& context);
VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode);
VirtualColumnsDescription createVirtuals(ExtStreamingHandleErrorMode handle_error_mode);
}
}

View File

@ -0,0 +1,8 @@
<clickhouse>
<dead_letter_queue>
<database>system</database>
<table>dead_letter_queue</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
</dead_letter_queue>
</clickhouse>

View File

@ -13,7 +13,7 @@ if is_arm():
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
main_configs=["configs/kafka.xml"],
main_configs=["configs/kafka.xml", "configs/dead_letter_queue.xml"],
with_kafka=True,
)
@ -123,7 +123,39 @@ def kafka_cluster():
cluster.shutdown()
def test_bad_messages_parsing_stream(kafka_cluster):
def view_test(expected_num_messages, *_):
attempt = 0
rows = 0
while attempt < 500:
time.sleep(0.1)
rows = int(instance.query("SELECT count() FROM view"))
if rows == expected_num_messages:
break
attempt += 1
assert rows == expected_num_messages
def dead_letter_queue_test(expected_num_messages, topic_name):
view_test(expected_num_messages)
instance.query("SYSTEM FLUSH LOGS")
result = instance.query(
f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical"
)
logging.debug(f"system.dead_letter_queue contains {result}")
rows = int(
instance.query(
f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'"
)
)
assert rows == expected_num_messages
def bad_messages_parsing_mode(
kafka_cluster, handle_error_mode, additional_dml, check_method
):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
@ -151,8 +183,9 @@ def test_bad_messages_parsing_stream(kafka_cluster):
"MySQLDump",
]:
print(format_name)
topic_name = f"{format_name}_{handle_error_mode}_err"
kafka_create_topic(admin_client, f"{format_name}_err")
kafka_create_topic(admin_client, f"{topic_name}")
instance.query(
f"""
@ -162,30 +195,21 @@ def test_bad_messages_parsing_stream(kafka_cluster):
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{format_name}_err',
kafka_topic_list = '{topic_name}',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_handle_error_mode='stream';
kafka_handle_error_mode= '{handle_error_mode}';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0 ;
{additional_dml}
"""
)
messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
kafka_produce(kafka_cluster, f"{format_name}_err", messages)
kafka_produce(kafka_cluster, f"{topic_name}", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
check_method(len(messages), topic_name)
assert rows == len(messages)
kafka_delete_topic(admin_client, f"{format_name}_err")
kafka_delete_topic(admin_client, f"{topic_name}")
protobuf_schema = """
syntax = "proto3";
@ -199,6 +223,7 @@ message Message {
instance.create_format_schema("schema_test_errors.proto", protobuf_schema)
for format_name in ["Protobuf", "ProtobufSingle", "ProtobufList"]:
topic_name = f"{format_name}_{handle_error_mode}_err"
instance.query(
f"""
DROP TABLE IF EXISTS view;
@ -207,35 +232,26 @@ message Message {
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{format_name}_err',
kafka_topic_list = '{topic_name}',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_handle_error_mode='stream',
kafka_handle_error_mode= '{handle_error_mode}',
kafka_schema='schema_test_errors:Message';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0 ;
{additional_dml}
"""
)
print(format_name)
kafka_create_topic(admin_client, f"{format_name}_err")
kafka_create_topic(admin_client, f"{topic_name}")
messages = ["qwertyuiop", "poiuytrewq", "zxcvbnm"]
kafka_produce(kafka_cluster, f"{format_name}_err", messages)
kafka_produce(kafka_cluster, f"{topic_name}", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
check_method(len(messages), topic_name)
assert rows == len(messages)
kafka_delete_topic(admin_client, f"{format_name}_err")
kafka_delete_topic(admin_client, f"{topic_name}")
capn_proto_schema = """
@0xd9dd7b35452d1c4f;
@ -248,6 +264,7 @@ struct Message
"""
instance.create_format_schema("schema_test_errors.capnp", capn_proto_schema)
topic_name = f"CapnProto_{handle_error_mode}_err"
instance.query(
f"""
DROP TABLE IF EXISTS view;
@ -256,35 +273,44 @@ struct Message
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'CapnProto_err',
kafka_topic_list = '{topic_name}',
kafka_group_name = 'CapnProto',
kafka_format = 'CapnProto',
kafka_handle_error_mode='stream',
kafka_handle_error_mode= '{handle_error_mode}',
kafka_schema='schema_test_errors:Message';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0;
{additional_dml}
"""
)
print("CapnProto")
kafka_create_topic(admin_client, "CapnProto_err")
kafka_create_topic(admin_client, f"{topic_name}")
messages = ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
kafka_produce(kafka_cluster, "CapnProto_err", messages)
kafka_produce(kafka_cluster, f"{topic_name}", messages)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM view"))
if rows == len(messages):
break
attempt += 1
check_method(len(messages), topic_name)
assert rows == len(messages)
kafka_delete_topic(admin_client, f"{topic_name}")
kafka_delete_topic(admin_client, "CapnProto_err")
def test_bad_messages_parsing_stream(kafka_cluster):
bad_messages_parsing_mode(
kafka_cluster,
"stream",
"CREATE MATERIALIZED VIEW view Engine=Log AS SELECT _error FROM kafka WHERE length(_error) != 0",
view_test,
)
def test_bad_messages_parsing_dead_letter_queue(kafka_cluster):
bad_messages_parsing_mode(
kafka_cluster,
"dead_letter_queue",
"CREATE MATERIALIZED VIEW view Engine=Log AS SELECT key FROM kafka",
dead_letter_queue_test,
)
def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):