ClickHouse/docs/ru/table_engines/kafka.md
BayoNet c56ada3200 Links fixed to standard MD syntax and the docs sources are the linked GitHub site.
Russian sources are updated following the english ones.
2017-11-17 21:39:58 +03:00

5.7 KiB
Raw Blame History

Kafka

Движок работает с Apache Kafka.

Kafka позволяет:

  • Публиковать/подписываться на потоки данных.
  • Организовать отказо-устойчивое хранилище.
  • Обрабатывать потоки по мере их появления.
Kafka(broker_list, topic_list, group_name, format[, schema])

Параметры:

  • broker_list - Перечень брокеров, разделенный запятыми (localhost:9092).
  • topic_list - Перечень необходимых топиков Kafka (my_topic).
  • group_name - Группа потребителя Kafka (group1). Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
  • format - Формат сообщений. Имеет те же обозначения, что выдает SQL-выражение FORMAT, например, JSONEachRow.
  • schema - Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap'n Proto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message.

Пример:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.

Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro.

Чтение сообщения с помощью SELECT не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:

  1. Создайте потребителя Kafka с помощью движка и рассматривайте его как поток данных.
  2. Создайте таблицу с необходимой структурой.
  3. Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.

Когда к движку присоединяется материализованное представление (MATERIALIZED VIEW), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT.

Пример:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);
  
  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

SELECT level, sum(total) FROM daily GROUP BY level;

Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.

Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:

DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;

Если необходимо изменить целевую таблицу с помощью ALTER, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления.