diff --git a/cmake/find/amqpcpp.cmake b/cmake/find/amqpcpp.cmake index a4a58349508..05e5d2da751 100644 --- a/cmake/find/amqpcpp.cmake +++ b/cmake/find/amqpcpp.cmake @@ -17,7 +17,7 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/CMakeLists.txt") endif () set (USE_AMQPCPP 1) -set (AMQPCPP_LIBRARY amqp-cpp) +set (AMQPCPP_LIBRARY amqp-cpp ${OPENSSL_LIBRARIES}) set (AMQPCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/include") list (APPEND AMQPCPP_INCLUDE_DIR diff --git a/contrib/amqpcpp-cmake/CMakeLists.txt b/contrib/amqpcpp-cmake/CMakeLists.txt index 5637db4cf41..faef7bd4a1c 100644 --- a/contrib/amqpcpp-cmake/CMakeLists.txt +++ b/contrib/amqpcpp-cmake/CMakeLists.txt @@ -41,6 +41,4 @@ target_compile_options (amqp-cpp ) target_include_directories (amqp-cpp SYSTEM PUBLIC "${LIBRARY_DIR}/include") - -target_link_libraries (amqp-cpp PUBLIC ssl) - +target_link_libraries(amqp-cpp PUBLIC ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY}) diff --git a/contrib/boringssl-cmake/CMakeLists.txt b/contrib/boringssl-cmake/CMakeLists.txt index 9d8c6ca6083..4502d6e9d42 100644 --- a/contrib/boringssl-cmake/CMakeLists.txt +++ b/contrib/boringssl-cmake/CMakeLists.txt @@ -15,12 +15,12 @@ if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") endif() if(CMAKE_COMPILER_IS_GNUCXX OR CLANG) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fvisibility=hidden -fno-common -fno-exceptions -fno-rtti") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fno-common -fno-exceptions -fno-rtti") if(APPLE AND CLANG) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++") endif() - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fvisibility=hidden -fno-common") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-common") if((CMAKE_C_COMPILER_VERSION VERSION_GREATER "4.8.99") OR CLANG) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11") else() diff --git a/docs/en/engines/table-engines/integrations/rabbitmq.md b/docs/en/engines/table-engines/integrations/rabbitmq.md index 5fb9ce5b151..a3ee1115c00 100644 --- a/docs/en/engines/table-engines/integrations/rabbitmq.md +++ b/docs/en/engines/table-engines/integrations/rabbitmq.md @@ -21,11 +21,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = RabbitMQ SETTINGS - rabbitmq_host_port = 'host:port', + rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'], rabbitmq_exchange_name = 'exchange_name', rabbitmq_format = 'data_format'[,] [rabbitmq_exchange_type = 'exchange_type',] [rabbitmq_routing_key_list = 'key1,key2,...',] + [rabbitmq_secure = 0,] [rabbitmq_row_delimiter = 'delimiter_symbol',] [rabbitmq_schema = '',] [rabbitmq_num_consumers = N,] @@ -59,6 +60,11 @@ Optional parameters: - `rabbitmq_max_block_size` - `rabbitmq_flush_interval_ms` +SSL connection: + +Use either `rabbitmq_secure = 1` or `amqps` in connection address: `rabbitmq_address = 'amqps://guest:guest@localhost/vhost'`. +The default behaviour of the used library is not to check if the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid: the connection is simply permitted. More strict checking of certificates can possibly be implemented in the future. + Also format settings can be added along with rabbitmq-related settings. Example: diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.h b/src/Storages/RabbitMQ/RabbitMQSettings.h index ff7c79b89c6..01109dda66a 100644 --- a/src/Storages/RabbitMQ/RabbitMQSettings.h +++ b/src/Storages/RabbitMQ/RabbitMQSettings.h @@ -20,6 +20,8 @@ namespace DB M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \ M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \ M(Bool, rabbitmq_persistent, false, "For insert query messages will be made 'persistent', durable.", 0) \ + M(Bool, rabbitmq_secure, false, "Use SSL connection", 0) \ + M(String, rabbitmq_address, "", "Address for connection", 0) \ M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \ M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \ M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \ diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 419071ba642..ce7e5941b68 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -97,12 +97,16 @@ StorageRabbitMQ::StorageRabbitMQ( getContext()->getConfigRef().getString("rabbitmq.username"), getContext()->getConfigRef().getString("rabbitmq.password"))) , vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value)) + , connection_string(rabbitmq_settings->rabbitmq_address) + , secure(rabbitmq_settings->rabbitmq_secure.value) , semaphore(0, num_consumers) , unique_strbase(getRandomName()) , queue_size(std::max(QUEUE_SIZE, static_cast(getMaxBlockSize()))) , milliseconds_to_wait(RESCHEDULE_MS) { event_handler = std::make_shared(loop.getLoop(), log); + if (secure) + SSL_library_init(); restoreConnection(false); StorageInMemoryMetadata storage_metadata; @@ -528,10 +532,10 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting) LOG_TRACE(log, "Trying to restore connection to " + address); } - connection = std::make_unique(event_handler.get(), - AMQP::Address( - parsed_address.first, parsed_address.second, - AMQP::Login(login_password.first, login_password.second), vhost)); + auto amqp_address = connection_string.empty() ? AMQP::Address(parsed_address.first, parsed_address.second, + AMQP::Login(login_password.first, login_password.second), vhost, secure) + : AMQP::Address(connection_string); + connection = std::make_unique(event_handler.get(), amqp_address); cnt_retries = 0; while (!connection->ready() && !stream_cancelled && cnt_retries++ != RETRIES_MAX) @@ -1053,50 +1057,20 @@ void registerStorageRabbitMQ(StorageFactory & factory) { auto creator_fn = [](const StorageFactory::Arguments & args) { - ASTs & engine_args = args.engine_args; - size_t args_count = engine_args.size(); - bool has_settings = args.storage_def->settings; auto rabbitmq_settings = std::make_unique(); - if (has_settings) - rabbitmq_settings->loadFromQuery(*args.storage_def); + if (!args.storage_def->settings) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ engine must have settings"); - // Check arguments and settings - #define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME) \ - /* One of the three required arguments is not specified */ \ - if (args_count < (ARG_NUM) && (ARG_NUM) <= 2 && !rabbitmq_settings->ARG_NAME.changed) \ - { \ - throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified", \ - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \ - } \ - if (args_count >= (ARG_NUM)) \ - { \ - if (rabbitmq_settings->ARG_NAME.changed) /* The same argument is given in two places */ \ - { \ - throw Exception("The argument №" #ARG_NUM " of storage RabbitMQ " \ - "and the parameter '" #ARG_NAME "' is duplicated", ErrorCodes::BAD_ARGUMENTS); \ - } \ - } + rabbitmq_settings->loadFromQuery(*args.storage_def); - CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port) - CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_format) - CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_exchange_name) - CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type) - CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list) - CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter) - CHECK_RABBITMQ_STORAGE_ARGUMENT(7, rabbitmq_schema) - CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers) - CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues) - CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base) - CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_persistent) - CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_skip_broken_messages) - CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_max_block_size) - CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_flush_interval_ms) - CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_vhost) - CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_queue_settings_list) - CHECK_RABBITMQ_STORAGE_ARGUMENT(17, rabbitmq_queue_consume) + if (!rabbitmq_settings->rabbitmq_host_port.changed + && !rabbitmq_settings->rabbitmq_address.changed) + throw Exception("You must speicify either `rabbitmq_host_port` or `rabbitmq_address` settings", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - #undef CHECK_RABBITMQ_STORAGE_ARGUMENT + if (!rabbitmq_settings->rabbitmq_format.changed) + throw Exception("You must speicify `rabbitmq_format` setting", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings)); }; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 1a2445f3690..48a907cab2c 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -107,6 +107,8 @@ private: std::pair parsed_address; std::pair login_password; String vhost; + String connection_string; + bool secure; UVLoop loop; std::shared_ptr event_handler;