ClickHouse/docs/ru/operations/table_engines/kafka.md
Ivan Blinkov 16ca492938
WIP on docs (#3813)
* CLICKHOUSE-4063: less manual html @ index.md

* CLICKHOUSE-4063: recommend markdown="1" in README.md

* CLICKHOUSE-4003: manually purge custom.css for now

* CLICKHOUSE-4064: expand <details> before any print (including to pdf)

* CLICKHOUSE-3927: rearrange interfaces/formats.md a bit

* CLICKHOUSE-3306: add few http headers

* Remove copy-paste introduced in #3392

* Hopefully better chinese fonts #3392

* get rid of tabs @ custom.css

* Apply comments and patch from #3384

* Add jdbc.md to ToC and some translation, though it still looks badly incomplete

* minor punctuation

* Add some backlinks to official website from mirrors that just blindly take markdown sources

* Do not make fonts extra light

* find . -name '*.md' -type f | xargs -I{} perl -pi -e 's//g' {}

* find . -name '*.md' -type f | xargs -I{} perl -pi -e 's/ sql/g' {}

* Remove outdated stuff from roadmap.md

* Not so light font on front page too

* Refactor Chinese formats.md to match recent changes in other languages

* Update some links on front page

* Remove some outdated comment

* Add twitter link to front page

* More front page links tuning

* Add Amsterdam meetup link

* Smaller font to avoid second line

* Add Amsterdam link to README.md

* Proper docs nav translation

* Back to 300 font-weight except Chinese

* fix docs build

* Update Amsterdam link

* remove symlinks

* more zh punctuation

* apply lost comment by @zhang2014

* Apply comments by @zhang2014 from #3417

* Remove Beijing link

* rm incorrect symlink

* restore content of docs/zh/operations/table_engines/index.md

* CLICKHOUSE-3751: stem terms while searching docs

* CLICKHOUSE-3751: use English stemmer in non-English docs too

* CLICKHOUSE-4135 fix

* Remove past meetup link

* Add blog link to top nav

* Add ContentSquare article link

* Add form link to front page + refactor some texts

* couple markup fixes

* minor

* Introduce basic ODBC driver page in docs

* More verbose 3rd party libs disclaimer

* Put third-party stuff into a separate folder

* Separate third-party stuff in ToC too

* Update links

* Move stuff that is not really (only) a client library into a separate page

* Add clickhouse-hdfs-loader link

* Some introduction for "interfaces" section

* Rewrite tcp.md

* http_interface.md -> http.md

* fix link

* Remove unconvenient error for now

* try to guess anchor instead of failing

* remove symlink

* Remove outdated info from introduction

* remove ru roadmap.md

* replace ru roadmap.md with symlink

* Update roadmap.md

* lost file

* Title case in toc_en.yml

* Sync "Functions" ToC section with en

* Remove reference to pretty old ClickHouse release from docs

* couple lost symlinks in fa

* Close quote in proper place

* Rewrite en/getting_started/index.md

* Sync en<>ru getting_started/index.md

* minor changes

* Some gui.md refactoring

* Translate DataGrip section to ru

* Translate DataGrip section to zh

* Translate DataGrip section to fa

* Translate DBeaver section to fa

* Translate DBeaver section to zh

* Split third-party GUI to open-source and commercial

* Mention some RDBMS integrations + ad-hoc translation fixes

* Add rel="external nofollow" to outgoing links from docs

* Lost blank lines

* Fix class name

* More rel="external nofollow"

* Apply suggestions by @sundy-li

* Mobile version of front page improvements

* test

* test 2

* test 3

* Update LICENSE

* minor docs fix

* Highlight current article as suggested by @sundy-li

* fix link destination

* Introduce backup.md (only "en" for now)

* Mention INSERT+SELECT in backup.md

* Some improvements for replication.md

* Add backup.md to toc

* Mention clickhouse-backup tool

* Mention LightHouse in third-party GUI list

* Introduce interfaces/third-party/proxy.md

* Add clickhouse-bulk to proxy.md

* Major extension of integrations.md contents

* fix link target

* remove unneeded file

* better toc item name

* fix markdown

* better ru punctuation

* Add yet another possible backup approach

* Simplify copying permalinks to headers

* Support non-eng link anchors in docs + update some deps

* Generate anchors for single-page mode automatically

* Remove anchors to top of pages

* Remove anchors that nobody links to

* build fixes

* fix few links

* restore css

* fix some links

* restore gifs

* fix lost words

* more docs fixes

* docs fixes

* NULL anchor

* update urllib3 dependency

* more fixes
2018-12-12 20:28:00 +03:00

9.0 KiB
Raw Blame History

Kafka

Движок работает с Apache Kafka.

Kafka позволяет:

  • Публиковать/подписываться на потоки данных.
  • Организовать отказо-устойчивое хранилище.
  • Обрабатывать потоки по мере их появления.

Старый формат:

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

Новый формат:

Kafka SETTINGS
  kafka_broker_list = 'localhost:9092',
  kafka_topic_list = 'topic1,topic2',
  kafka_group_name = 'group1',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n'
  kafka_schema = '',
  kafka_num_consumers = 2

Обязательные параметры:

  • kafka_broker_list - Перечень брокеров, разделенный запятыми (localhost:9092).
  • kafka_topic_list - Перечень необходимых топиков Kafka (my_topic).
  • kafka_group_name - Группа потребителя Kafka (group1). Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
  • kafka_format - Формат сообщений. Имеет те же обозначения, что выдает SQL-выражение FORMAT, например, JSONEachRow. Подробнее смотрите в разделе "Форматы".

Опциональные параметры:

  • kafka_row_delimiter - Символ-разделитель записей (строк), которым завершается сообщение.
  • kafka_schema - Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap'n Proto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message.
  • kafka_num_consumers - Количество потребителей (consumer) на таблицу. По умолчанию 1. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.

Примеры:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;

Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.

Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro.

Чтение сообщения с помощью SELECT не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:

  1. Создайте потребителя Kafka с помощью движка и рассматривайте его как поток данных.
  2. Создайте таблицу с необходимой структурой.
  3. Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.

Когда к движку присоединяется материализованное представление (MATERIALIZED VIEW), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT.

Пример:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.

Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:

  DETACH TABLE consumer;
  ATTACH MATERIALIZED VIEW consumer;

Если необходимо изменить целевую таблицу с помощью ALTER, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления.

Конфигурация

Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла ClickHouse. Существует два конфигурационных ключа, которые можно использовать - глобальный (kafka) и по топикам (kafka_*). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует).

  <!--  Global configuration options for all tables of Kafka engine type -->
  <kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
  </kafka>

  <!-- Configuration specific for topic "logs" -->
  <kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_logs>

В документе librdkafka configuration reference можно увидеть список возможных опций конфигурации. Используйте подчёркивания (_) вместо точек в конфигурации ClickHouse, например, check.crcs=true будет соответствовать <check_crcs>true</check_crcs>.

Оригинальная статья