* split up select.md * array-join.md basic refactoring * distinct.md basic refactoring * format.md basic refactoring * from.md basic refactoring * group-by.md basic refactoring * having.md basic refactoring * additional index.md refactoring * into-outfile.md basic refactoring * join.md basic refactoring * limit.md basic refactoring * limit-by.md basic refactoring * order-by.md basic refactoring * prewhere.md basic refactoring * adjust operators/index.md links * adjust sample.md links * adjust more links * adjust operatots links * fix some links * adjust aggregate function article titles * basic refactor of remaining select clauses * absolute paths in make_links.sh * run make_links.sh * remove old select.md locations * translate docs/es * translate docs/fr * translate docs/fa * remove old operators.md location * change operators.md links * adjust links in docs/es * adjust links in docs/es * minor texts adjustments * wip * update machine translations to use new links * fix changelog * es build fixes * get rid of some select.md links * temporary adjust ru links * temporary adjust more ru links * improve curly brace handling * adjust ru as well * fa build fix * ru link fixes * zh link fixes * temporary disable part of anchor checks
8.2 KiB
machine_translated | machine_translated_rev | toc_priority | toc_title |
---|---|---|---|
true | 72537a2d52 |
32 | Kafka |
Kafka
Bu motor ile çalışır Apache Kafka.
Kafka sağlar:
- Veri akışlarını yayınlayın veya abone olun.
- Hataya dayanıklı depolama düzenlemek.
- Kullanılabilir hale geldikçe akışları işleyin.
Tablo oluşturma
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0]
Gerekli parametreler:
kafka_broker_list
– A comma-separated list of brokers (for example,localhost:9092
).kafka_topic_list
– A list of Kafka topics.kafka_group_name
– A group of Kafka consumers. Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.kafka_format
– Message format. Uses the same notation as the SQLFORMAT
fonksiyon gibiJSONEachRow
. Daha fazla bilgi için, bkz: Biçimliler bölme.
İsteğe bağlı parametreler:
kafka_row_delimiter
– Delimiter character, which ends the message.kafka_schema
– Parameter that must be used if the format requires a schema definition. For example, Cap'n Proto şema dosyasının yolunu ve kök adını gerektirirschema.capnp:Message
nesne.kafka_num_consumers
– The number of consumers per table. Default:1
. Bir tüketicinin verimi yetersizse daha fazla tüketici belirtin. Bölüm başına yalnızca bir tüketici atanabileceğinden, toplam tüketici sayısı konudaki bölüm sayısını geçmemelidir.kafka_max_block_size
- Anket için maksimum toplu iş boyutu (mesajlarda) (varsayılan:max_block_size
).kafka_skip_broken_messages
– Kafka message parser tolerance to schema-incompatible messages per block. Default:0
. Eğerkafka_skip_broken_messages = N
sonra motor atlar N Ayrıştırılamayan Kafka iletileri (bir ileti bir veri satırına eşittir).kafka_commit_every_batch
- Bütün bir blok yazdıktan sonra tek bir taahhüt yerine tüketilen ve işlenen her toplu işlemi gerçekleştirin (varsayılan:0
).
Örnekler:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
Bir tablo oluşturmak için kullanımdan kaldırılan yöntem
!!! attention "Dikkat" Bu yöntemi yeni projelerde kullanmayın. Mümkünse, eski projeleri yukarıda açıklanan yönteme geçin.
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
Açıklama
Teslim edilen mesajlar otomatik olarak izlenir, bu nedenle bir gruptaki her mesaj yalnızca bir kez sayılır. Verileri iki kez almak istiyorsanız, tablonun başka bir grup adıyla bir kopyasını oluşturun.
Gruplar esnek ve kümede senkronize edilir. Örneğin, bir kümede 10 konu ve bir tablonun 5 kopyası varsa, her kopya 2 konu alır. Kopya sayısı değişirse, konular kopyalar arasında otomatik olarak yeniden dağıtılır. Bu konuda daha fazla bilgi edinin http://kafka.apache.org/intro.
SELECT
mesajları okumak için özellikle yararlı değildir (hata ayıklama hariç), çünkü her mesaj yalnızca bir kez okunabilir. Hayata görünümler kullanarak gerçek zamanlı iş parçacıkları oluşturmak daha pratiktir. Bunu yapmak için :
- Bir Kafka tüketici oluşturmak için motoru kullanın ve bir veri akışı düşünün.
- İstenen yapıya sahip bir tablo oluşturun.
- Verileri motordan dönüştüren ve daha önce oluşturulmuş bir tabloya koyan materyalleştirilmiş bir görünüm oluşturun.
Ne zaman MATERIALIZED VIEW
motora katılır, arka planda veri toplamaya başlar. Bu, kafka'dan sürekli olarak mesaj almanızı ve bunları kullanarak gerekli biçime dönüştürmenizi sağlar SELECT
.
Bir kafka tablosu istediğiniz kadar materialized görüşe sahip olabilir, kafka tablosundan doğrudan veri okumazlar, ancak yeni kayıtlar (bloklar halinde) alırlar, bu şekilde farklı ayrıntı seviyesine sahip birkaç tabloya yazabilirsiniz (gruplama-toplama ve olmadan).
Örnek:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
Performansı artırmak için, alınan iletiler bloklar halinde gruplandırılır max_ınsert_block_size. İçinde blok oluş ifma ifdıysa stream_flush_interval_ms milisaniye, veri blok bütünlüğü ne olursa olsun tabloya temizlendi.
Konu verilerini almayı durdurmak veya dönüşüm mantığını değiştirmek için, hayata geçirilmiş görünümü ayırın:
DETACH TABLE consumer;
ATTACH TABLE consumer;
Kullanarak hedef tabloyu değiştirmek istiyorsanız ALTER
hedef tablo ile görünümdeki veriler arasındaki tutarsızlıkları önlemek için malzeme görünümünü devre dışı bırakmanızı öneririz.
Yapılandırma
GraphiteMergeTree benzer şekilde, Kafka motoru ClickHouse yapılandırma dosyasını kullanarak genişletilmiş yapılandırmayı destekler. Kullanabileceğiniz iki yapılandırma anahtarı vardır: global (kafka
) ve konu düzeyinde (kafka_*
). Genel yapılandırma önce uygulanır ve sonra konu düzeyinde yapılandırma uygulanır (varsa).
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific for topic "logs" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
Olası yapılandırma seçeneklerinin listesi için bkz. librdkafka yapılandırma referansı. Alt çizgiyi kullan (_
) ClickHouse yapılandırmasında bir nokta yerine. Mesela, check.crcs=true
olacak <check_crcs>true</check_crcs>
.
Sanal Sütunlar
_topic
— Kafka topic._key
— Key of the message._offset
— Offset of the message._timestamp
— Timestamp of the message._partition
— Partition of Kafka topic.
Ayrıca Bakınız