This commit is contained in:
kssenii 2023-03-27 19:20:48 +02:00
parent 08054b79a8
commit f8b45a00ec
2 changed files with 32 additions and 25 deletions

View File

@ -954,8 +954,10 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
{
// Check if all dependencies are attached
auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
LOG_TEST(log, "Number of attached views {} for {}", view_ids.size(), table_id.getNameForLogs());
if (view_ids.empty())
return true;
return false;
// Check the dependencies are ready?
for (const auto & view_id : view_ids)
@ -968,10 +970,6 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get());
if (materialized_view && !materialized_view->tryGetTargetTable())
return false;
// Check all its dependencies
if (!checkDependencies(view_id))
return false;
}
return true;
@ -1017,27 +1015,19 @@ void StorageRabbitMQ::streamingToViewsFunc()
LOG_DEBUG(log, "Started streaming to {} attached views", num_views);
if (streamToViews())
{
/// Reschedule with backoff.
if (milliseconds_to_wait < rabbitmq_settings->rabbitmq_empty_queue_backoff_end_ms)
milliseconds_to_wait += rabbitmq_settings->rabbitmq_empty_queue_backoff_step_ms;
stopLoopIfNoReaders();
bool continue_reading = !streamToViews();
if (!continue_reading)
break;
}
else
{
milliseconds_to_wait = rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms;
}
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
stopLoopIfNoReaders();
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
}
milliseconds_to_wait = rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms;
}
}
}
@ -1055,7 +1045,13 @@ void StorageRabbitMQ::streamingToViewsFunc()
stopLoopIfNoReaders();
if (!shutdown_called)
{
/// Reschedule with backoff.
if (milliseconds_to_wait < rabbitmq_settings->rabbitmq_empty_queue_backoff_end_ms)
milliseconds_to_wait += rabbitmq_settings->rabbitmq_empty_queue_backoff_step_ms;
streaming_task->scheduleAfter(milliseconds_to_wait);
}
}
@ -1117,6 +1113,13 @@ bool StorageRabbitMQ::streamToViews()
deactivateTask(looping_task, false, true);
size_t queue_empty = 0;
if (!checkDependencies(getStorageID()))
{
/// Do not commit to rabbitmq if the dependency was removed.
LOG_TRACE(log, "No dependencies, reschedule");
return true;
}
if (!connection->isConnected())
{
if (shutdown_called)

View File

@ -2674,7 +2674,7 @@ def test_rabbitmq_issue_30691(rabbitmq_cluster):
def test_rabbitmq_drop_mv(rabbitmq_cluster):
instance.query(
"""
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
CREATE TABLE test.drop_mv (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mv',
@ -2693,7 +2693,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
instance.query(
"""
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
SELECT * FROM test.drop_mv;
"""
)
@ -2711,9 +2711,10 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
)
start = time.time()
while time.time() - start < 30:
while True:
res = instance.query("SELECT COUNT(*) FROM test.view")
if "20" == res:
print(f"Current count (1): {res}")
if int(res) == 20:
break
else:
logging.debug(f"Number of rows in test.view: {res}")
@ -2727,7 +2728,7 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
instance.query(
"""
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
SELECT * FROM test.drop_mv;
"""
)
for i in range(40, 50):
@ -2736,11 +2737,13 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
)
while True:
result = instance.query("SELECT * FROM test.view ORDER BY key")
if rabbitmq_check_result(result):
result = instance.query("SELECT count() FROM test.view")
print(f"Current count (2): {result}")
if int(result) == 50:
break
time.sleep(1)
result = instance.query("SELECT * FROM test.view ORDER BY key")
rabbitmq_check_result(result, True)
instance.query("DROP VIEW test.consumer NO DELAY")
@ -2754,10 +2757,11 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
count = 0
start = time.time()
while time.time() - start < 30:
count = int(instance.query("SELECT count() FROM test.rabbitmq"))
count = int(instance.query("SELECT count() FROM test.drop_mv"))
if count:
break
instance.query("DROP TABLE test.drop_mv")
assert count > 0