ClickHouse/docs/ru/engines/table-engines/integrations/kafka.md
filimonov fbb22348ea
Refactor reading the pool setting & from server config. (#48055)
After #36425 there was a lot of confusions/problems with configuring pools - when the message was confusing, and settings need to be ajusted in several places.
See some examples in #44251, #43351, #47900, #46515.

The commit includes the following changes:
1) Introduced a unified mechanism for reading pool sizes from the configuration file(s). Previously, pool sizes were read from the Context.cpp with fallbacks to profiles, whereas main_config_reloader in Server.cpp read them directly without fallbacks.
2) Corrected the data type for background_merges_mutations_concurrency_ratio. It should be float instead of int.
3) Refactored the default values for settings. Previously, they were defined in multiple places throughout the codebase, but they are now defined in one place (or two, to be exact: Settings.h and ServerSettings.h).
4) Improved documentation, including the correct message in system.settings.

Additionally make the code more conform with #46550.
2023-03-30 16:44:11 +02:00

15 KiB
Raw Blame History

slug sidebar_position sidebar_label
/ru/engines/table-engines/integrations/kafka 8 Kafka

Kafka

Движок работает с Apache Kafka.

Kafka позволяет:

  • Публиковать/подписываться на потоки данных.
  • Организовать отказоустойчивое хранилище.
  • Обрабатывать потоки по мере их появления.

Создание таблицы

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_max_rows_per_message = 1];

Обязательные параметры:

  • kafka_broker_list — перечень брокеров, разделенный запятыми (localhost:9092).
  • kafka_topic_list — перечень необходимых топиков Kafka.
  • kafka_group_name — группа потребителя Kafka. Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
  • kafka_format — формат сообщений. Названия форматов должны быть теми же, что можно использовать в секции FORMAT, например, JSONEachRow. Подробнее читайте в разделе Форматы.

Опциональные параметры:

  • kafka_row_delimiter — символ-разделитель записей (строк), которым завершается сообщение.
  • kafka_schema — опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Capn Proto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message.
  • kafka_num_consumers — количество потребителей (consumer) на таблицу. По умолчанию: 1. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.
  • kafka_max_block_size — максимальный размер пачек (в сообщениях) для poll (по умолчанию max_block_size).
  • kafka_skip_broken_messages — максимальное количество некорректных сообщений в блоке. Если kafka_skip_broken_messages = N, то движок отбрасывает N сообщений Кафки, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию 0.
  • kafka_commit_every_batch — включает или отключает режим записи каждой принятой и обработанной пачки по отдельности вместо единой записи целого блока (по умолчанию 0).
  • kafka_client_id — идентификатор клиента. Значение по умолчанию пусто ''.
  • kafka_poll_timeout_ms - Таймаут для poll. По умолчанию: (../../../operations/settings/settings.md#stream_poll_timeout_ms)
  • kafka_poll_max_batch_size - Максимальное количество сообщений в одном poll Kafka. По умолчанию: (../../../operations/settings/settings.md#setting-max_block_size)
  • kafka_flush_interval_ms - Таймаут для сброса данных из Kafka. По умолчанию: (../../../operations/settings/settings.md#stream-flush-interval-ms)
  • kafka_thread_per_consumer — включает или отключает предоставление отдельного потока каждому потребителю (по умолчанию 0). При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки с данными от нескольких потребителей собираются в один блок.
  • kafka_handle_error_mode - Способ обработки ошибок для Kafka. Возможные значения: default, stream.
  • kafka_commit_on_select - Сообщение о commit при запросе select. По умолчанию: false.
  • kafka_max_rows_per_message - Максимальное количество строк записанных в одно сообщение Kafka для формата row-based. По умолчанию: 1.

Примеры

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
Устаревший способ создания таблицы

:::note "Attention" Не используйте этот метод в новых проектах. По возможности переключите старые проекты на метод, описанный выше. :::

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])

Описание

Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.

Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro.

Чтение сообщения с помощью SELECT не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:

  1. Создайте потребителя Kafka с помощью движка и рассматривайте его как поток данных.
  2. Создайте таблицу с необходимой структурой.
  3. Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.

Когда к движку присоединяется материализованное представление (MATERIALIZED VIEW), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT. Материализованных представлений у одной kafka таблицы может быть сколько угодно, они не считывают данные из таблицы kafka непосредственно, а получают новые записи (блоками), таким образом можно писать в несколько таблиц с разным уровнем детализации (с группировкой - агрегацией и без).

Пример:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.

Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:

  DETACH TABLE consumer;
  ATTACH TABLE consumer;

Если необходимо изменить целевую таблицу с помощью ALTER, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления.

Конфигурация

Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла ClickHouse. Существует два конфигурационных ключа, которые можно использовать: глобальный (kafka) и по топикам (kafka_topic_*). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует).

  <!-- Global configuration options for all tables of Kafka engine type -->
  <kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
  </kafka>

  <!-- Configuration specific for topic "logs" -->
  <kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_logs>

В документе librdkafka configuration reference можно увидеть список возможных опций конфигурации. Используйте подчеркивание (_) вместо точки в конфигурации ClickHouse. Например, check.crcs=true будет соответствовать <check_crcs>true</check_crcs>.

Поддержка Kerberos

Чтобы начать работу с Kafka с поддержкой Kerberos, добавьте дочерний элемент security_protocol со значением sasl_plaintext. Этого будет достаточно, если получен тикет на получение тикета (ticket-granting ticket) Kerberos и он кэшируется средствами ОС. ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Рассмотрим дочерние элементы sasl_kerberos_service_name, sasl_kerberos_keytab и sasl_kerberos_principal.

Пример:

  <!-- Kerberos-aware Kafka -->
  <kafka>
    <security_protocol>SASL_PLAINTEXT</security_protocol>
	<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
	<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
  </kafka>

Виртуальные столбцы

  • _topic — топик Kafka.
  • _key — ключ сообщения.
  • _offset — оффсет сообщения.
  • _timestamp — временная метка сообщения.
  • _timestamp_ms — временная метка сообщения в миллисекундах.
  • _partition — секция топика Kafka.
  • _headers.name - Массив ключей заголовков сообщений.
  • _headers.value - Массив значений заголовков сообщений.

Смотрите также