From 3a5ba56c36775709d8cfa4b2d6deb081b69dd2a5 Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 29 Jan 2024 20:09:09 +0000 Subject: [PATCH] Fix lazy initialization in RabbitMQ, fix possible deadlock on insert into uninitialized queue engine --- src/Storages/IMessageProducer.cpp | 11 ++- src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 76 +++++++------------ src/Storages/RabbitMQ/StorageRabbitMQ.h | 5 +- .../integration/test_storage_rabbitmq/test.py | 12 +++ 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/Storages/IMessageProducer.cpp b/src/Storages/IMessageProducer.cpp index c723ec77b70..20c47f6f0b4 100644 --- a/src/Storages/IMessageProducer.cpp +++ b/src/Storages/IMessageProducer.cpp @@ -12,7 +12,16 @@ void AsynchronousMessageProducer::start(const ContextPtr & context) { LOG_TEST(log, "Executing startup"); - initialize(); + try + { + initialize(); + } + catch (...) + { + finished = true; + throw; + } + producing_task = context->getSchedulePool().createTask(getProducingTaskName(), [this] { LOG_TEST(log, "Starting producing task loop"); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 025f421db59..868f48d0b7d 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -69,7 +69,7 @@ StorageRabbitMQ::StorageRabbitMQ( ContextPtr context_, const ColumnsDescription & columns_, std::unique_ptr rabbitmq_settings_, - bool is_attach_) + bool is_attach) : IStorage(table_id_) , WithContext(context_->getGlobalContext()) , rabbitmq_settings(std::move(rabbitmq_settings_)) @@ -91,7 +91,6 @@ StorageRabbitMQ::StorageRabbitMQ( , unique_strbase(getRandomName()) , queue_size(std::max(QUEUE_SIZE, static_cast(getMaxBlockSize()))) , milliseconds_to_wait(rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms) - , is_attach(is_attach_) { const auto & config = getContext()->getConfigRef(); @@ -318,10 +317,11 @@ void StorageRabbitMQ::connectionFunc() try { if 
(connection->reconnect()) + { initRabbitMQ(); - - streaming_task->scheduleAfter(RESCHEDULE_MS); - return; + streaming_task->scheduleAfter(RESCHEDULE_MS); + return; + } } catch (...) { @@ -373,57 +373,37 @@ void StorageRabbitMQ::initRabbitMQ() } else { - try + auto rabbit_channel = connection->createChannel(); + + /// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers + + initExchange(*rabbit_channel); + bindExchange(*rabbit_channel); + + for (const auto i : collections::range(0, num_queues)) + bindQueue(i + 1, *rabbit_channel); + + if (queues.size() != num_queues) { - auto rabbit_channel = connection->createChannel(); - - /// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers - - initExchange(*rabbit_channel); - bindExchange(*rabbit_channel); - - for (const auto i : collections::range(0, num_queues)) - bindQueue(i + 1, *rabbit_channel); - - if (queues.size() != num_queues) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Expected all queues to be initialized (but having {}/{})", - queues.size(), num_queues); - } - - LOG_TRACE(log, "RabbitMQ setup completed"); - rabbit_channel->close(); - } - catch (...) - { - tryLogCurrentException(log); - if (is_attach) - return; /// A user will have to reattach the table. - throw; + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected all queues to be initialized (but having {}/{})", + queues.size(), num_queues); } + + LOG_TRACE(log, "RabbitMQ setup completed"); + rabbit_channel->close(); } LOG_TRACE(log, "Registering {} conumers", num_consumers); for (size_t i = 0; i < num_consumers; ++i) { - try - { - auto consumer = createConsumer(); - consumer->updateChannel(*connection); - consumers_ref.push_back(consumer); - pushConsumer(consumer); - ++num_created_consumers; - } - catch (...) 
- { - if (!is_attach) - throw; - - tryLogCurrentException(log); - } + auto consumer = createConsumer(); + consumer->updateChannel(*connection); + consumers_ref.push_back(consumer); + pushConsumer(consumer); + ++num_created_consumers; } LOG_TRACE(log, "Registered {}/{} conumers", num_created_consumers, num_consumers); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index be46caf9798..696734617be 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -27,7 +27,7 @@ public: ContextPtr context_, const ColumnsDescription & columns_, std::unique_ptr rabbitmq_settings_, - bool is_attach_); + bool is_attach); std::string getName() const override { return "RabbitMQ"; } @@ -158,10 +158,9 @@ private: size_t read_attempts = 0; mutable bool drop_table = false; - bool is_attach; RabbitMQConsumerPtr createConsumer(); - bool initialized = false; + std::atomic initialized = false; /// Functions working in the background void streamingToViewsFunc(); diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 6924f2e1508..28dbca1862c 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -3538,3 +3538,15 @@ def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster): expected = "".join(sorted(expected)) assert broken_messages == expected + + +def test_attach_broken_table(rabbitmq_cluster): + instance.query( + "ATTACH TABLE rabbit_queue UUID '2d1cdf1a-f060-4a61-a7c9-5b59e59992c6' (`payload` String) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'nonexisting:5671', rabbitmq_format = 'JSONEachRow', rabbitmq_username = 'test', rabbitmq_password = 'test'" + ) + + error = instance.query_and_get_error("SELECT * FROM rabbit_queue") + assert "CANNOT_CONNECT_RABBITMQ" in error + + error = instance.query_and_get_error("INSERT INTO rabbit_queue VALUES ('test')") + assert 
"CANNOT_CONNECT_RABBITMQ" in error