2022-08-26 19:07:59 +00:00
|
|
|
|
---
|
|
|
|
|
slug: /zh/engines/table-engines/integrations/kafka
|
|
|
|
|
---
|
2020-07-07 10:35:16 +00:00
|
|
|
|
# Kafka {#kafka}
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
此引擎与 [Apache Kafka](http://kafka.apache.org/) 结合使用。
|
|
|
|
|
|
|
|
|
|
Kafka 特性:
|
|
|
|
|
|
2020-03-21 04:11:51 +00:00
|
|
|
|
- 发布或者订阅数据流。
|
|
|
|
|
- 容错存储机制。
|
|
|
|
|
- 处理流数据。
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
2020-01-28 15:27:44 +00:00
|
|
|
|
<a name="table_engine-kafka-creating-a-table"></a>
|
|
|
|
|
|
2018-11-30 19:26:35 +00:00
|
|
|
|
老版格式:
|
|
|
|
|
|
2020-03-21 04:11:51 +00:00
|
|
|
|
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
|
|
|
|
|
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
新版格式:
|
|
|
|
|
|
2020-03-21 04:11:51 +00:00
|
|
|
|
Kafka SETTINGS
|
|
|
|
|
kafka_broker_list = 'localhost:9092',
|
|
|
|
|
kafka_topic_list = 'topic1,topic2',
|
|
|
|
|
kafka_group_name = 'group1',
|
|
|
|
|
kafka_format = 'JSONEachRow',
|
|
|
|
|
kafka_row_delimiter = '\n',
|
|
|
|
|
kafka_schema = '',
|
|
|
|
|
kafka_num_consumers = 2
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
必要参数:
|
|
|
|
|
|
2020-03-21 04:11:51 +00:00
|
|
|
|
- `kafka_broker_list` – 以逗号分隔的 brokers 列表 (`localhost:9092`)。
|
|
|
|
|
- `kafka_topic_list` – topic 列表 (`my_topic`)。
|
|
|
|
|
- `kafka_group_name` – Kafka 消费组名称 (`group1`)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
|
|
|
|
|
- `kafka_format` – 消息体格式。使用与 SQL 部分的 `FORMAT` 函数相同表示方法,例如 `JSONEachRow`。了解详细信息,请参考 `Formats` 部分。
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
可选参数:
|
|
|
|
|
|
2020-03-21 04:11:51 +00:00
|
|
|
|
- `kafka_row_delimiter` - 每个消息体(记录)之间的分隔符。
|
2020-04-08 14:22:25 +00:00
|
|
|
|
- `kafka_schema` – 如果解析格式需要一个 schema 时,此参数必填。例如,[普罗托船长](https://capnproto.org/) 需要 schema 文件路径以及根对象 `schema.capnp:Message` 的名字。
|
2020-03-21 04:11:51 +00:00
|
|
|
|
- `kafka_num_consumers` – 单个表的消费者数量。默认值是:`1`,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
示例:
|
|
|
|
|
|
|
|
|
|
``` sql
|
|
|
|
|
CREATE TABLE queue (
|
|
|
|
|
timestamp UInt64,
|
|
|
|
|
level String,
|
|
|
|
|
message String
|
|
|
|
|
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
|
|
|
|
|
|
|
|
|
|
SELECT * FROM queue LIMIT 5;
|
|
|
|
|
|
|
|
|
|
CREATE TABLE queue2 (
|
|
|
|
|
timestamp UInt64,
|
|
|
|
|
level String,
|
|
|
|
|
message String
|
|
|
|
|
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
|
|
|
|
|
kafka_topic_list = 'topic',
|
|
|
|
|
kafka_group_name = 'group1',
|
|
|
|
|
kafka_format = 'JSONEachRow',
|
|
|
|
|
kafka_num_consumers = 4;
|
|
|
|
|
|
|
|
|
|
CREATE TABLE queue2 (
|
|
|
|
|
timestamp UInt64,
|
|
|
|
|
level String,
|
|
|
|
|
message String
|
|
|
|
|
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
|
|
|
|
|
SETTINGS kafka_format = 'JSONEachRow',
|
|
|
|
|
kafka_num_consumers = 4;
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。
|
|
|
|
|
|
2022-11-23 07:30:53 +00:00
|
|
|
|
消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。了解更多信息请访问 [http://kafka.apache.org/intro](http://kafka.apache.org/intro)。
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
`SELECT` 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:
|
|
|
|
|
|
2020-03-20 18:20:59 +00:00
|
|
|
|
1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
|
|
|
|
|
2. 创建一个结构表。
|
|
|
|
|
3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
当 `MATERIALIZED VIEW` 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 `SELECT` 将数据转换为所需要的格式。
|
|
|
|
|
|
|
|
|
|
示例:
|
|
|
|
|
|
|
|
|
|
``` sql
|
|
|
|
|
CREATE TABLE queue (
|
|
|
|
|
timestamp UInt64,
|
|
|
|
|
level String,
|
|
|
|
|
message String
|
|
|
|
|
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
|
|
|
|
|
|
|
|
|
|
CREATE TABLE daily (
|
|
|
|
|
day Date,
|
|
|
|
|
level String,
|
|
|
|
|
total UInt64
|
|
|
|
|
) ENGINE = SummingMergeTree(day, (day, level), 8192);
|
|
|
|
|
|
|
|
|
|
CREATE MATERIALIZED VIEW consumer TO daily
|
|
|
|
|
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
|
|
|
|
|
FROM queue GROUP BY day, level;
|
|
|
|
|
|
|
|
|
|
SELECT level, sum(total) FROM daily GROUP BY level;
|
|
|
|
|
```
|
|
|
|
|
|
2020-10-13 17:23:29 +00:00
|
|
|
|
为了提高性能,接受的消息被分组为 [max_insert_block_size](../../../operations/settings/settings.md#settings-max_insert_block_size) 大小的块。如果未在 [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms) 毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
停止接收主题数据或更改转换逻辑,请 detach 物化视图:
|
|
|
|
|
|
2020-03-21 04:11:51 +00:00
|
|
|
|
DETACH TABLE consumer;
|
2020-04-25 00:08:00 +00:00
|
|
|
|
ATTACH TABLE consumer;
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
如果使用 `ALTER` 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。
|
|
|
|
|
|
2020-03-20 18:20:59 +00:00
|
|
|
|
## 配置 {#pei-zhi}
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
|
|
|
|
与 `GraphiteMergeTree` 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (`kafka`) 和 主题级别 (`kafka_*`)。首先应用全局配置,然后应用主题级配置(如果存在)。
|
|
|
|
|
|
2020-03-20 18:20:59 +00:00
|
|
|
|
``` xml
|
2018-12-25 15:25:43 +00:00
|
|
|
|
<!-- Global configuration options for all tables of Kafka engine type -->
|
2018-11-30 19:26:35 +00:00
|
|
|
|
<kafka>
|
|
|
|
|
<debug>cgrp</debug>
|
|
|
|
|
<auto_offset_reset>smallest</auto_offset_reset>
|
|
|
|
|
</kafka>
|
|
|
|
|
|
|
|
|
|
<!-- Configuration specific for topic "logs" -->
|
|
|
|
|
<kafka_logs>
|
|
|
|
|
<retry_backoff_ms>250</retry_backoff_ms>
|
|
|
|
|
<fetch_min_bytes>100000</fetch_min_bytes>
|
|
|
|
|
</kafka_logs>
|
|
|
|
|
```
|
|
|
|
|
|
2020-04-08 14:22:25 +00:00
|
|
|
|
有关详细配置选项列表,请参阅 [librdkafka配置参考](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)。在 ClickHouse 配置中使用下划线 (`_`) ,并不是使用点 (`.`)。例如,`check.crcs=true` 将是 `<check_crcs>true</check_crcs>`。
|
2018-11-30 19:26:35 +00:00
|
|
|
|
|
2022-01-10 09:38:29 +00:00
|
|
|
|
### Kerberos 支持 {#kafka-kerberos-zhi-chi}
|
|
|
|
|
|
|
|
|
|
对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
|
|
|
|
|
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。
|
|
|
|
|
|
|
|
|
|
示例:
|
|
|
|
|
|
|
|
|
|
``` xml
|
|
|
|
|
<!-- Kerberos-aware Kafka -->
|
|
|
|
|
<kafka>
|
|
|
|
|
<security_protocol>SASL_PLAINTEXT</security_protocol>
|
|
|
|
|
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
|
|
|
|
|
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
|
|
|
|
|
</kafka>
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## 虚拟列
|
|
|
|
|
|
|
|
|
|
- `_topic` – Kafka 主题。
|
|
|
|
|
- `_key` – 信息的键。
|
|
|
|
|
- `_offset` – 消息的偏移量。
|
2022-02-19 15:49:37 +00:00
|
|
|
|
- `_timestamp` – 消息的时间戳。
|
|
|
|
|
- `_timestamp_ms` – 消息的时间戳(毫秒)。
|
|
|
|
|
- `_partition` – Kafka 主题的分区。
|
2022-01-10 09:38:29 +00:00
|
|
|
|
|
|
|
|
|
**另请参阅**
|
|
|
|
|
|
|
|
|
|
- [虚拟列](../../../engines/table-engines/index.md#table_engines-virtual_columns)
|
|
|
|
|
- [后台消息代理调度池大小](../../../operations/settings/settings.md#background_message_broker_schedule_pool_size)
|