4.1 KiB
Kafka
Движок работает с Apache Kafka.
Kafka позволяет:
- Публиковать/подписываться на потоки данных.
- Организовать отказо-устойчивое хранилище.
- Обрабатывать потоки по мере их появления.
Kafka(broker_list, topic_list, group_name, format[, schema])
Параметры:
broker_list
- Перечень брокеров, разделенный запятыми (localhost:9092
).topic_list
- Перечень топиков Kafka(my_topic
).group_name
- Группа потребителя Kafka (group1
). Отступы для чтения отслеживаются для каждой группы отдельно.format
- Формат сообщений. Имеет те же обозначения, что выдает SQL-выражениеFORMAT
, например,JSONEachRow
.schema
- Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap'n Proto требует путь к файлу со схемой и название корневого объектаschema.capnp:Message
.
Пример:
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5
Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.
Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro.
Kafka обычно используется для построения систем работающих в режиме реального времени. Если к движку присоединить материализованные представления (MATERIALIZED VIEW
), то он будет в фоновом режиме собирать данные и раскладывать их в соответствующие представления. Что позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT
.
Пример:
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE MATERIALIZED VIEW daily ENGINE = SummingMergeTree(day, (day, level), 8192) AS
SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.