kafka_dead_letter_queue: docs

Ilya Golshtein 2024-08-26 08:15:33 +00:00
parent ff49aef8ac
commit 7fc7293214
5 changed files with 12 additions and 4 deletions


@ -62,7 +62,7 @@ Optional parameters:
- `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block). Default: `0`.
- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default (the exception will be thrown if we fail to parse a message), stream (the exception message and raw message will be saved in virtual columns `_error` and `_raw_message`).
- `kafka_handle_error_mode` — How to handle errors for the Kafka engine. Possible values: default (an exception is thrown if we fail to parse a message), stream (the exception message and the raw message are saved in the virtual columns `_error` and `_raw_message`), dead_letter_queue (error-related data is saved in `system.dead_letter_queue`).
- `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`.
- `kafka_max_rows_per_message` — The maximum number of rows written in one kafka message for row-based formats. Default : `1`.
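
Below is a minimal sketch (not part of this commit) of a Kafka table that routes unparsable messages to the dead letter queue; the broker address, topic, group, and column names are placeholders:

```sql
CREATE TABLE queue
(
    key UInt64,
    value String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow',
         -- parse failures go to system.dead_letter_queue instead of raising
         kafka_handle_error_mode = 'dead_letter_queue';
```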


@ -64,7 +64,7 @@ SETTINGS
- `kafka_poll_max_batch_size` — Maximum number of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` — Enables or disables a dedicated thread for each consumer (default: `0`). When enabled, every consumer flushes its data independently and in parallel; when disabled, rows from several consumers are squashed into one block.
- `kafka_handle_error_mode` — How to handle errors for the Kafka engine. Possible values: default, stream.
- `kafka_handle_error_mode` — How to handle errors for the Kafka engine. Possible values: default, stream, dead_letter_queue.
- `kafka_commit_on_select` — Commit messages when a select query is made. Default: `false`.
- `kafka_max_rows_per_message` — Maximum number of rows written in one Kafka message for row-based formats. Default: `1`.
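
As a brief aside, with `kafka_handle_error_mode = 'stream'` the failed messages can be inspected through the `_error` and `_raw_message` virtual columns described in the English docs above; `queue` is a hypothetical table name:

```sql
-- rows that failed to parse carry a non-empty _error
SELECT _raw_message, _error
FROM queue
WHERE length(_error) > 0;
```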


@ -25,7 +25,7 @@ ColumnsDescription DeadLetterQueueElement::getColumnsDescription()
{
    {"stream_type", stream_type, "Stream type. Possible values: 'Kafka'."},
    {"event_date", std::make_shared<DataTypeDate>(), "Message consuming date."},
    {"event_time", std::make_shared<DataTypeDateTime>(), "Message consuming time."},
    {"event_time", std::make_shared<DataTypeDateTime>(), "Message consuming date and time."},
    {"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6), "Message consuming time with microseconds precision."},
    {"database_name", low_cardinality_string, "ClickHouse database the Kafka table belongs to."},
    {"table_name", low_cardinality_string, "ClickHouse table name."},


@ -0,0 +1,8 @@
<clickhouse>
    <dead_letter_queue>
        <database>system</database>
        <table>dead_letter_queue</table>
        <partition_by>toYYYYMM(event_date)</partition_by>
        <flush_interval_milliseconds>1000</flush_interval_milliseconds>
    </dead_letter_queue>
</clickhouse>
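
Like other system log tables, this one is flushed on an interval (1000 ms above), so a reader may force a flush before querying it, as the test below also does:

```sql
-- flush buffered log entries, then inspect what reached the table
SYSTEM FLUSH LOGS;
SELECT count() FROM system.dead_letter_queue;
```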


@ -138,7 +138,7 @@ def dead_letter_queue_test(expected_num_messages, topic_name):
    view_test(expected_num_messages)
    instance.query("SYSTEM FLUSH LOGS")
    result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'")
    result = instance.query(f"SELECT * FROM system.dead_letter_queue WHERE topic_name = '{topic_name}' FORMAT Vertical")
    logging.debug(f"system.dead_letter_queue contains {result}")
    rows = int(instance.query(f"SELECT count() FROM system.dead_letter_queue WHERE topic_name = '{topic_name}'"))