diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md
index 5d04dce4c51..3da8c651e8d 100644
--- a/docs/en/engines/table-engines/integrations/kafka.md
+++ b/docs/en/engines/table-engines/integrations/kafka.md
@@ -28,7 +28,6 @@ SETTINGS
     kafka_topic_list = 'topic1,topic2,...',
     kafka_group_name = 'group_name',
     kafka_format = 'data_format'[,]
-    [kafka_row_delimiter = 'delimiter_symbol',]
     [kafka_schema = '',]
     [kafka_num_consumers = N,]
     [kafka_max_block_size = 0,]
@@ -53,7 +52,6 @@ Required parameters:
 
 Optional parameters:
 
-- `kafka_row_delimiter` — Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
 - `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
 - `kafka_num_consumers` — The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed. Default: `1`.
 - `kafka_max_block_size` — The maximum batch size (in messages) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
@@ -64,7 +62,7 @@ Optional parameters:
 - `kafka_poll_max_batch_size` — Maximum amount of messages to be polled in a single Kafka poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
 - `kafka_flush_interval_ms` — Timeout for flushing data from Kafka. Default: [stream_flush_interval_ms](../../../operations/settings/settings.md#stream-flush-interval-ms).
 - `kafka_thread_per_consumer` — Provide independent thread for each consumer. When enabled, every consumer flush the data independently, in parallel (otherwise — rows from several consumers squashed to form one block). Default: `0`.
-- `kafka_handle_error_mode` — How to handle errors for Kafka engine. Possible values: default, stream.
+- `kafka_handle_error_mode` — How to handle errors for the Kafka engine. Possible values: `default` (an exception is thrown if parsing a message fails), `stream` (the exception message and the raw message are saved in the `_error` and `_raw_message` virtual columns).
 - `kafka_commit_on_select` — Commit messages when select query is made. Default: `false`.
 - `kafka_max_rows_per_message` — The maximum number of rows written in one kafka message for row-based formats. Default : `1`.
 
@@ -249,6 +247,13 @@ Example:
 - `_headers.name` — Array of message's headers keys.
 - `_headers.value` — Array of message's headers values.
 
+Additional virtual columns when `kafka_handle_error_mode='stream'`:
+
+- `_raw_message` - Raw message that couldn't be parsed successfully.
+- `_error` - Exception message raised during failed parsing.
+
+Note: the `_raw_message` and `_error` virtual columns are filled only if an exception occurs during parsing; they are always empty when the message was parsed successfully.
+
 ## Data formats support {#data-formats-support}
 
 Kafka engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
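To illustrate the `kafka_handle_error_mode = 'stream'` behaviour documented above, here is a minimal sketch of a pipeline that separates cleanly parsed rows from broken messages. The broker address, topic, consumer group, and table names are illustrative assumptions, not taken from the patch; the filters rely only on the documented fact that `_error` is empty when a message parsed successfully.

```sql
-- Sketch only: broker, topic, group, and table names are hypothetical.
CREATE TABLE kafka_events
(
    timestamp UInt64,
    message String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'clickhouse-events',
         kafka_format = 'JSONEachRow',
         kafka_handle_error_mode = 'stream';

-- Rows that parsed cleanly land here.
CREATE TABLE events
(
    timestamp UInt64,
    message String
) ENGINE = MergeTree ORDER BY timestamp;

CREATE MATERIALIZED VIEW events_consumer TO events AS
    SELECT timestamp, message
    FROM kafka_events
    WHERE length(_error) = 0;

-- Messages that failed to parse are kept together with the exception text.
CREATE TABLE events_errors
(
    raw_message String,
    error String
) ENGINE = MergeTree ORDER BY tuple();

CREATE MATERIALIZED VIEW events_errors_consumer TO events_errors AS
    SELECT _raw_message AS raw_message, _error AS error
    FROM kafka_events
    WHERE length(_error) > 0;
```

With this split, a malformed message no longer interrupts consumption; it simply shows up in `events_errors` for later inspection.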
diff --git a/docs/en/engines/table-engines/integrations/nats.md b/docs/en/engines/table-engines/integrations/nats.md
index 570b219e5fa..f21855e170f 100644
--- a/docs/en/engines/table-engines/integrations/nats.md
+++ b/docs/en/engines/table-engines/integrations/nats.md
@@ -25,7 +25,6 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
     nats_url = 'host:port',
     nats_subjects = 'subject1,subject2,...',
     nats_format = 'data_format'[,]
-    [nats_row_delimiter = 'delimiter_symbol',]
     [nats_schema = '',]
     [nats_num_consumers = N,]
     [nats_queue_group = 'group_name',]
@@ -40,7 +39,8 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
     [nats_password = 'password',]
     [nats_token = 'clickhouse',]
     [nats_startup_connect_tries = '5']
-    [nats_max_rows_per_message = 1]
+    [nats_max_rows_per_message = 1,]
+    [nats_handle_error_mode = 'default']
 ```
 
 Required parameters:
@@ -51,7 +51,6 @@ Required parameters:
 
 Optional parameters:
 
-- `nats_row_delimiter` – Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
 - `nats_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
 - `nats_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
 - `nats_queue_group` – Name for queue group of NATS subscribers. Default is the table name.
@@ -66,6 +65,7 @@ Optional parameters:
 - `nats_token` - NATS auth token.
 - `nats_startup_connect_tries` - Number of connect tries at startup. Default: `5`.
 - `nats_max_rows_per_message` — The maximum number of rows written in one NATS message for row-based formats. (default : `1`).
+- `nats_handle_error_mode` — How to handle errors for the NATS engine. Possible values: `default` (an exception is thrown if parsing a message fails), `stream` (the exception message and the raw message are saved in the `_error` and `_raw_message` virtual columns).
 
 SSL connection:
 
@@ -165,6 +165,14 @@ If you want to change the target table by using `ALTER`, we recommend disabling
 
 - `_subject` - NATS message subject.
 
+Additional virtual columns when `nats_handle_error_mode='stream'`:
+
+- `_raw_message` - Raw message that couldn't be parsed successfully.
+- `_error` - Exception message raised during failed parsing.
+
+Note: the `_raw_message` and `_error` virtual columns are filled only if an exception occurs during parsing; they are always empty when the message was parsed successfully.
+
+
 ## Data formats support {#data-formats-support}
 
 NATS engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
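A corresponding NATS sketch, assuming a hypothetical URL, subject, and table names (only the engine settings and virtual column names come from the documentation above). Here a single landing table keeps the parsed fields next to the error columns, so occasional bad records can be inspected in place:

```sql
-- Sketch only: the NATS URL, subject, and table names are hypothetical.
CREATE TABLE nats_queue
(
    key UInt64,
    value String
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4222',
         nats_subjects = 'metrics',
         nats_format = 'JSONEachRow',
         nats_handle_error_mode = 'stream';

-- One landing table stores both the parsed fields and the error columns.
CREATE TABLE nats_landing
(
    key UInt64,
    value String,
    raw_message String,
    error String
) ENGINE = MergeTree ORDER BY key;

CREATE MATERIALIZED VIEW nats_consumer TO nats_landing AS
    SELECT key, value, _raw_message AS raw_message, _error AS error
    FROM nats_queue;
```

Records that fail to parse should then arrive with default values for `key` and `value` and a non-empty `error`, so they can be located later with `WHERE error != ''`.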
diff --git a/docs/en/engines/table-engines/integrations/rabbitmq.md b/docs/en/engines/table-engines/integrations/rabbitmq.md
index 4f6107764ec..7ac245faac5 100644
--- a/docs/en/engines/table-engines/integrations/rabbitmq.md
+++ b/docs/en/engines/table-engines/integrations/rabbitmq.md
@@ -28,7 +28,6 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
     [rabbitmq_exchange_type = 'exchange_type',]
     [rabbitmq_routing_key_list = 'key1,key2,...',]
     [rabbitmq_secure = 0,]
-    [rabbitmq_row_delimiter = 'delimiter_symbol',]
     [rabbitmq_schema = '',]
     [rabbitmq_num_consumers = N,]
     [rabbitmq_num_queues = N,]
@@ -45,7 +44,8 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
     [rabbitmq_username = '',]
     [rabbitmq_password = '',]
     [rabbitmq_commit_on_select = false,]
-    [rabbitmq_max_rows_per_message = 1]
+    [rabbitmq_max_rows_per_message = 1,]
+    [rabbitmq_handle_error_mode = 'default']
 ```
 
 Required parameters:
@@ -58,7 +58,6 @@ Optional parameters:
 
 - `rabbitmq_exchange_type` – The type of RabbitMQ exchange: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
 - `rabbitmq_routing_key_list` – A comma-separated list of routing keys.
-- `rabbitmq_row_delimiter` – Delimiter character, which ends the message. **This setting is deprecated and is no longer used, not left for compatibility reasons.**
 - `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
 - `rabbitmq_num_consumers` – The number of consumers per table. Specify more consumers if the throughput of one consumer is insufficient. Default: `1`
 - `rabbitmq_num_queues` – Total number of queues. Increasing this number can significantly improve performance. Default: `1`.
@@ -78,6 +77,7 @@ Optional parameters:
 - `rabbitmq_max_rows_per_message` — The maximum number of rows written in one RabbitMQ message for row-based formats. Default : `1`.
 - `rabbitmq_empty_queue_backoff_start` — A start backoff point to reschedule read if the rabbitmq queue is empty.
 - `rabbitmq_empty_queue_backoff_end` — An end backoff point to reschedule read if the rabbitmq queue is empty.
+- `rabbitmq_handle_error_mode` — How to handle errors for the RabbitMQ engine. Possible values: `default` (an exception is thrown if parsing a message fails), `stream` (the exception message and the raw message are saved in the `_error` and `_raw_message` virtual columns).
 
 
 
@@ -191,6 +191,13 @@ Example:
 - `_message_id` - messageID of the received message; non-empty if was set, when message was published.
 - `_timestamp` - timestamp of the received message; non-empty if was set, when message was published.
 
+Additional virtual columns when `rabbitmq_handle_error_mode='stream'`:
+
+- `_raw_message` - Raw message that couldn't be parsed successfully.
+- `_error` - Exception message raised during failed parsing.
+
+Note: the `_raw_message` and `_error` virtual columns are filled only if an exception occurs during parsing; they are always empty when the message was parsed successfully.
+
 ## Data formats support {#data-formats-support}
 
 RabbitMQ engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.
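The RabbitMQ integration test changed at the end of this diff reads `broken_message` from a `test.errors` table. A sketch in that spirit, with the host, exchange, and exact table definitions guessed for illustration rather than copied from the test (a `test` database is assumed to exist):

```sql
-- Sketch only: connection details and object names are hypothetical.
CREATE TABLE test.rabbitmq_queue
(
    key UInt64,
    value UInt64
) ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'localhost:5672',
         rabbitmq_exchange_name = 'events',
         rabbitmq_format = 'JSONEachRow',
         rabbitmq_handle_error_mode = 'stream';

-- Only records that failed to parse are stored here.
CREATE TABLE test.errors
(
    error String,
    broken_message String
) ENGINE = MergeTree ORDER BY tuple();

CREATE MATERIALIZED VIEW test.errors_consumer TO test.errors AS
    SELECT _error AS error, _raw_message AS broken_message
    FROM test.rabbitmq_queue
    WHERE length(_error) > 0;
```

A query such as `SELECT broken_message FROM test.errors ORDER BY broken_message`, as in the updated test, then lists every message that could not be parsed.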
diff --git a/docs/en/engines/table-engines/special/filelog.md b/docs/en/engines/table-engines/special/filelog.md
new file mode 100644
index 00000000000..a67d7d783f3
--- /dev/null
+++ b/docs/en/engines/table-engines/special/filelog.md
@@ -0,0 +1,105 @@
+---
+slug: /en/engines/table-engines/special/filelog
+sidebar_position: 160
+sidebar_label: FileLog
+---
+
+# FileLog Engine {#filelog-engine}
+
+This engine allows processing application log files as a stream of records.
+
+`FileLog` lets you:
+
+- Subscribe to log files.
+- Process new records as they are appended to subscribed log files.
+
+## Creating a Table {#creating-a-table}
+
+``` sql
+CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
+(
+    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
+    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
+    ...
+) ENGINE = FileLog('path_to_logs', 'format_name') SETTINGS
+    [poll_timeout_ms = 0,]
+    [poll_max_batch_size = 0,]
+    [max_block_size = 0,]
+    [max_threads = 0,]
+    [poll_directory_watch_events_backoff_init = 500,]
+    [poll_directory_watch_events_backoff_max = 32000,]
+    [poll_directory_watch_events_backoff_factor = 2,]
+    [handle_error_mode = 'default']
+```
+
+Engine arguments:
+
+- `path_to_logs` – Path to the log files to subscribe to. It can be a path to a directory with log files or to a single log file. Note that ClickHouse allows only paths inside the `user_files` directory.
+- `format_name` - Record format. Note that FileLog processes each line in a file as a separate record, so not all data formats are suitable for it.
+
+Optional parameters:
+
+- `poll_timeout_ms` - Timeout for a single poll from a log file. Default: [stream_poll_timeout_ms](../../../operations/settings/settings.md#stream_poll_timeout_ms).
+- `poll_max_batch_size` — Maximum number of records to be polled in a single poll. Default: [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
+- `max_block_size` — The maximum batch size (in records) for poll. Default: [max_insert_block_size](../../../operations/settings/settings.md#setting-max_insert_block_size).
+- `max_threads` - Maximum number of threads used to parse files. Default: `0`, which means max(1, physical_cpu_cores / 4).
+- `poll_directory_watch_events_backoff_init` - The initial sleep value for the directory watch thread. Default: `500`.
+- `poll_directory_watch_events_backoff_max` - The maximum sleep value for the directory watch thread. Default: `32000`.
+- `poll_directory_watch_events_backoff_factor` - The growth factor of the backoff (exponential by default). Default: `2`.
+- `handle_error_mode` — How to handle errors for the FileLog engine. Possible values: `default` (an exception is thrown if parsing a record fails), `stream` (the exception message and the raw record are saved in the `_error` and `_raw_record` virtual columns).
+
+## Description {#description}
+
+The delivered records are tracked automatically, so each record in a log file is only counted once.
+
+`SELECT` is not particularly useful for reading records (except for debugging), because each record can be read only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this:
+
+1. Use the engine to create a FileLog table and consider it a data stream.
+2. Create a table with the desired structure.
+3. Create a materialized view that converts data from the engine and puts it into a previously created table.
+
+When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background. This allows you to continually receive records from log files and convert them to the required format using `SELECT`.
+One FileLog table can have as many materialized views as you like. They do not read data from the table directly, but receive new records (in blocks); this way you can write to several tables with different detail levels (with grouping - aggregation and without).
+
+Example:
+
+``` sql
+  CREATE TABLE logs (
+    timestamp UInt64,
+    level String,
+    message String
+  ) ENGINE = FileLog('user_files/my_app/app.log', 'JSONEachRow');
+
+  CREATE TABLE daily (
+    day Date,
+    level String,
+    total UInt64
+  ) ENGINE = SummingMergeTree() ORDER BY (day, level);
+
+  CREATE MATERIALIZED VIEW consumer TO daily
+    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
+    FROM logs GROUP BY day, level;
+
+  SELECT level, sum(total) FROM daily GROUP BY level;
+```
+
+To stop receiving stream data or to change the conversion logic, detach the materialized view:
+
+``` sql
+  DETACH TABLE consumer;
+  ATTACH TABLE consumer;
+```
+
+If you want to change the target table by using `ALTER`, we recommend disabling the materialized view to avoid discrepancies between the target table and the data from the view.
+
+## Virtual Columns {#virtual-columns}
+
+- `_filename` - Name of the log file.
+- `_offset` - Offset in the log file.
+
+Additional virtual columns when `handle_error_mode='stream'`:
+
+- `_raw_record` - Raw record that couldn't be parsed successfully.
+- `_error` - Exception message raised during failed parsing.
+
+Note: the `_raw_record` and `_error` virtual columns are filled only if an exception occurs during parsing; they are always empty when the record was parsed successfully.
diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py
index aa9d8e6e8fe..65727863c34 100644
--- a/tests/integration/test_storage_rabbitmq/test.py
+++ b/tests/integration/test_storage_rabbitmq/test.py
@@ -3505,7 +3505,9 @@ def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster):
 
     assert errors_count == num_rows / 2
 
-    broken_messages = instance.query("SELECT broken_message FROM test.errors order by broken_message")
+    broken_messages = instance.query(
+        "SELECT broken_message FROM test.errors order by broken_message"
+    )
     expected = []
     for i in range(num_rows):
         if i % 2 != 0: