From f8b45a00ec89037f2ae71d63231f8b06195596bb Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 27 Mar 2023 19:20:48 +0200 Subject: [PATCH] Fix --- src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 37 ++++++++++--------- .../integration/test_storage_rabbitmq/test.py | 20 ++++++---- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index b7fb2c6df64..da9bd8a482f 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -954,8 +954,10 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id) { // Check if all dependencies are attached auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id); + LOG_TEST(log, "Number of attached views {} for {}", view_ids.size(), table_id.getNameForLogs()); + if (view_ids.empty()) - return true; + return false; // Check the dependencies are ready? for (const auto & view_id : view_ids) @@ -968,10 +970,6 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id) auto * materialized_view = dynamic_cast(view.get()); if (materialized_view && !materialized_view->tryGetTargetTable()) return false; - - // Check all its dependencies - if (!checkDependencies(view_id)) - return false; } return true; @@ -1017,27 +1015,19 @@ void StorageRabbitMQ::streamingToViewsFunc() LOG_DEBUG(log, "Started streaming to {} attached views", num_views); - if (streamToViews()) - { - /// Reschedule with backoff. - if (milliseconds_to_wait < rabbitmq_settings->rabbitmq_empty_queue_backoff_end_ms) - milliseconds_to_wait += rabbitmq_settings->rabbitmq_empty_queue_backoff_step_ms; - stopLoopIfNoReaders(); + bool continue_reading = !streamToViews(); + if (!continue_reading) break; - } - else - { - milliseconds_to_wait = rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms; - } auto end_time = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(end_time - start_time); if (duration.count() > MAX_THREAD_WORK_DURATION_MS) { - stopLoopIfNoReaders(); LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded."); break; } + + milliseconds_to_wait = rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms; } } } @@ -1055,7 +1045,13 @@ void StorageRabbitMQ::streamingToViewsFunc() stopLoopIfNoReaders(); if (!shutdown_called) + { + /// Reschedule with backoff. + if (milliseconds_to_wait < rabbitmq_settings->rabbitmq_empty_queue_backoff_end_ms) + milliseconds_to_wait += rabbitmq_settings->rabbitmq_empty_queue_backoff_step_ms; + streaming_task->scheduleAfter(milliseconds_to_wait); + } } @@ -1117,6 +1113,13 @@ bool StorageRabbitMQ::streamToViews() deactivateTask(looping_task, false, true); size_t queue_empty = 0; + if (!checkDependencies(getStorageID())) + { + /// Do not commit to rabbitmq if the dependency was removed. + LOG_TRACE(log, "No dependencies, reschedule"); + return true; + } + if (!connection->isConnected()) { if (shutdown_called) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 10e2aaf8355..aec98e1cb73 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -2674,7 +2674,7 @@ def test_rabbitmq_issue_30691(rabbitmq_cluster): def test_rabbitmq_drop_mv(rabbitmq_cluster): instance.query( """ - CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + CREATE TABLE test.drop_mv (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'mv', @@ -2693,7 +2693,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): instance.query( """ CREATE MATERIALIZED VIEW test.consumer TO test.view AS - SELECT * FROM test.rabbitmq; + SELECT * FROM test.drop_mv; """ ) @@ -2711,9 +2711,10 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): ) start = time.time() - while time.time() - start < 30: + while True: res = instance.query("SELECT COUNT(*) FROM test.view") - if "20" == res: + print(f"Current count (1): {res}") + if int(res) == 20: break else: logging.debug(f"Number of rows in test.view: {res}") @@ -2727,7 +2728,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): instance.query( """ CREATE MATERIALIZED VIEW test.consumer TO test.view AS - SELECT * FROM test.rabbitmq; + SELECT * FROM test.drop_mv; """ ) for i in range(40, 50): @@ -2736,11 +2737,13 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): ) while True: - result = instance.query("SELECT * FROM test.view ORDER BY key") - if rabbitmq_check_result(result): + result = instance.query("SELECT count() FROM test.view") + print(f"Current count (2): {result}") + if int(result) == 50: break time.sleep(1) + result = instance.query("SELECT * FROM test.view ORDER BY key") rabbitmq_check_result(result, True) instance.query("DROP VIEW test.consumer NO DELAY") @@ -2754,10 +2757,11 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): count = 0 start = time.time() while time.time() - start < 30: - count = int(instance.query("SELECT count() FROM test.rabbitmq")) + count = int(instance.query("SELECT count() FROM test.drop_mv")) if count: break + instance.query("DROP TABLE test.drop_mv") assert count > 0