Add some logging to StorageRabbitMQ

This commit is contained in:
kssenii 2023-09-20 15:54:11 +02:00
parent 2ba9263098
commit 3ef2c37b92
2 changed files with 68 additions and 49 deletions

View File

@ -959,70 +959,88 @@ bool StorageRabbitMQ::hasDependencies(const StorageID & table_id)
void StorageRabbitMQ::streamingToViewsFunc()
{
chassert(initialized);
if (initialized)
try
{
try
{
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
size_t num_views = DatabaseCatalog::instance().getDependentViews(table_id).size();
bool rabbit_connected = connection->isConnected() || connection->reconnect();
if (num_views && rabbit_connected)
{
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!shutdown_called && num_created_consumers > 0)
{
if (!hasDependencies(table_id))
break;
LOG_DEBUG(log, "Started streaming to {} attached views", num_views);
bool continue_reading = tryStreamToViews();
if (!continue_reading)
break;
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
}
milliseconds_to_wait = rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms;
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
streamToViewsImpl();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
mv_attached.store(false);
/// If there is no running select, stop the loop which was
/// activated by previous select.
if (connection->getHandler().loopRunning())
stopLoopIfNoReaders();
try
{
/// If there is no running select, stop the loop which was
/// activated by previous select.
if (connection->getHandler().loopRunning())
stopLoopIfNoReaders();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!shutdown_called)
if (shutdown_called)
{
LOG_DEBUG(log, "Shutdown called, stopping background streaming process");
}
else
{
/// Reschedule with backoff.
if (milliseconds_to_wait < rabbitmq_settings->rabbitmq_empty_queue_backoff_end_ms)
milliseconds_to_wait += rabbitmq_settings->rabbitmq_empty_queue_backoff_step_ms;
LOG_DEBUG(log, "Rescheduling background streaming process in {}", milliseconds_to_wait);
streaming_task->scheduleAfter(milliseconds_to_wait);
}
}
void StorageRabbitMQ::streamToViewsImpl()
{
if (!initialized)
{
chassert(false);
return;
}
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
size_t num_views = DatabaseCatalog::instance().getDependentViews(table_id).size();
bool rabbit_connected = connection->isConnected() || connection->reconnect();
if (num_views && rabbit_connected)
{
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!shutdown_called && num_created_consumers > 0)
{
if (!hasDependencies(table_id))
break;
LOG_DEBUG(log, "Started streaming to {} attached views", num_views);
bool continue_reading = tryStreamToViews();
if (!continue_reading)
break;
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
}
milliseconds_to_wait = rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms;
}
}
}
bool StorageRabbitMQ::tryStreamToViews()
{

View File

@ -187,6 +187,7 @@ private:
void bindExchange(AMQP::TcpChannel & rabbit_channel);
void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel);
void streamToViewsImpl();
/// Return true on successful stream attempt.
bool tryStreamToViews();
bool hasDependencies(const StorageID & table_id);