--- toc_priority: 5 toc_title: Kafka --- # Kafka {#kafka} Движок работает с [Apache Kafka](http://kafka.apache.org/). Kafka позволяет: - Публиковать/подписываться на потоки данных. - Организовать отказоустойчивое хранилище. - Обрабатывать потоки по мере их появления. ## Создание таблицы {#table_engine-kafka-creating-a-table} ``` sql CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'host:port', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'[,] [kafka_row_delimiter = 'delimiter_symbol',] [kafka_schema = '',] [kafka_num_consumers = N,] [kafka_skip_broken_messages = N] ``` Обязательные параметры: - `kafka_broker_list` – перечень брокеров, разделенный запятыми (`localhost:9092`). - `kafka_topic_list` – перечень необходимых топиков Kafka. - `kafka_group_name` – группа потребителя Kafka. Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы. - `kafka_format` – формат сообщений. Названия форматов должны быть теми же, что можно использовать в секции `FORMAT`, например, `JSONEachRow`. Подробнее читайте в разделе [Форматы](../../../interfaces/formats.md). Опциональные параметры: - `kafka_row_delimiter` – символ-разделитель записей (строк), которым завершается сообщение. - `kafka_schema` – опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap’n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`. - `kafka_num_consumers` – количество потребителей (consumer) на таблицу. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя. - `kafka_skip_broken_messages` – максимальное количество некорректных сообщений в блоке. Если `kafka_skip_broken_messages = N`, то движок отбрасывает `N` сообщений Кафки, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0. Примеры ``` sql CREATE TABLE queue ( timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow'); SELECT * FROM queue LIMIT 5; CREATE TABLE queue2 ( timestamp UInt64, level String, message String ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'topic', kafka_group_name = 'group1', kafka_format = 'JSONEachRow', kafka_num_consumers = 4; CREATE TABLE queue2 ( timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1') SETTINGS kafka_format = 'JSONEachRow', kafka_num_consumers = 4; ```
Устаревший способ создания таблицы !!! attention "Attention" Не используйте этот метод в новых проектах. По возможности переключите старые проекты на метод, описанный выше. ``` sql Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format [, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages]) ```
## Описание {#opisanie} Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы. Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro. Чтение сообщения с помощью `SELECT` не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого: 1. Создайте потребителя Kafka с помощью движка и рассматривайте его как поток данных. 2. Создайте таблицу с необходимой структурой. 3. Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу. Когда к движку присоединяется материализованное представление (`MATERIALIZED VIEW`), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью `SELECT`. Материализованных представлений у одной kafka таблицы может быть сколько угодно, они не считывают данные из таблицы kafka непосредственно, а получают новые записи (блоками), таким образом можно писать в несколько таблиц с разным уровнем детализации (с группировкой - агрегацией и без). Пример: ``` sql CREATE TABLE queue ( timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow'); CREATE TABLE daily ( day Date, level String, total UInt64 ) ENGINE = SummingMergeTree(day, (day, level), 8192); CREATE MATERIALIZED VIEW consumer TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM queue GROUP BY day, level; SELECT level, sum(total) FROM daily GROUP BY level; ``` Для улучшения производительности полученные сообщения группируются в блоки размера [max_insert_block_size](../../../operations/settings/settings.md#settings-max_insert_block_size). Если блок не удалось сформировать за [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms) миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока. Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление: ``` sql DETACH TABLE consumer; ATTACH TABLE consumer; ``` Если необходимо изменить целевую таблицу с помощью `ALTER`, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления. ## Конфигурация {#konfiguratsiia} Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла ClickHouse. Существует два конфигурационных ключа, которые можно использовать: глобальный (`kafka`) и по топикам (`kafka_topic_*`). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует). ``` xml cgrp smallest 250 100000 ``` В документе [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) можно увидеть список возможных опций конфигурации. Используйте подчеркивание (`_`) вместо точки в конфигурации ClickHouse. Например, `check.crcs=true` будет соответствовать `true`. ### Поддержка Kerberos {#kafka-kerberos-support} Чтобы начать работу с Kafka с поддержкой Kerberos, добавьте дочерний элемент `security_protocol` со значением `sasl_plaintext`. Этого будет достаточно, если получен тикет на получение тикета (ticket-granting ticket) Kerberos и он кэшируется средствами ОС. ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Рассмотрим дочерние элементы `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` и `sasl.kerberos.kinit.cmd`. Пример: ``` xml SASL_PLAINTEXT /home/kafkauser/kafkauser.keytab kafkauser/kafkahost@EXAMPLE.COM ``` ## Виртуальные столбцы {#virtualnye-stolbtsy} - `_topic` — топик Kafka. - `_key` — ключ сообщения. - `_offset` — оффсет сообщения. - `_timestamp` — временная метка сообщения. - `_partition` — секция топика Kafka. **Смотрите также** - [Виртуальные столбцы](index.md#table_engines-virtual_columns) - [background_schedule_pool_size](../../../operations/settings/settings.md#background_schedule_pool_size) [Оригинальная статья](https://clickhouse.tech/docs/ru/operations/table_engines/kafka/)