This commit is contained in:
kssenii 2022-06-24 02:07:16 +02:00
parent e7c8023ef5
commit 4c99f608b3
2 changed files with 70 additions and 7 deletions

View File

@ -93,18 +93,40 @@ StorageRabbitMQ::StorageRabbitMQ(
, milliseconds_to_wait(RESCHEDULE_MS)
, is_attach(is_attach_)
{
auto parsed_address = parseAddress(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port), 5672);
context_->getRemoteHostFilter().checkHostAndPort(parsed_address.first, toString(parsed_address.second));
const auto & config = getContext()->getConfigRef();
std::pair<String, UInt16> parsed_address;
auto setting_rabbitmq_username = rabbitmq_settings->rabbitmq_username.value;
auto setting_rabbitmq_password = rabbitmq_settings->rabbitmq_password.value;
String username, password;
if (rabbitmq_settings->rabbitmq_host_port.changed)
{
username = setting_rabbitmq_username.empty() ? config.getString("rabbitmq.username", "") : setting_rabbitmq_username;
password = setting_rabbitmq_password.empty() ? config.getString("rabbitmq.password", "") : setting_rabbitmq_password;
if (username.empty() || password.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"No username or password. They can be specified either in config or in storage settings");
parsed_address = parseAddress(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port), 5672);
if (parsed_address.first.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Host or port is incorrect (host: {}, port: {})", parsed_address.first, parsed_address.second);
context_->getRemoteHostFilter().checkHostAndPort(parsed_address.first, toString(parsed_address.second));
}
else if (!rabbitmq_settings->rabbitmq_address.changed)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ requires either `rabbitmq_host_port` or `rabbitmq_address` setting");
auto rabbitmq_username = rabbitmq_settings->rabbitmq_username.value;
auto rabbitmq_password = rabbitmq_settings->rabbitmq_password.value;
configuration =
{
.host = parsed_address.first,
.port = parsed_address.second,
.username = rabbitmq_username.empty() ? getContext()->getConfigRef().getString("rabbitmq.username") : rabbitmq_username,
.password = rabbitmq_password.empty() ? getContext()->getConfigRef().getString("rabbitmq.password") : rabbitmq_password,
.vhost = getContext()->getConfigRef().getString("rabbitmq.vhost", getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_vhost)),
.username = username,
.password = password,
.vhost = config.getString("rabbitmq.vhost", getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_vhost)),
.secure = rabbitmq_settings->rabbitmq_secure.value,
.connection_string = getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_address)
};

View File

@ -30,6 +30,11 @@ instance = cluster.add_instance(
stay_alive=True,
)
instance2 = cluster.add_instance(
"instance2",
user_configs=["configs/users.xml"],
with_rabbitmq=True,
)
# Helpers
@ -2745,6 +2750,42 @@ def test_rabbitmq_predefined_configuration(rabbitmq_cluster):
break
def test_rabbitmq_address(rabbitmq_cluster):
instance2.query("""
drop table if exists rabbit_in;
drop table if exists rabbit_out;
create table
rabbit_in (val String)
engine=RabbitMQ
SETTINGS rabbitmq_exchange_name = 'rxhep',
rabbitmq_format = 'CSV',
rabbitmq_num_consumers = 1,
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
create table
rabbit_out (val String) engine=RabbitMQ
SETTINGS rabbitmq_exchange_name = 'rxhep',
rabbitmq_format = 'CSV',
rabbitmq_num_consumers = 1,
rabbitmq_address='amqp://root:clickhouse@rabbitmq1:5672/';
set stream_like_engine_allow_direct_select=1;
insert into rabbit_out select 'kek';
""")
result = ""
try_no = 0
while True:
result = instance2.query("select * from rabbit_in;")
if result.strip() == "kek":
break
else:
try_no = try_no + 1
if try_no == 20:
break
time.sleep(1)
assert result.strip() == "kek"
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")