mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #63515 from ClickHouse/fix-rabbitmq-heap-use-after-free
Fix rabbitmq heap-use-after-free with clang18
This commit is contained in:
commit
39b4411028
@ -408,9 +408,7 @@ void StorageRabbitMQ::initRabbitMQ()
|
||||
|
||||
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers
|
||||
|
||||
initExchange(*rabbit_channel);
|
||||
bindExchange(*rabbit_channel);
|
||||
|
||||
for (const auto i : collections::range(0, num_queues))
|
||||
bindQueue(i + 1, *rabbit_channel);
|
||||
|
||||
@ -442,7 +440,7 @@ void StorageRabbitMQ::initRabbitMQ()
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::initExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
{
|
||||
/// Exchange hierarchy:
|
||||
/// 1. Main exchange (defined with table settings - rabbitmq_exchange_name, rabbitmq_exchange_type).
|
||||
@ -455,68 +453,78 @@ void StorageRabbitMQ::initExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
/// 1. `durable` (survive RabbitMQ server restart)
|
||||
/// 2. `autodelete` (auto delete in case of queue bindings are dropped).
|
||||
|
||||
std::string error;
|
||||
int error_code;
|
||||
rabbit_channel.declareExchange(exchange_name, exchange_type, AMQP::durable)
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
connection->getHandler().stopLoop();
|
||||
/// This error can be a result of attempt to declare exchange if it was already declared but
|
||||
/// 1) with different exchange type.
|
||||
/// 2) with different exchange settings.
|
||||
throw Exception(ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
|
||||
"Unable to declare exchange. Make sure specified exchange is not already declared. Error: {}",
|
||||
std::string(message));
|
||||
error = "Unable to declare exchange. "
|
||||
"Make sure specified exchange is not already declared. Error: " + std::string(message);
|
||||
error_code = ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE;
|
||||
});
|
||||
|
||||
rabbit_channel.declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable | AMQP::autodelete)
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
connection->getHandler().stopLoop();
|
||||
/// This error is not supposed to happen as this exchange name is always unique to type and its settings.
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
|
||||
"Unable to declare bridge exchange ({}). Reason: {}", bridge_exchange, std::string(message));
|
||||
if (error.empty())
|
||||
{
|
||||
error = fmt::format("Unable to declare bridge exchange ({}). Reason: {}",
|
||||
bridge_exchange, std::string(message));
|
||||
error_code = ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE;
|
||||
}
|
||||
});
|
||||
|
||||
if (!hash_exchange)
|
||||
if (hash_exchange)
|
||||
{
|
||||
AMQP::Table binding_arguments;
|
||||
|
||||
/// Default routing key property in case of hash exchange is a routing key, which is required to be an integer.
|
||||
/// Support for arbitrary exchange type (i.e. arbitrary pattern of routing keys) requires to eliminate this dependency.
|
||||
/// This settings changes hash property to message_id.
|
||||
binding_arguments["hash-property"] = "message_id";
|
||||
|
||||
/// Declare hash exchange for sharding.
|
||||
rabbit_channel.declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable | AMQP::autodelete, binding_arguments)
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
connection->getHandler().stopLoop();
|
||||
/// This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
|
||||
/// to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
|
||||
/// is bad.
|
||||
if (error.empty())
|
||||
{
|
||||
error = fmt::format("Unable to declare sharding exchange ({}). Reason: {}",
|
||||
sharding_exchange, std::string(message));
|
||||
error_code = ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE;
|
||||
}
|
||||
});
|
||||
|
||||
rabbit_channel.bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
connection->getHandler().stopLoop();
|
||||
if (error.empty())
|
||||
{
|
||||
error = fmt::format(
|
||||
"Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
|
||||
bridge_exchange, sharding_exchange, std::string(message));
|
||||
error_code = ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE;
|
||||
}
|
||||
});
|
||||
|
||||
consumer_exchange = sharding_exchange;
|
||||
}
|
||||
else
|
||||
{
|
||||
consumer_exchange = bridge_exchange;
|
||||
return;
|
||||
}
|
||||
|
||||
AMQP::Table binding_arguments;
|
||||
|
||||
/// Default routing key property in case of hash exchange is a routing key, which is required to be an integer.
|
||||
/// Support for arbitrary exchange type (i.e. arbitrary pattern of routing keys) requires to eliminate this dependency.
|
||||
/// This settings changes hash property to message_id.
|
||||
binding_arguments["hash-property"] = "message_id";
|
||||
|
||||
/// Declare hash exchange for sharding.
|
||||
rabbit_channel.declareExchange(sharding_exchange, AMQP::consistent_hash, AMQP::durable | AMQP::autodelete, binding_arguments)
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
/// This error can be a result of same reasons as above for exchange_name, i.e. it will mean that sharding exchange name appeared
|
||||
/// to be the same as some other exchange (which purpose is not for sharding). So probably actual error reason: queue_base parameter
|
||||
/// is bad.
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_DECLARE_RABBITMQ_EXCHANGE,
|
||||
"Unable to declare sharding exchange ({}). Reason: {}", sharding_exchange, std::string(message));
|
||||
});
|
||||
|
||||
rabbit_channel.bindExchange(bridge_exchange, sharding_exchange, routing_keys[0])
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
||||
"Unable to bind bridge exchange ({}) to sharding exchange ({}). Reason: {}",
|
||||
bridge_exchange,
|
||||
sharding_exchange,
|
||||
std::string(message));
|
||||
});
|
||||
|
||||
consumer_exchange = sharding_exchange;
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
{
|
||||
size_t bound_keys = 0;
|
||||
|
||||
if (exchange_type == AMQP::ExchangeType::headers)
|
||||
@ -533,10 +541,10 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
.onSuccess([&]() { connection->getHandler().stopLoop(); })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
||||
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
||||
exchange_name, bridge_exchange, std::string(message));
|
||||
connection->getHandler().stopLoop();
|
||||
error = fmt::format("Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
||||
exchange_name, bridge_exchange, std::string(message));
|
||||
error_code = ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE;
|
||||
});
|
||||
}
|
||||
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
|
||||
@ -545,10 +553,13 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
.onSuccess([&]() { connection->getHandler().stopLoop(); })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
||||
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
||||
exchange_name, bridge_exchange, std::string(message));
|
||||
connection->getHandler().stopLoop();
|
||||
if (error.empty())
|
||||
{
|
||||
error = fmt::format("Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
||||
exchange_name, bridge_exchange, std::string(message));
|
||||
error_code = ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE;
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
@ -564,20 +575,26 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE,
|
||||
"Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
||||
exchange_name, bridge_exchange, std::string(message));
|
||||
connection->getHandler().stopLoop();
|
||||
if (error.empty())
|
||||
{
|
||||
error = fmt::format("Unable to bind exchange {} to bridge exchange ({}). Reason: {}",
|
||||
exchange_name, bridge_exchange, std::string(message));
|
||||
error_code = ErrorCodes::CANNOT_BIND_RABBITMQ_EXCHANGE;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
connection->getHandler().startBlockingLoop();
|
||||
if (!error.empty())
|
||||
throw Exception(error_code, "{}", error);
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel)
|
||||
{
|
||||
std::string error;
|
||||
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
|
||||
{
|
||||
queues.emplace_back(queue_name);
|
||||
@ -594,23 +611,26 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann
|
||||
.onSuccess([&] { connection->getHandler().stopLoop(); })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
|
||||
"Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
|
||||
connection->getHandler().stopLoop();
|
||||
error = fmt::format("Failed to create queue binding for exchange {}. Reason: {}",
|
||||
exchange_name, std::string(message));
|
||||
});
|
||||
};
|
||||
|
||||
auto error_callback([&](const char * message)
|
||||
{
|
||||
connection->getHandler().stopLoop();
|
||||
/* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a
|
||||
* given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different
|
||||
* max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously
|
||||
* declared queues via any of the various cli tools.
|
||||
*/
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to declare queue. Probably queue settings are conflicting: "
|
||||
"max_block_size, deadletter_exchange. Attempt specifying differently those settings "
|
||||
"or use a different queue_base or manually delete previously declared queues, "
|
||||
"which were declared with the same names. ERROR reason: {}", std::string(message));
|
||||
if (error.empty())
|
||||
error = fmt::format(
|
||||
"Failed to declare queue. Probably queue settings are conflicting: "
|
||||
"max_block_size, deadletter_exchange. Attempt specifying differently those settings "
|
||||
"or use a different queue_base or manually delete previously declared queues, "
|
||||
"which were declared with the same names. ERROR reason: {}", std::string(message));
|
||||
});
|
||||
|
||||
AMQP::Table queue_settings;
|
||||
@ -648,6 +668,8 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann
|
||||
/// and deleting queues should not take place.
|
||||
rabbit_channel.declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
|
||||
connection->getHandler().startBlockingLoop();
|
||||
if (!error.empty())
|
||||
throw Exception(ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING, "{}", error);
|
||||
}
|
||||
|
||||
|
||||
@ -671,6 +693,7 @@ void StorageRabbitMQ::unbindExchange()
|
||||
|
||||
stopLoop();
|
||||
looping_task->deactivate();
|
||||
std::string error;
|
||||
|
||||
auto rabbit_channel = connection->createChannel();
|
||||
rabbit_channel->removeExchange(bridge_exchange)
|
||||
@ -680,11 +703,14 @@ void StorageRabbitMQ::unbindExchange()
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE, "Unable to remove exchange. Reason: {}", std::string(message));
|
||||
connection->getHandler().stopLoop();
|
||||
error = fmt::format("Unable to remove exchange. Reason: {}", std::string(message));
|
||||
});
|
||||
|
||||
connection->getHandler().startBlockingLoop();
|
||||
rabbit_channel->close();
|
||||
if (!error.empty())
|
||||
throw Exception(ErrorCodes::CANNOT_REMOVE_RABBITMQ_EXCHANGE, "{}", error);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -183,7 +183,6 @@ private:
|
||||
void initRabbitMQ();
|
||||
void cleanupRabbitMQ() const;
|
||||
|
||||
void initExchange(AMQP::TcpChannel & rabbit_channel);
|
||||
void bindExchange(AMQP::TcpChannel & rabbit_channel);
|
||||
void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel);
|
||||
|
||||
|
@ -2606,7 +2606,7 @@ def test_rabbitmq_bad_args(rabbitmq_cluster):
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
channel = connection.channel()
|
||||
channel.exchange_declare(exchange="f", exchange_type="fanout")
|
||||
instance.query_and_get_error(
|
||||
assert "Unable to declare exchange" in instance.query_and_get_error(
|
||||
"""
|
||||
CREATE TABLE test.drop (key UInt64, value UInt64)
|
||||
ENGINE = RabbitMQ
|
||||
|
Loading…
Reference in New Issue
Block a user