ClickHouse/docs/ru/engines/table-engines/integrations/kafka.md
2024-04-04 13:56:15 +02:00

16 KiB
Raw Blame History

slug sidebar_position sidebar_label
/ru/engines/table-engines/integrations/kafka 8 Kafka

Kafka

Движок работает с Apache Kafka.

Kafka позволяет:

  • Публиковать/подписываться на потоки данных.
  • Организовать отказоустойчивое хранилище.
  • Обрабатывать потоки по мере их появления.

Создание таблицы

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_max_rows_per_message = 1];

Обязательные параметры:

  • kafka_broker_list — перечень брокеров, разделенный запятыми (localhost:9092).
  • kafka_topic_list — перечень необходимых топиков Kafka.
  • kafka_group_name — группа потребителя Kafka. Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
  • kafka_format — формат сообщений. Названия форматов должны быть теми же, что можно использовать в секции FORMAT, например, JSONEachRow. Подробнее читайте в разделе Форматы.

Опциональные параметры:

  • kafka_row_delimiter — символ-разделитель записей (строк), которым завершается сообщение.
  • kafka_schema — опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Capn Proto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message.
  • kafka_num_consumers — количество потребителей (consumer) на таблицу. По умолчанию: 1. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.
  • kafka_max_block_size — максимальный размер пачек (в сообщениях) для poll (по умолчанию max_block_size).
  • kafka_skip_broken_messages — максимальное количество некорректных сообщений в блоке. Если kafka_skip_broken_messages = N, то движок отбрасывает N сообщений Кафки, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию 0.
  • kafka_commit_every_batch — включает или отключает режим записи каждой принятой и обработанной пачки по отдельности вместо единой записи целого блока (по умолчанию 0).
  • kafka_client_id — идентификатор клиента. Значение по умолчанию пусто ''.
  • kafka_poll_timeout_ms - Таймаут для poll. По умолчанию: (../../../operations/settings/settings.md#stream_poll_timeout_ms)
  • kafka_poll_max_batch_size - Максимальное количество сообщений в одном poll Kafka. По умолчанию: (../../../operations/settings/settings.md#setting-max_block_size)
  • kafka_flush_interval_ms - Таймаут для сброса данных из Kafka. По умолчанию: (../../../operations/settings/settings.md#stream-flush-interval-ms)
  • kafka_thread_per_consumer — включает или отключает предоставление отдельного потока каждому потребителю (по умолчанию 0). При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки с данными от нескольких потребителей собираются в один блок.
  • kafka_handle_error_mode - Способ обработки ошибок для Kafka. Возможные значения: default, stream.
  • kafka_commit_on_select - Сообщение о commit при запросе select. По умолчанию: false.
  • kafka_max_rows_per_message - Максимальное количество строк записанных в одно сообщение Kafka для формата row-based. По умолчанию: 1.

Примеры

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
Устаревший способ создания таблицы

:::note Важно Не используйте этот метод в новых проектах. По возможности переключите старые проекты на метод, описанный выше. :::

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])

Описание

Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.

Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro.

Чтение сообщения с помощью SELECT не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:

  1. Создайте потребителя Kafka с помощью движка и рассматривайте его как поток данных.
  2. Создайте таблицу с необходимой структурой.
  3. Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.

Когда к движку присоединяется материализованное представление (MATERIALIZED VIEW), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT. Материализованных представлений у одной kafka таблицы может быть сколько угодно, они не считывают данные из таблицы kafka непосредственно, а получают новые записи (блоками), таким образом можно писать в несколько таблиц с разным уровнем детализации (с группировкой - агрегацией и без).

Пример:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.

Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:

  DETACH TABLE consumer;
  ATTACH TABLE consumer;

Если необходимо изменить целевую таблицу с помощью ALTER, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления.

Конфигурация

Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла ClickHouse. Существует два конфигурационных ключа, которые можно использовать: глобальный (kafka) и по топикам (kafka_topic_*). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует).

<kafka>
    <!-- Global configuration options for all tables of Kafka engine type -->
    <debug>cgrp</debug>
    <statistics_interval_ms>3000</statistics_interval_ms>

    <kafka_topic>
        <name>logs</name>
        <statistics_interval_ms>4000</statistics_interval_ms>
    </kafka_topic>

    <!-- Settings for consumer -->
    <consumer>
        <auto_offset_reset>smallest</auto_offset_reset>
        <kafka_topic>
            <name>logs</name>
            <fetch_min_bytes>100000</fetch_min_bytes>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <fetch_min_bytes>50000</fetch_min_bytes>
        </kafka_topic>
    </consumer>

    <!-- Settings for producer -->
    <producer>
        <kafka_topic>
            <name>logs</name>
            <retry_backoff_ms>250</retry_backoff_ms>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <retry_backoff_ms>400</retry_backoff_ms>
        </kafka_topic>
    </producer>
</kafka>

В документе librdkafka configuration reference можно увидеть список возможных опций конфигурации. Используйте подчеркивание (_) вместо точки в конфигурации ClickHouse. Например, check.crcs=true будет соответствовать <check_crcs>true</check_crcs>.

Поддержка Kerberos

Чтобы начать работу с Kafka с поддержкой Kerberos, добавьте дочерний элемент security_protocol со значением sasl_plaintext. Этого будет достаточно, если получен тикет на получение тикета (ticket-granting ticket) Kerberos и он кэшируется средствами ОС. ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Рассмотрим дочерние элементы sasl_kerberos_service_name, sasl_kerberos_keytab и sasl_kerberos_principal.

Пример:

  <!-- Kerberos-aware Kafka -->
  <kafka>
    <security_protocol>SASL_PLAINTEXT</security_protocol>
	<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
	<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
  </kafka>

Виртуальные столбцы

  • _topic — топик Kafka.
  • _key — ключ сообщения.
  • _offset — оффсет сообщения.
  • _timestamp — временная метка сообщения.
  • _timestamp_ms — временная метка сообщения в миллисекундах.
  • _partition — секция топика Kafka.
  • _headers.name - Массив ключей заголовков сообщений.
  • _headers.value - Массив значений заголовков сообщений.

Смотрите также