ClickHouse/docs/fr/engines/table-engines/integrations/kafka.md
Ivan Blinkov d91c97d15d
[docs] replace underscores with hyphens (#10606)
* Replace underscores with hyphens

* remove temporary code

* fix style check

* fix collapse
2020-04-30 21:19:18 +03:00

8.3 KiB
Raw Blame History

machine_translated machine_translated_rev toc_priority toc_title
true f865c9653f 32 Kafka

Kafka

Ce moteur fonctionne avec Apache Kafka.

Kafka vous permet de:

  • Publier ou sabonner aux flux de données.
  • Organiser le stockage tolérant aux pannes.
  • Traiter les flux à mesure quils deviennent disponibles.

Création dune Table

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_skip_broken_messages = N]

Les paramètres requis:

  • kafka_broker_list A comma-separated list of brokers (for example, localhost:9092).
  • kafka_topic_list A list of Kafka topics.
  • kafka_group_name A group of Kafka consumers. Reading margins are tracked for each group separately. If you dont want messages to be duplicated in the cluster, use the same group name everywhere.
  • kafka_format Message format. Uses the same notation as the SQL FORMAT la fonction, tels que JSONEachRow. Pour plus dinformations, voir le Format section.

Paramètres facultatifs:

  • kafka_row_delimiter Delimiter character, which ends the message.
  • kafka_schema Parameter that must be used if the format requires a schema definition. For example, Capn Proto nécessite le chemin daccès du fichier de schéma et le nom de la racine schema.capnp:Message objet.
  • kafka_num_consumers The number of consumers per table. Default: 1. Spécifiez plus de consommateurs si le débit dun consommateur est insuffisant. Le nombre total de consommateurs ne doit pas dépasser le nombre de partitions dans la rubrique, car un seul consommateur peut être affecté par partition.
  • kafka_skip_broken_messages Kafka message parser tolerance to schema-incompatible messages per block. Default: 0. Si kafka_skip_broken_messages = N puis le moteur saute N Messages Kafka qui ne peuvent pas être analysés (un message est égal à une ligne de données).

Exemple:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
Méthode obsolète pour créer une Table

!!! attention "Attention" Nutilisez pas cette méthode dans les nouveaux projets. Si possible, optez anciens projets à la méthode décrite ci-dessus.

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])

Description

Les messages livrés sont suivis automatiquement, de sorte que chaque message dun groupe nest compté quune seule fois. Si vous souhaitez obtenir les données deux fois, créez une copie de la table avec un autre nom de groupe.

Les groupes sont flexibles et synchronisés sur le cluster. Par exemple, si vous avez 10 thèmes et 5 copies dune table dans un cluster, chaque copie obtient 2 sujets. Si le nombre de copies change, les rubriques sont redistribuées automatiquement entre les copies. En savoir plus à ce sujet à http://kafka.apache.org/intro.

SELECT nest pas particulièrement utile pour la lecture de messages (sauf pour le débogage), car chaque message ne peut être lu quune seule fois. Il est plus pratique de créer des threads en temps réel à laide de vues matérialisées. Pour ce faire:

  1. Utilisez le moteur pour créer un consommateur Kafka et considérez-le comme un flux de données.
  2. Créez une table avec la structure souhaitée.
  3. Créer une vue matérialisée qui convertit les données du moteur et le met dans une table créée précédemment.

Lorsque l MATERIALIZED VIEW rejoint le moteur, il commence à collecter des données en arrière-plan. Cela vous permet de recevoir continuellement des messages de Kafka et de les convertir au format requis en utilisant SELECT. Une table kafka peut avoir autant de vues matérialisées que vous le souhaitez, elles ne lisent pas directement les données de la table kafka, mais reçoivent de nouveaux enregistrements( en blocs), de cette façon vous pouvez écrire sur plusieurs tables avec différents niveaux de détail (avec regroupement - agrégation et sans).

Exemple:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

Pour améliorer les performances, les messages reçus sont regroupées en blocs de la taille de max_insert_block_size. Si le bloc na pas été formé à lintérieur stream_flush_interval_ms millisecondes, les données seront vidées dans le tableau, indépendamment de lintégralité du bloc.

Pour arrêter de recevoir des données de rubrique ou pour modifier la logique de conversion, détachez la vue matérialisée:

  DETACH TABLE consumer;
  ATTACH TABLE consumer;

Si vous souhaitez modifier la table cible en utilisant ALTER, nous vous recommandons de désactiver la vue matériel pour éviter les divergences entre la table cible et les données de la vue.

Configuration

Similaire à GraphiteMergeTree, le moteur Kafka prend en charge la configuration étendue à laide du fichier de configuration ClickHouse. Il y a deux clés de configuration que vous pouvez utiliser: global (kafka) et des rubriques (kafka_*). La configuration globale est appliquée en premier, puis la configuration au niveau de la rubrique est appliquée (si elle existe).

  <!-- Global configuration options for all tables of Kafka engine type -->
  <kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
  </kafka>

  <!-- Configuration specific for topic "logs" -->
  <kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_logs>

Pour obtenir une liste des options de configuration possibles, consultez librdkafka référence de configuration. Utilisez le trait de soulignement (_) au lieu dun point dans la configuration ClickHouse. Exemple, check.crcs=true sera <check_crcs>true</check_crcs>.

Les Colonnes Virtuelles

  • _topic — Kafka topic.
  • _key — Key of the message.
  • _offset — Offset of the message.
  • _timestamp — Timestamp of the message.
  • _partition — Partition of Kafka topic.

Voir Aussi

Article Original