diff --git a/docs/en/engines/table-engines/integrations/rabbitmq.md b/docs/en/engines/table-engines/integrations/rabbitmq.md index c73876fdebe..4a0550275ca 100644 --- a/docs/en/engines/table-engines/integrations/rabbitmq.md +++ b/docs/en/engines/table-engines/integrations/rabbitmq.md @@ -59,6 +59,21 @@ Optional parameters: - `rabbitmq_max_block_size` - `rabbitmq_flush_interval_ms` +Also format settings can be added along with rabbitmq-related settings. + +Example: + +``` sql + CREATE TABLE queue ( + key UInt64, + value UInt64, + date DateTime + ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672', + rabbitmq_exchange_name = 'exchange1', + rabbitmq_format = 'JSONEachRow', + rabbitmq_num_consumers = 5, + date_time_input_format = 'best_effort'; +``` The RabbitMQ server configuration should be added using the ClickHouse config file. @@ -79,18 +94,6 @@ Additional configuration: ``` -Example: - -``` sql - CREATE TABLE queue ( - key UInt64, - value UInt64 - ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672', - rabbitmq_exchange_name = 'exchange1', - rabbitmq_format = 'JSONEachRow', - rabbitmq_num_consumers = 5; -``` - ## Description {#description} `SELECT` is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this: @@ -114,6 +117,7 @@ Exchange type options: - `consistent_hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`. Setting `rabbitmq_queue_base` may be used for the following cases: + - to let different tables share queues, so that multiple consumers could be registered for the same queues, which makes a better performance. If using `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings, the exact match of queues is achieved in case these parameters are the same. - to be able to restore reading from certain durable queues when not all messages were successfully consumed. To resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables. - to reuse queues as they are declared durable and not auto-deleted. (Can be deleted via any of RabbitMQ CLI tools.) diff --git a/docs/ru/engines/table-engines/integrations/rabbitmq.md b/docs/ru/engines/table-engines/integrations/rabbitmq.md index 2a44e085ede..f55163c1988 100644 --- a/docs/ru/engines/table-engines/integrations/rabbitmq.md +++ b/docs/ru/engines/table-engines/integrations/rabbitmq.md @@ -52,6 +52,21 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] - `rabbitmq_max_block_size` - `rabbitmq_flush_interval_ms` +Настройки форматов данных также могут быть добавлены в списке RabbitMQ настроек. + +Example: + +``` sql + CREATE TABLE queue ( + key UInt64, + value UInt64, + date DateTime + ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672', + rabbitmq_exchange_name = 'exchange1', + rabbitmq_format = 'JSONEachRow', + rabbitmq_num_consumers = 5, + date_time_input_format = 'best_effort'; +``` Конфигурация сервера RabbitMQ добавляется с помощью конфигурационного файла ClickHouse. @@ -72,18 +87,6 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ``` -Example: - -``` sql - CREATE TABLE queue ( - key UInt64, - value UInt64 - ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672', - rabbitmq_exchange_name = 'exchange1', - rabbitmq_format = 'JSONEachRow', - rabbitmq_num_consumers = 5; -``` - ## Описание {#description} Запрос `SELECT` не очень полезен для чтения сообщений (за исключением отладки), поскольку каждое сообщение может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью [материализованных преставлений](../../../sql-reference/statements/create/view.md). Для этого: @@ -107,6 +110,7 @@ Example: - `consistent_hash` - данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`. Настройка `rabbitmq_queue_base` может быть использована в следующих случаях: + 1. чтобы восстановить чтение из ранее созданных очередей, если оно прекратилось по какой-либо причине, но очереди остались непустыми. Для восстановления чтения из одной конкретной очереди, нужно написать ее имя в `rabbitmq_queue_base` настройку и не указывать настройки `rabbitmq_num_consumers` и `rabbitmq_num_queues`. Чтобы восстановить чтение из всех очередей, которые были созданы для конкретной таблицы, необходимо совпадение следующих настроек: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. По умолчанию, если настройка `rabbitmq_queue_base` не указана, будут использованы уникальные для каждой таблицы имена очередей. 2. чтобы объявить одни и те же очереди для разных таблиц, что позволяет создавать несколько параллельных подписчиков на каждую из очередей. То есть обеспечивается лучшая производительность. В данном случае, для таких таблиц также необходимо совпадение настроек: `rabbitmq_num_consumers`, `rabbitmq_num_queues`. 3. чтобы повторно использовать созданные c `durable` настройкой очереди, так как они не удаляются автоматически (но могут быть удалены с помощью любого RabbitMQ CLI). diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.h b/src/Storages/RabbitMQ/RabbitMQSettings.h index 2f8d6adfa16..66348d61424 100644 --- a/src/Storages/RabbitMQ/RabbitMQSettings.h +++ b/src/Storages/RabbitMQ/RabbitMQSettings.h @@ -1,13 +1,14 @@ #pragma once #include +#include namespace DB { class ASTStorage; -#define LIST_OF_RABBITMQ_SETTINGS(M) \ +#define RABBITMQ_RELATED_SETTINGS(M) \ M(String, rabbitmq_host_port, "", "A host-port to connect to RabbitMQ server.", 0) \ M(String, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \ M(String, rabbitmq_format, "", "The message format.", 0) \ @@ -24,6 +25,10 @@ namespace DB M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \ M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \ +#define LIST_OF_RABBITMQ_SETTINGS(M) \ + RABBITMQ_RELATED_SETTINGS(M) \ + FORMAT_FACTORY_SETTINGS(M) + DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS) struct RabbitMQSettings : public BaseSettings diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index d14f11c4a29..48305ab1b61 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -200,6 +200,15 @@ std::shared_ptr StorageRabbitMQ::addSettings(const Context & context) c if (!schema_name.empty()) modified_context->setSetting("format_schema", schema_name); + for (const auto & setting : *rabbitmq_settings) + { + const auto & setting_name = setting.getName(); + + /// check for non-rabbitmq-related settings + if (!setting_name.starts_with("rabbitmq_")) + modified_context->setSetting(setting_name, setting.getValue()); + } + return modified_context; } diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 911f6d144f9..ca89ebdea0a 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -1912,6 +1912,59 @@ def test_rabbitmq_no_connection_at_startup(rabbitmq_cluster): assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result) +@pytest.mark.timeout(120) +def test_rabbitmq_format_factory_settings(rabbitmq_cluster): + instance.query(''' + CREATE TABLE test.format_settings ( + id String, date DateTime + ) ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'format_settings', + rabbitmq_format = 'JSONEachRow', + date_time_input_format = 'best_effort'; + ''') + + credentials = pika.PlainCredentials('root', 'clickhouse') + parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + message = json.dumps({"id":"format_settings_test","date":"2021-01-19T14:42:33.1829214Z"}) + expected = instance.query('''SELECT parseDateTimeBestEffort(CAST('2021-01-19T14:42:33.1829214Z', 'String'))''') + + channel.basic_publish(exchange='format_settings', routing_key='', body=message) + result = '' + while True: + result = instance.query('SELECT date FROM test.format_settings') + if result == expected: + break; + + instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.consumer; + CREATE TABLE test.view ( + id String, date DateTime + ) ENGINE = MergeTree ORDER BY id; + CREATE MATERIALIZED VIEW test.consumer TO test.view AS + SELECT * FROM test.format_settings; + ''') + + channel.basic_publish(exchange='format_settings', routing_key='', body=message) + result = '' + while True: + result = instance.query('SELECT date FROM test.view') + if result == expected: + break; + + connection.close() + instance.query(''' + DROP TABLE test.consumer; + DROP TABLE test.format_settings; + ''') + + assert(result == expected) + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...")