diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 58f08c48c68..6a94b1a28dc 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -93,18 +93,40 @@ StorageRabbitMQ::StorageRabbitMQ( , milliseconds_to_wait(RESCHEDULE_MS) , is_attach(is_attach_) { - auto parsed_address = parseAddress(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port), 5672); - context_->getRemoteHostFilter().checkHostAndPort(parsed_address.first, toString(parsed_address.second)); + const auto & config = getContext()->getConfigRef(); + + std::pair parsed_address; + auto setting_rabbitmq_username = rabbitmq_settings->rabbitmq_username.value; + auto setting_rabbitmq_password = rabbitmq_settings->rabbitmq_password.value; + String username, password; + + if (rabbitmq_settings->rabbitmq_host_port.changed) + { + username = setting_rabbitmq_username.empty() ? config.getString("rabbitmq.username", "") : setting_rabbitmq_username; + password = setting_rabbitmq_password.empty() ? config.getString("rabbitmq.password", "") : setting_rabbitmq_password; + if (username.empty() || password.empty()) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "No username or password. They can be specified either in config or in storage settings"); + + parsed_address = parseAddress(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port), 5672); + if (parsed_address.first.empty()) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Host or port is incorrect (host: {}, port: {})", parsed_address.first, parsed_address.second); + + context_->getRemoteHostFilter().checkHostAndPort(parsed_address.first, toString(parsed_address.second)); + } + else if (!rabbitmq_settings->rabbitmq_address.changed) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ requires either `rabbitmq_host_port` or `rabbitmq_address` setting"); - auto rabbitmq_username = rabbitmq_settings->rabbitmq_username.value; - auto rabbitmq_password = rabbitmq_settings->rabbitmq_password.value; configuration = { .host = parsed_address.first, .port = parsed_address.second, - .username = rabbitmq_username.empty() ? getContext()->getConfigRef().getString("rabbitmq.username") : rabbitmq_username, - .password = rabbitmq_password.empty() ? getContext()->getConfigRef().getString("rabbitmq.password") : rabbitmq_password, - .vhost = getContext()->getConfigRef().getString("rabbitmq.vhost", getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_vhost)), + .username = username, + .password = password, + .vhost = config.getString("rabbitmq.vhost", getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_vhost)), .secure = rabbitmq_settings->rabbitmq_secure.value, .connection_string = getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_address) }; diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index c1bd136126f..f497a5bb2c0 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -30,6 +30,11 @@ instance = cluster.add_instance( stay_alive=True, ) +instance2 = cluster.add_instance( + "instance2", + user_configs=["configs/users.xml"], + with_rabbitmq=True, +) # Helpers @@ -2745,6 +2750,42 @@ def test_rabbitmq_predefined_configuration(rabbitmq_cluster): break +def test_rabbitmq_address(rabbitmq_cluster): + + instance2.query(""" + drop table if exists rabbit_in; + drop table if exists rabbit_out; + create table + rabbit_in (val String) + engine=RabbitMQ + SETTINGS rabbitmq_exchange_name = 'rxhep', + rabbitmq_format = 'CSV', + rabbitmq_num_consumers = 1, + rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/'; + create table + rabbit_out (val String) engine=RabbitMQ + SETTINGS rabbitmq_exchange_name = 'rxhep', + rabbitmq_format = 'CSV', + rabbitmq_num_consumers = 1, + rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/'; + set stream_like_engine_allow_direct_select=1; + insert into rabbit_out select 'kek'; + """) + + result = "" + try_no = 0 + while True: + result = instance2.query("select * from rabbit_in;") + if result.strip() == "kek": + break + else: + try_no = try_no + 1 + if try_no == 20: + break + time.sleep(1) + assert result.strip() == "kek" + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...")