diff --git a/src/Core/Settings.h b/src/Core/Settings.h index eda76584f0b..9cd6287a75d 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -71,6 +71,7 @@ struct Settings : public SettingsCollection M(SettingMilliseconds, connection_pool_max_wait_ms, 0, "The wait time when the connection pool is full.", 0) \ M(SettingMilliseconds, replace_running_query_max_wait_ms, 5000, "The wait time for running query with the same query_id to finish when setting 'replace_running_query' is active.", 0) \ M(SettingMilliseconds, kafka_max_wait_ms, 5000, "The wait time for reading from Kafka before retry.", 0) \ + M(SettingMilliseconds, rabbitmq_max_wait_ms, 5000, "The wait time for reading from RabbitMQ before retry.", 0) \ M(SettingUInt64, poll_interval, DBMS_DEFAULT_POLL_INTERVAL, "Block at the query wait loop on the server for the specified number of seconds.", 0) \ M(SettingUInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.", 0) \ M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \ diff --git a/src/Core/config_core.h.in b/src/Core/config_core.h.in index 620c23c21cc..5991c12a1f2 100644 --- a/src/Core/config_core.h.in +++ b/src/Core/config_core.h.in @@ -5,6 +5,7 @@ #cmakedefine01 USE_ICU #cmakedefine01 USE_MYSQL #cmakedefine01 USE_RDKAFKA +#cmakedefine01 USE_AMQPCPP #cmakedefine01 USE_EMBEDDED_COMPILER #cmakedefine01 USE_INTERNAL_LLVM_LIBRARY #cmakedefine01 USE_SSL diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.cpp b/src/Storages/RabbitMQ/RabbitMQSettings.cpp new file mode 100644 index 00000000000..ed8d4ad801a --- /dev/null +++ b/src/Storages/RabbitMQ/RabbitMQSettings.cpp @@ -0,0 +1,44 @@ +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int UNKNOWN_SETTING; +} + +IMPLEMENT_SETTINGS_COLLECTION(RabbitMQSettings, LIST_OF_RABBITMQ_SETTINGS) + +void RabbitMQSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + try + { + applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + throw Exception(e.message() + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS); + else + e.rethrow(); + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } +} +} + diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.h b/src/Storages/RabbitMQ/RabbitMQSettings.h new file mode 100644 index 00000000000..0b0f58169fa --- /dev/null +++ b/src/Storages/RabbitMQ/RabbitMQSettings.h @@ -0,0 +1,26 @@ +#pragma once +#include + +namespace DB +{ + class ASTStorage; + + struct RabbitMQSettings : public SettingsCollection + { + +#define LIST_OF_RABBITMQ_SETTINGS(M) \ + M(SettingString, rabbitmq_host_port, "", "A host-port to connect to RabbitMQ server.", 0) \ + M(SettingString, rabbitmq_routing_key, "5672", "A routing key to connect producer->exchange->queue<->consumer.", 0) \ + M(SettingString, rabbitmq_exchange_name, "clickhouse-exchange", "The exhange name, to which messages are sent. Needed to bind queues to it.", 0) \ + M(SettingString, rabbitmq_format, "", "The message format.", 0) \ + M(SettingChar, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \ + M(SettingUInt64, rabbitmq_bind_by_id, 0, "A flag which indicates that binding should be done in range [0, num_consumers).", 0) \ + M(SettingUInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \ + M(SettingUInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \ + M(SettingUInt64, rabbitmq_hash_exchange, 0, "A flag which indicates whether consistent-hash-exchange should be used.", 0) \ + + DECLARE_SETTINGS_COLLECTION(LIST_OF_RABBITMQ_SETTINGS) + + void loadFromQuery(ASTStorage & storage_def); + }; +} diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp new file mode 100644 index 00000000000..98e7e97e2e1 --- /dev/null +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -0,0 +1,249 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +StorageRabbitMQ::StorageRabbitMQ( + const StorageID & table_id_, + Context & context_, + const ColumnsDescription & columns_, + const String & host_port_, + const String & routing_key_, + const String & exchange_name_, + const String & format_name_, + char row_delimiter_, + size_t num_consumers_, + bool bind_by_id_, + size_t num_queues_, + bool hash_exchange_) + : IStorage(table_id_) + , global_context(context_.getGlobalContext()) + , rabbitmq_context(Context(global_context)) + , routing_key(global_context.getMacros()->expand(routing_key_)) + , exchange_name(exchange_name_) + , format_name(global_context.getMacros()->expand(format_name_)) + , row_delimiter(row_delimiter_) + , num_consumers(num_consumers_) + , bind_by_id(bind_by_id_) + , num_queues(num_queues_) + , hash_exchange(hash_exchange_) + , log(&Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) + , semaphore(0, num_consumers_) + , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) +{ +} + + +void registerStorageRabbitMQ(StorageFactory & factory) +{ + auto creator_fn = [](const StorageFactory::Arguments & args) + { + ASTs & engine_args = args.engine_args; + size_t args_count = engine_args.size(); + bool has_settings = args.storage_def->settings; + + RabbitMQSettings rabbitmq_settings; + if (has_settings) + { + rabbitmq_settings.loadFromQuery(*args.storage_def); + } + + String host_port = rabbitmq_settings.rabbitmq_host_port; + if (args_count >= 1) + { + const auto * ast = engine_args[0]->as(); + if (ast && ast->value.getType() == Field::Types::String) + { + host_port = safeGet(ast->value); + } + else + { + throw Exception(String("RabbitMQ host:port must be a string"), ErrorCodes::BAD_ARGUMENTS); + } + } + + String routing_key = rabbitmq_settings.rabbitmq_routing_key.value; + if (args_count >= 2) + { + const auto * ast = engine_args[1]->as(); + if (ast && ast->value.getType() == Field::Types::String) + { + routing_key = safeGet(ast->value); + } + else + { + throw Exception(String("RabbitMQ routing key must be a string"), ErrorCodes::BAD_ARGUMENTS); + } + } + + String exchange = rabbitmq_settings.rabbitmq_exchange_name.value; + if (args_count >= 3) + { + engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context); + + const auto * ast = engine_args[2]->as(); + if (ast && ast->value.getType() == Field::Types::String) + { + exchange = safeGet(ast->value); + } + } + + String format = rabbitmq_settings.rabbitmq_format.value; + if (args_count >= 4) + { + engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context); + + const auto * ast = engine_args[3]->as(); + if (ast && ast->value.getType() == Field::Types::String) + { + format = safeGet(ast->value); + } + else + { + throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS); + } + } + + char row_delimiter = rabbitmq_settings.rabbitmq_row_delimiter; + if (args_count >= 5) + { + engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context); + + const auto * ast = engine_args[4]->as(); + String arg; + if (ast && ast->value.getType() == Field::Types::String) + { + arg = safeGet(ast->value); + } + else + { + throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS); + } + if (arg.size() > 1) + { + throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS); + } + else if (arg.empty()) + { + row_delimiter = '\0'; + } + else + { + row_delimiter = arg[0]; + } + } + + size_t bind_by_id = static_cast(rabbitmq_settings.rabbitmq_bind_by_id); + if (args_count >= 6) + { + const auto * ast = engine_args[5]->as(); + if (ast && ast->value.getType() == Field::Types::UInt64) + { + bind_by_id = static_cast(safeGet(ast->value)); + } + else + { + throw Exception("Hash exchange flag must be a boolean", ErrorCodes::BAD_ARGUMENTS); + } + } + + UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers; + if (args_count >= 7) + { + const auto * ast = engine_args[6]->as(); + if (ast && ast->value.getType() == Field::Types::UInt64) + { + num_consumers = safeGet(ast->value); + } + else + { + throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS); + } + } + + UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues; + if (args_count >= 8) + { + const auto * ast = engine_args[7]->as(); + if (ast && ast->value.getType() == Field::Types::UInt64) + { + num_consumers = safeGet(ast->value); + } + else + { + throw Exception("Number of queues must be a positive integer", ErrorCodes::BAD_ARGUMENTS); + } + } + + size_t hash_exchange = static_cast(rabbitmq_settings.rabbitmq_hash_exchange); + if (args_count >= 9) + { + const auto * ast = engine_args[8]->as(); + if (ast && ast->value.getType() == Field::Types::UInt64) + { + hash_exchange = static_cast(safeGet(ast->value)); + } + else + { + throw Exception("Hash exchange flag must be a boolean", ErrorCodes::BAD_ARGUMENTS); + } + } + + return StorageRabbitMQ::create(args.table_id, args.context, args.columns, host_port, routing_key, exchange, + format, row_delimiter, num_consumers, bind_by_id, num_queues, hash_exchange); + }; + + factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, }); + +} + + +NamesAndTypesList StorageRabbitMQ::getVirtuals() const +{ + return NamesAndTypesList{ + {"_exchange", std::make_shared()}, + {"_routingKey", std::make_shared()} + }; +} + +} + diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h new file mode 100644 index 00000000000..37b8c2b1078 --- /dev/null +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -0,0 +1,62 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +using ChannelPtr = std::shared_ptr; + +class StorageRabbitMQ final: public ext::shared_ptr_helper, public IStorage +{ + friend struct ext::shared_ptr_helper; + +public: + std::string getName() const override { return "RabbitMQ"; } + bool supportsSettings() const override { return true; } + +protected: + StorageRabbitMQ( + const StorageID & table_id_, + Context & context_, + const ColumnsDescription & columns_, + const String & host_port_, + const String & routing_key_, const String & exchange_name_, + const String & format_name_, char row_delimiter_, + size_t num_consumers_, bool bind_by_id_, size_t num_queues_, bool hash_exchange); + +private: + Context global_context; + Context rabbitmq_context; + + String routing_key; + const String exchange_name; + + const String format_name; + char row_delimiter; + size_t num_consumers; + size_t num_created_consumers = 0; + + bool bind_by_id; + size_t num_queues; + const bool hash_exchange; + + Poco::Logger * log; + std::pair parsed_address; + + Poco::Semaphore semaphore; + std::mutex mutex; + + size_t consumer_id = 0; + + BackgroundSchedulePool::TaskHolder task; + std::atomic stream_cancelled{false}; +}; + +} diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 5ad26b70803..c349a4e5c8f 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -46,6 +46,10 @@ void registerStorages() #if USE_RDKAFKA registerStorageKafka(factory); #endif + + #if USE_AMQPCPP + registerStorageRabbitMQ(factory); + #endif } } diff --git a/src/Storages/registerStorages.h b/src/Storages/registerStorages.h index c9874551073..2823f5c2d2c 100644 --- a/src/Storages/registerStorages.h +++ b/src/Storages/registerStorages.h @@ -47,6 +47,10 @@ void registerStorageMySQL(StorageFactory & factory); void registerStorageKafka(StorageFactory & factory); #endif +#if USE_AMQPCPP +void registerStorageRabbitMQ(StorageFactory & factory); +#endif + void registerStorages(); }