Fix lazy initialization in RabbitMQ, fix possible deadlock on insert into unitialized queue engine

This commit is contained in:
avogar 2024-01-29 20:09:09 +00:00
parent a51aa7b668
commit 3a5ba56c36
4 changed files with 52 additions and 52 deletions

View File

@ -12,7 +12,16 @@ void AsynchronousMessageProducer::start(const ContextPtr & context)
{ {
LOG_TEST(log, "Executing startup"); LOG_TEST(log, "Executing startup");
initialize(); try
{
initialize();
}
catch (...)
{
finished = true;
throw;
}
producing_task = context->getSchedulePool().createTask(getProducingTaskName(), [this] producing_task = context->getSchedulePool().createTask(getProducingTaskName(), [this]
{ {
LOG_TEST(log, "Starting producing task loop"); LOG_TEST(log, "Starting producing task loop");

View File

@ -69,7 +69,7 @@ StorageRabbitMQ::StorageRabbitMQ(
ContextPtr context_, ContextPtr context_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_, std::unique_ptr<RabbitMQSettings> rabbitmq_settings_,
bool is_attach_) bool is_attach)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, rabbitmq_settings(std::move(rabbitmq_settings_)) , rabbitmq_settings(std::move(rabbitmq_settings_))
@ -91,7 +91,6 @@ StorageRabbitMQ::StorageRabbitMQ(
, unique_strbase(getRandomName()) , unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize()))) , queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
, milliseconds_to_wait(rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms) , milliseconds_to_wait(rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms)
, is_attach(is_attach_)
{ {
const auto & config = getContext()->getConfigRef(); const auto & config = getContext()->getConfigRef();
@ -318,10 +317,11 @@ void StorageRabbitMQ::connectionFunc()
try try
{ {
if (connection->reconnect()) if (connection->reconnect())
{
initRabbitMQ(); initRabbitMQ();
streaming_task->scheduleAfter(RESCHEDULE_MS);
streaming_task->scheduleAfter(RESCHEDULE_MS); return;
return; }
} }
catch (...) catch (...)
{ {
@ -373,57 +373,37 @@ void StorageRabbitMQ::initRabbitMQ()
} }
else else
{ {
try auto rabbit_channel = connection->createChannel();
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers
initExchange(*rabbit_channel);
bindExchange(*rabbit_channel);
for (const auto i : collections::range(0, num_queues))
bindQueue(i + 1, *rabbit_channel);
if (queues.size() != num_queues)
{ {
auto rabbit_channel = connection->createChannel(); throw Exception(
ErrorCodes::LOGICAL_ERROR,
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers "Expected all queues to be initialized (but having {}/{})",
queues.size(), num_queues);
initExchange(*rabbit_channel);
bindExchange(*rabbit_channel);
for (const auto i : collections::range(0, num_queues))
bindQueue(i + 1, *rabbit_channel);
if (queues.size() != num_queues)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected all queues to be initialized (but having {}/{})",
queues.size(), num_queues);
}
LOG_TRACE(log, "RabbitMQ setup completed");
rabbit_channel->close();
}
catch (...)
{
tryLogCurrentException(log);
if (is_attach)
return; /// A user will have to reattach the table.
throw;
} }
LOG_TRACE(log, "RabbitMQ setup completed");
rabbit_channel->close();
} }
LOG_TRACE(log, "Registering {} conumers", num_consumers); LOG_TRACE(log, "Registering {} conumers", num_consumers);
for (size_t i = 0; i < num_consumers; ++i) for (size_t i = 0; i < num_consumers; ++i)
{ {
try auto consumer = createConsumer();
{ consumer->updateChannel(*connection);
auto consumer = createConsumer(); consumers_ref.push_back(consumer);
consumer->updateChannel(*connection); pushConsumer(consumer);
consumers_ref.push_back(consumer); ++num_created_consumers;
pushConsumer(consumer);
++num_created_consumers;
}
catch (...)
{
if (!is_attach)
throw;
tryLogCurrentException(log);
}
} }
LOG_TRACE(log, "Registered {}/{} conumers", num_created_consumers, num_consumers); LOG_TRACE(log, "Registered {}/{} conumers", num_created_consumers, num_consumers);

View File

@ -27,7 +27,7 @@ public:
ContextPtr context_, ContextPtr context_,
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_, std::unique_ptr<RabbitMQSettings> rabbitmq_settings_,
bool is_attach_); bool is_attach);
std::string getName() const override { return "RabbitMQ"; } std::string getName() const override { return "RabbitMQ"; }
@ -158,10 +158,9 @@ private:
size_t read_attempts = 0; size_t read_attempts = 0;
mutable bool drop_table = false; mutable bool drop_table = false;
bool is_attach;
RabbitMQConsumerPtr createConsumer(); RabbitMQConsumerPtr createConsumer();
bool initialized = false; std::atomic<bool> initialized = false;
/// Functions working in the background /// Functions working in the background
void streamingToViewsFunc(); void streamingToViewsFunc();

View File

@ -3538,3 +3538,15 @@ def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster):
expected = "".join(sorted(expected)) expected = "".join(sorted(expected))
assert broken_messages == expected assert broken_messages == expected
def test_attach_broken_table(rabbitmq_cluster):
instance.query(
"ATTACH TABLE rabbit_queue UUID '2d1cdf1a-f060-4a61-a7c9-5b59e59992c6' (`payload` String) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'nonexisting:5671', rabbitmq_format = 'JSONEachRow', rabbitmq_username = 'test', rabbitmq_password = 'test'"
)
error = instance.query_and_get_error("SELECT * FROM rabbit_queue")
assert "CANNOT_CONNECT_RABBITMQ" in error
error = instance.query_and_get_error("INSERT INTO rabbit_queue VALUES ('test')")
assert "CANNOT_CONNECT_RABBITMQ" in error