ClickHouse/docs/en/engines/table-engines/integrations/kafka.md
BayoNet 58786f9929
DOCS-620: Background pools settings (#11358)
* DOCSUP-1036: Describe settings for different pools (#118)

* describe settings for different pools

* minor changes

* add refs to and from distributed, server settings, kafka, replication, mergetree

* changes in links description

* add description and links to RU version

* add descrtiption and links to ru version

* minor changes

* CLICKHOUSEDOCS-620: Returned text lost by the last merge.

Co-authored-by: Evgenia Sudarikova <56156889+otrazhenia@users.noreply.github.com>
Co-authored-by: Sergei Shtykov <bayonet@yandex-team.ru>
2020-06-02 21:11:56 +03:00

8.0 KiB
Raw Blame History

toc_priority toc_title
32 Kafka

Kafka

This engine works with Apache Kafka.

Kafka lets you:

  • Publish or subscribe to data flows.
  • Organize fault-tolerant storage.
  • Process streams as they become available.

Creating a Table

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0]

Required parameters:

  • kafka_broker_list A comma-separated list of brokers (for example, localhost:9092).
  • kafka_topic_list A list of Kafka topics.
  • kafka_group_name A group of Kafka consumers. Reading margins are tracked for each group separately. If you dont want messages to be duplicated in the cluster, use the same group name everywhere.
  • kafka_format Message format. Uses the same notation as the SQL FORMAT function, such as JSONEachRow. For more information, see the Formats section.

Optional parameters:

  • kafka_row_delimiter Delimiter character, which ends the message.
  • kafka_schema Parameter that must be used if the format requires a schema definition. For example, Capn Proto requires the path to the schema file and the name of the root schema.capnp:Message object.
  • kafka_num_consumers The number of consumers per table. Default: 1. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
  • kafka_max_block_size - The maximum batch size (in messages) for poll (default: max_block_size).
  • kafka_skip_broken_messages Kafka message parser tolerance to schema-incompatible messages per block. Default: 0. If kafka_skip_broken_messages = N then the engine skips N Kafka messages that cannot be parsed (a message equals a row of data).
  • kafka_commit_every_batch - Commit every consumed and handled batch instead of a single commit after writing a whole block (default: 0).

Examples:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue3 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
Deprecated Method for Creating a Table

!!! attention "Attention" Do not use this method in new projects. If possible, switch old projects to the method described above.

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])

Description

The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.

Groups are flexible and synced on the cluster. For instance, if you have 10 topics and 5 copies of a table in a cluster, then each copy gets 2 topics. If the number of copies changes, the topics are redistributed across the copies automatically. Read more about this at http://kafka.apache.org/intro.

SELECT is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time threads using materialized views. To do this:

  1. Use the engine to create a Kafka consumer and consider it a data stream.
  2. Create a table with the desired structure.
  3. Create a materialized view that converts data from the engine and puts it into a previously created table.

When the MATERIALIZED VIEW joins the engine, it starts collecting data in the background. This allows you to continually receive messages from Kafka and convert them to the required format using SELECT. One kafka table can have as many materialized views as you like, they do not read data from the kafka table directly, but receive new records (in blocks), this way you can write to several tables with different detail level (with grouping - aggregation and without).

Example:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

To improve performance, received messages are grouped into blocks the size of max_insert_block_size. If the block wasnt formed within stream_flush_interval_ms milliseconds, the data will be flushed to the table regardless of the completeness of the block.

To stop receiving topic data or to change the conversion logic, detach the materialized view:

  DETACH TABLE consumer;
  ATTACH TABLE consumer;

If you want to change the target table by using ALTER, we recommend disabling the material view to avoid discrepancies between the target table and the data from the view.

Configuration

Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (kafka) and topic-level (kafka_*). The global configuration is applied first, and then the topic-level configuration is applied (if it exists).

  <!-- Global configuration options for all tables of Kafka engine type -->
  <kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
  </kafka>

  <!-- Configuration specific for topic "logs" -->
  <kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_logs>

For a list of possible configuration options, see the librdkafka configuration reference. Use the underscore (_) instead of a dot in the ClickHouse configuration. For example, check.crcs=true will be <check_crcs>true</check_crcs>.

Virtual Columns

  • _topic — Kafka topic.
  • _key — Key of the message.
  • _offset — Offset of the message.
  • _timestamp — Timestamp of the message.
  • _partition — Partition of Kafka topic.

See Also

Original article