Kafka
This engine works with Apache Kafka.
Kafka lets you:
- Publish or subscribe to data flows.
- Organize fault-tolerant storage.
- Process streams as they become available.
Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
Parameters:
- broker_list – A comma-separated list of brokers (localhost:9092).
- topic_list – A list of Kafka topics (my_topic).
- group_name – A group of Kafka consumers (group1). Read offsets are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- format – Message format. Uses the same notation as the SQL FORMAT function, such as JSONEachRow. For more information, see the "Formats" section.
- schema – An optional parameter that must be used if the format requires a schema definition. For example, Cap'n Proto requires the path to the schema file and the name of the root schema.capnp:Message object.
- num_consumers – The number of consumers per table. Default: 1. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
Example:
CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

SELECT * FROM queue LIMIT 5;
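The optional schema and num_consumers parameters can be passed in the same way. The following is a minimal sketch, not a tested setup: the table name queue_capnp and the topic name topic_capnp are hypothetical, and it assumes that the topic has at least two partitions and that the schema file from the parameter description is available to the server.

```sql
-- Hypothetical table: same columns as the example above,
-- but reading Cap'n Proto messages with two consumers.
CREATE TABLE queue_capnp (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka(
    'localhost:9092',        -- broker_list
    'topic_capnp',           -- topic_list (hypothetical topic name)
    'group1',                -- group_name
    'CapnProto',             -- format
    'schema.capnp:Message',  -- schema: schema file and root object
    2                        -- num_consumers, at most the number of partitions
);
```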
The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.
Groups are flexible and synced on the cluster. For instance, if a topic has 10 partitions and there are 5 copies of a table in a cluster, then each copy gets 2 partitions. If the number of copies changes, the partitions are redistributed across the copies automatically. Read more about this at http://kafka.apache.org/intro.
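To read the same messages a second time, the copy of the table only needs a different group name. A minimal sketch, assuming the hypothetical names queue_copy and group2:

```sql
-- Same brokers, topic and format as the queue table,
-- but a separate consumer group, so offsets are tracked independently.
CREATE TABLE queue_copy (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group2', 'JSONEachRow');
```

Because offsets are tracked per group, reading from queue_copy does not affect what the queue table receives.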
SELECT is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time data streams using materialized views. To do this:
- Use the engine to create a Kafka consumer and consider it a data stream.
- Create a table with the desired structure.
- Create a materialized view that converts data from the engine and puts it into a previously created table.
When the MATERIALIZED VIEW joins the engine, it starts collecting data in the background. This allows you to continually receive messages from Kafka and convert them to the required format using SELECT.
Example:
CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);

CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
    FROM queue GROUP BY day, level;

SELECT level, sum(total) FROM daily GROUP BY level;
To improve performance, received messages are grouped into blocks the size of max_insert_block_size. If the block wasn't formed within stream_flush_interval_ms milliseconds, the data will be flushed to the table regardless of the completeness of the block.
To stop receiving topic data or to change the conversion logic, detach the materialized view:
DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;
If you want to change the target table by using ALTER, we recommend detaching the materialized view first to avoid discrepancies between the target table and the data from the view.
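The recommended order of operations might look like the following sketch. It reuses the consumer view and the daily table from the example above. The added column source is hypothetical and only illustrates where the ALTER goes.

```sql
-- Stop consuming so that no rows arrive while the target table changes.
DETACH TABLE consumer;

-- Hypothetical change to the target table.
ALTER TABLE daily ADD COLUMN source String;

-- Resume consuming.
ATTACH MATERIALIZED VIEW consumer;
```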
Configuration
Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (kafka) and topic-level (kafka_topic_*). The global configuration is applied first, and the topic-level configuration is second (if it exists).
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
</kafka>

<!-- Configuration specific for topic "logs" -->
<kafka_topic_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic_logs>
For a list of possible configuration options, see the librdkafka configuration reference. Use the underscore (_) instead of a dot in the ClickHouse configuration. For example, check.crcs=true will be <check_crcs>true</check_crcs>.