# Kafka
Движок работает с [Apache Kafka](http://kafka.apache.org/).
Kafka позволяет:
- Публиковать/подписываться на потоки данных.
- Организовать отказо-устойчивое хранилище.
- Обрабатывать потоки по мере их появления.
```
Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
```
Параметры:
- `broker_list` - Перечень брокеров, разделенный запятыми (`localhost:9092`).
- `topic_list` - Перечень необходимых топиков Kafka (`my_topic`).
- `group_name` - Группа потребителя Kafka (`group1`). Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
- `format` - Формат сообщений. Имеет те же обозначения, что выдает SQL-выражение `FORMAT`, например, `JSONEachRow`. Подробнее смотрите в разделе "Форматы".
- `schema` - Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap'n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
- `num_consumers` - Количество потребителей (consumer) на таблицу. По умолчанию `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.
Пример:
```sql
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
```
Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.
Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на [http://kafka.apache.org/intro](http://kafka.apache.org/intro).
Чтение сообщения с помощью `SELECT` не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:
1. Создайте потребителя Kafka с помощью движка и рассматривайте его как поток данных.
2. Создайте таблицу с необходимой структурой.
3. Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.
Когда к движку присоединяется материализованное представление (`MATERIALIZED VIEW`), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью `SELECT`.
Пример:
```sql
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
```
Для улучшения производительности полученные сообщения группируются в блоки размера [max_insert_block_size](../operations/settings/settings.md#settings-settings-max_insert_block_size). Если блок не удалось сформировать за [stream_flush_interval_ms](../operations/settings/settings.md#settings-settings_stream_flush_interval_ms) миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:
```
DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;
```
Если необходимо изменить целевую таблицу с помощью `ALTER`, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления.
## Конфигурация
Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла ClickHouse. Существует два конфигурационных ключа, которые можно использовать - глобальный (`kafka`) и по топикам (`kafka_topic_*`). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует).
```xml
cgrp
smallest
250
100000
```
В документе [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) можно увидеть список возможных опций конфигурации. Используйте подчёркивания (`_`) вместо точек в конфигурации ClickHouse, например, `check.crcs=true` будет соответствовать `true`.