---
slug: /en/engines/table-engines/integrations/kafka
sidebar_position: 8
sidebar_label: Kafka
---
# Kafka
This engine works with [Apache Kafka](http://kafka.apache.org/).
Kafka lets you:
- Publish or subscribe to data flows.
- Organize fault-tolerant storage.
- Process streams as they become available.
## Creating a Table {#table_engine-kafka-creating-a-table}
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
```
Required parameters:
- `kafka_broker_list` — A comma-separated list of brokers (for example, `localhost:9092`).
- `kafka_topic_list` — A list of Kafka topics.
- `kafka_group_name` — A group of Kafka consumers. Consumer offsets are tracked for each group separately. If you do not want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` — Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the [Formats](../../../interfaces/formats.md) section.

Optional parameters:

- `kafka_row_delimiter` — Delimiter character, which ends the message. **This setting is deprecated and is no longer used.**
- `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` — The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed. Default: `1`.
- `kafka_max_block_size` — The maximum batch size (in messages) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
- `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data). Default: `0`.
- `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block. Default: `0`.
- `kafka_client_id` — Client identifier. Empty by default.
- `kafka_poll_timeout_ms` — Timeout for single poll from Kafka. Default: [stream_poll_timeout_ms](../../../operations/settings/settings.md#stream_poll_timeout_ms).
- `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
- `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
- `kafka_thread_per_consumer` — Provide an independent thread for each consumer. When enabled, every consumer flushes data independently, in parallel (otherwise, rows from several consumers are squashed to form one block). Default: `0`.
- `kafka_handle_error_mode` — How to handle errors for the Kafka engine. Possible values: `default`, `stream`.
- `kafka_commit_on_select` — Commit messages when a `SELECT` query is made. Default: `false`.
- `kafka_max_rows_per_message` — The maximum number of rows written in one Kafka message for row-based formats. Default: `1`.

Examples:

``` sql
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue3 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
```
<details markdown="1">
<summary>Deprecated Method for Creating a Table</summary>
:::note
Do not use this method in new projects. If possible, switch old projects to the method described above.
:::
``` sql
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
```
</details>
:::info
The Kafka table engine doesn't support columns with [default value](../../../sql-reference/statements/create/table.md#default_value). If you need columns with default value, you can add them at materialized view level (see below).
:::
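As a sketch of the workaround mentioned in the note above, a column with a default value can live on the target table behind a materialized view instead of on the Kafka table itself (the `logs` table, `inserted_at` column, and view name here are illustrative, not part of the official documentation):

``` sql
-- The Kafka table carries no DEFAULT/MATERIALIZED columns.
CREATE TABLE queue (
  timestamp UInt64,
  level String,
  message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

-- The target table may use default values freely.
CREATE TABLE logs (
  timestamp DateTime,
  level String,
  message String,
  inserted_at DateTime DEFAULT now()
) ENGINE = MergeTree ORDER BY timestamp;

-- The view does not select inserted_at, so the DEFAULT expression fills it.
CREATE MATERIALIZED VIEW logs_consumer TO logs
AS SELECT toDateTime(timestamp) AS timestamp, level, message FROM queue;
```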
## Description {#description}
The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.
Groups are flexible and synced on the cluster. For instance, if you have 10 topics and 5 copies of a table in a cluster, then each copy gets 2 topics. If the number of copies changes, the topics are redistributed across the copies automatically. Read more about this at http://kafka.apache.org/intro.
`SELECT` is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time threads using materialized views. To do this:
1. Use the engine to create a Kafka consumer and consider it a data stream.
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.
When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background. This allows you to continually receive messages from Kafka and convert them to the required format using `SELECT`.
One Kafka table can have as many materialized views as you like. They do not read data from the Kafka table directly, but receive new records (in blocks), so you can write to several tables with different levels of detail (with grouping/aggregation and without).
Example:
``` sql
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
```
To improve performance, received messages are grouped into blocks the size of [max_insert_block_size](../../../operations/settings/settings.md#settings-max_insert_block_size). If the block wasn't formed within [stream_flush_interval_ms](../../../operations/settings/settings.md/#stream-flush-interval-ms) milliseconds, the data will be flushed to the table regardless of the completeness of the block.
To stop receiving topic data or to change the conversion logic, detach the materialized view:
``` sql
DETACH TABLE consumer;
ATTACH TABLE consumer;
```
If you want to change the target table by using `ALTER`, we recommend disabling the materialized view to avoid discrepancies between the target table and the data from the view.
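The recommended sequence can be sketched as follows, using the `daily` table and `consumer` view from the example above (the added column is illustrative):

``` sql
-- Stop consumption while the target table changes.
DETACH TABLE consumer;

-- Alter the target table; the view is not reading during this window.
ALTER TABLE daily ADD COLUMN max_level UInt8 DEFAULT 0;

-- Resume consumption. If the view's SELECT no longer matches the target
-- table's structure, recreate the view instead of simply re-attaching it.
ATTACH TABLE consumer;
```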
## Configuration {#configuration}
Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (below `<kafka>`) and topic-level (below `<kafka><kafka_topic>`). The global configuration is applied first, and then the topic-level configuration is applied (if it exists).
``` xml
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
<!-- Configuration specific to topics "logs" and "stats" -->
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</kafka>
```
<details markdown="1">
<summary>Example in deprecated syntax</summary>
``` xml
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific to topics "logs" and "stats" -->
  <!-- Does NOT support periods in topic names, e.g. "logs.security" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
<kafka_stats>
<retry_backoff_ms>400</retry_backoff_ms>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_stats>
```
</details>
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
### Kerberos support {#kafka-kerberos-support}

To work with Kerberos-aware Kafka, add a `security_protocol` child element with the value `sasl_plaintext`. This is enough if a Kerberos ticket-granting ticket is obtained and cached by OS facilities.

ClickHouse can maintain Kerberos credentials using a keytab file. Consider the `sasl_kerberos_service_name`, `sasl_kerberos_keytab` and `sasl_kerberos_principal` child elements.
Example:
``` xml
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
```
## Virtual Columns {#virtual-columns}
- `_topic` — Kafka topic.
- `_key` — Key of the message.
- `_offset` — Offset of the message.
- `_timestamp` — Timestamp of the message.
- `_timestamp_ms` — Timestamp in milliseconds of the message.
- `_partition` — Partition of Kafka topic.
- `_headers.name` — Array of message's headers keys.
- `_headers.value` — Array of message's headers values.
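A minimal sketch of capturing some of these virtual columns: they are available when reading from the Kafka table, so a materialized view must select them explicitly to persist them (the `kafka_audit` table and view names are illustrative, and `queue` is the Kafka table from the examples above):

``` sql
CREATE TABLE kafka_audit (
  topic String,
  partition UInt64,
  offset UInt64,
  raw_message String
) ENGINE = MergeTree ORDER BY (topic, partition, offset);

-- Virtual columns are selected like ordinary columns, prefixed with "_".
CREATE MATERIALIZED VIEW kafka_audit_consumer TO kafka_audit
AS SELECT
  _topic AS topic,
  _partition AS partition,
  _offset AS offset,
  message AS raw_message
FROM queue;
```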
## Data formats support {#data-formats-support}
Kafka engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
The number of rows in one Kafka message depends on whether the format is row-based or block-based:
- For row-based formats the number of rows in one Kafka message can be controlled by setting `kafka_max_rows_per_message`.
- For block-based formats a block cannot be divided into smaller parts, but the number of rows in one block can be controlled by the general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
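For example, when a Kafka table is used as a producer (written to with `INSERT`), a row-based format can pack several rows into each produced message. A sketch under illustrative names (broker, topic, and table are assumptions, not from the official documentation):

``` sql
CREATE TABLE kafka_out (
  key UInt64,
  value String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'topic_out',
         kafka_group_name = 'group_out',
         kafka_format = 'JSONEachRow',
         kafka_max_rows_per_message = 100; -- up to 100 rows per message

-- Each produced Kafka message contains up to 100 JSONEachRow rows.
INSERT INTO kafka_out SELECT number, toString(number) FROM numbers(1000);
```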
**See Also**
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)
- [background_message_broker_schedule_pool_size](../../../operations/server-configuration-parameters/settings.md#background_message_broker_schedule_pool_size)