This commit is contained in:
kssenii 2021-08-30 20:55:23 +03:00
parent ea84b939c2
commit 335b6f12fd
7 changed files with 32 additions and 50 deletions

View File

@ -17,7 +17,7 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/CMakeLists.txt")
endif ()
set (USE_AMQPCPP 1)
set (AMQPCPP_LIBRARY amqp-cpp)
set (AMQPCPP_LIBRARY amqp-cpp ${OPENSSL_LIBRARIES})
set (AMQPCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP/include")
list (APPEND AMQPCPP_INCLUDE_DIR

View File

@ -41,6 +41,4 @@ target_compile_options (amqp-cpp
)
target_include_directories (amqp-cpp SYSTEM PUBLIC "${LIBRARY_DIR}/include")
target_link_libraries (amqp-cpp PUBLIC ssl)
target_link_libraries(amqp-cpp PUBLIC ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})

View File

@ -15,12 +15,12 @@ if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fvisibility=hidden -fno-common -fno-exceptions -fno-rtti")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fno-common -fno-exceptions -fno-rtti")
if(APPLE AND CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fvisibility=hidden -fno-common")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-common")
if((CMAKE_C_COMPILER_VERSION VERSION_GREATER "4.8.99") OR CLANG)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11")
else()

View File

@ -21,11 +21,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = RabbitMQ SETTINGS
rabbitmq_host_port = 'host:port',
rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
rabbitmq_exchange_name = 'exchange_name',
rabbitmq_format = 'data_format'[,]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_secure = 0,]
[rabbitmq_row_delimiter = 'delimiter_symbol',]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
@ -59,6 +60,11 @@ Optional parameters:
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
SSL connection:
Use either `rabbitmq_secure = 1` or `amqps` in connection address: `rabbitmq_address = 'amqps://guest:guest@localhost/vhost'`.
The default behaviour of the used library is not to check if the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid: the connection is simply permitted. More strict checking of certificates can possibly be implemented in the future.
Also format settings can be added along with rabbitmq-related settings.
Example:

View File

@ -20,6 +20,8 @@ namespace DB
M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(String, rabbitmq_queue_base, "", "Base for queue names to be able to reopen non-empty queues in case of failure.", 0) \
M(Bool, rabbitmq_persistent, false, "For insert query messages will be made 'persistent', durable.", 0) \
M(Bool, rabbitmq_secure, false, "Use SSL connection", 0) \
M(String, rabbitmq_address, "", "Address for connection", 0) \
M(UInt64, rabbitmq_skip_broken_messages, 0, "Skip at least this number of broken messages from RabbitMQ per block", 0) \
M(UInt64, rabbitmq_max_block_size, 0, "Number of row collected before flushing data from RabbitMQ.", 0) \
M(Milliseconds, rabbitmq_flush_interval_ms, 0, "Timeout for flushing data from RabbitMQ.", 0) \

View File

@ -97,12 +97,16 @@ StorageRabbitMQ::StorageRabbitMQ(
getContext()->getConfigRef().getString("rabbitmq.username"),
getContext()->getConfigRef().getString("rabbitmq.password")))
, vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value))
, connection_string(rabbitmq_settings->rabbitmq_address)
, secure(rabbitmq_settings->rabbitmq_secure.value)
, semaphore(0, num_consumers)
, unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
, milliseconds_to_wait(RESCHEDULE_MS)
{
event_handler = std::make_shared<RabbitMQHandler>(loop.getLoop(), log);
if (secure)
SSL_library_init();
restoreConnection(false);
StorageInMemoryMetadata storage_metadata;
@ -528,10 +532,10 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
LOG_TRACE(log, "Trying to restore connection to " + address);
}
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(
parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), vhost));
auto amqp_address = connection_string.empty() ? AMQP::Address(parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), vhost, secure)
: AMQP::Address(connection_string);
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), amqp_address);
cnt_retries = 0;
while (!connection->ready() && !stream_cancelled && cnt_retries++ != RETRIES_MAX)
@ -1053,50 +1057,20 @@ void registerStorageRabbitMQ(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
bool has_settings = args.storage_def->settings;
auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
if (has_settings)
rabbitmq_settings->loadFromQuery(*args.storage_def);
if (!args.storage_def->settings)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ engine must have settings");
// Check arguments and settings
#define CHECK_RABBITMQ_STORAGE_ARGUMENT(ARG_NUM, ARG_NAME) \
/* One of the three required arguments is not specified */ \
if (args_count < (ARG_NUM) && (ARG_NUM) <= 2 && !rabbitmq_settings->ARG_NAME.changed) \
{ \
throw Exception("Required parameter '" #ARG_NAME "' for storage RabbitMQ not specified", \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
} \
if (args_count >= (ARG_NUM)) \
{ \
if (rabbitmq_settings->ARG_NAME.changed) /* The same argument is given in two places */ \
{ \
throw Exception("The argument №" #ARG_NUM " of storage RabbitMQ " \
"and the parameter '" #ARG_NAME "' is duplicated", ErrorCodes::BAD_ARGUMENTS); \
} \
}
rabbitmq_settings->loadFromQuery(*args.storage_def);
CHECK_RABBITMQ_STORAGE_ARGUMENT(1, rabbitmq_host_port)
CHECK_RABBITMQ_STORAGE_ARGUMENT(2, rabbitmq_format)
CHECK_RABBITMQ_STORAGE_ARGUMENT(3, rabbitmq_exchange_name)
CHECK_RABBITMQ_STORAGE_ARGUMENT(4, rabbitmq_exchange_type)
CHECK_RABBITMQ_STORAGE_ARGUMENT(5, rabbitmq_routing_key_list)
CHECK_RABBITMQ_STORAGE_ARGUMENT(6, rabbitmq_row_delimiter)
CHECK_RABBITMQ_STORAGE_ARGUMENT(7, rabbitmq_schema)
CHECK_RABBITMQ_STORAGE_ARGUMENT(8, rabbitmq_num_consumers)
CHECK_RABBITMQ_STORAGE_ARGUMENT(9, rabbitmq_num_queues)
CHECK_RABBITMQ_STORAGE_ARGUMENT(10, rabbitmq_queue_base)
CHECK_RABBITMQ_STORAGE_ARGUMENT(11, rabbitmq_persistent)
CHECK_RABBITMQ_STORAGE_ARGUMENT(12, rabbitmq_skip_broken_messages)
CHECK_RABBITMQ_STORAGE_ARGUMENT(13, rabbitmq_max_block_size)
CHECK_RABBITMQ_STORAGE_ARGUMENT(14, rabbitmq_flush_interval_ms)
CHECK_RABBITMQ_STORAGE_ARGUMENT(15, rabbitmq_vhost)
CHECK_RABBITMQ_STORAGE_ARGUMENT(16, rabbitmq_queue_settings_list)
CHECK_RABBITMQ_STORAGE_ARGUMENT(17, rabbitmq_queue_consume)
if (!rabbitmq_settings->rabbitmq_host_port.changed
&& !rabbitmq_settings->rabbitmq_address.changed)
throw Exception("You must speicify either `rabbitmq_host_port` or `rabbitmq_address` settings",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
#undef CHECK_RABBITMQ_STORAGE_ARGUMENT
if (!rabbitmq_settings->rabbitmq_format.changed)
throw Exception("You must speicify `rabbitmq_format` setting", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageRabbitMQ::create(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings));
};

View File

@ -107,6 +107,8 @@ private:
std::pair<String, UInt16> parsed_address;
std::pair<String, String> login_password;
String vhost;
String connection_string;
bool secure;
UVLoop loop;
std::shared_ptr<RabbitMQHandler> event_handler;