This commit is contained in:
kssenii 2023-04-19 16:29:38 +02:00
parent 59459cb287
commit 323caae3c0
2 changed files with 9 additions and 19 deletions

View File

@ -187,8 +187,8 @@ StorageRabbitMQ::StorageRabbitMQ(
streaming_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
streaming_task->deactivate();
connection_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
connection_task->deactivate();
init_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
init_task->deactivate();
}
@ -323,7 +323,7 @@ void StorageRabbitMQ::connectionFunc()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
connection_task->scheduleAfter(RESCHEDULE_MS);
init_task->scheduleAfter(RESCHEDULE_MS);
}
@ -784,25 +784,15 @@ SinkToStoragePtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr
void StorageRabbitMQ::startup()
{
if (connection->isConnected())
if (initialized)
{
try
{
initRabbitMQ();
}
catch (...)
{
if (!is_attach)
throw;
tryLogCurrentException(log);
}
streaming_task->activateAndSchedule();
}
else
{
connection_task->activateAndSchedule();
streaming_task->activate();
init_task->activateAndSchedule();
}
streaming_task->activate();
}
@ -816,7 +806,7 @@ void StorageRabbitMQ::shutdown()
LOG_TRACE(log, "Deactivating background tasks");
/// In case it has not yet been able to setup connection;
deactivateTask(connection_task, true, false);
deactivateTask(init_task, true, false);
/// The order of deactivating tasks is important: wait for streamingToViews() func to finish and
/// then wait for background event loop to finish.

View File

@ -127,7 +127,7 @@ private:
std::mutex task_mutex;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder looping_task;
BackgroundSchedulePool::TaskHolder connection_task;
BackgroundSchedulePool::TaskHolder init_task;
uint64_t milliseconds_to_wait;