This commit is contained in:
kssenii 2023-04-03 12:11:12 +02:00
parent 6d75ca9c6f
commit 35a8328922
3 changed files with 17 additions and 17 deletions

View File

@ -24,22 +24,24 @@ RabbitMQConsumer::RabbitMQConsumer(
size_t channel_id_base_,
const String & channel_base_,
Poco::Logger * log_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_)
uint32_t queue_size_)
: event_handler(event_handler_)
, queues(queues_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, log(log_)
, stopped(stopped_)
, received(queue_size_)
{
}
void RabbitMQConsumer::closeChannel()
void RabbitMQConsumer::shutdown()
{
{
std::lock_guard lock(mutex);
stopped = true;
}
cv.notify_one();
if (consumer_channel)
consumer_channel->close();
}

View File

@ -32,8 +32,7 @@ public:
size_t channel_id_base_,
const String & channel_base_,
Poco::Logger * log_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_);
uint32_t queue_size_);
struct AckTracker
{
@ -60,12 +59,12 @@ public:
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool needChannelUpdate();
void closeChannel();
void shutdown();
void updateQueues(std::vector<String> & queues_) { queues = queues_; }
size_t queuesCount() { return queues.size(); }
bool isConsumerStopped() { return stopped; }
bool isConsumerStopped() const { return stopped; }
bool ackMessages();
void updateAckTracker(AckTracker record = AckTracker());
@ -95,7 +94,7 @@ private:
const String channel_base;
const size_t channel_id_base;
Poco::Logger * log;
const std::atomic<bool> & stopped;
bool stopped;
String channel_id;
std::atomic<bool> channel_error = true, wait_subscription = false;

View File

@ -713,7 +713,7 @@ void StorageRabbitMQ::read(
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);
: static_cast<UInt64>(Poco::Timespan(getContext()->getSettingsRef().stream_flush_interval_ms).milliseconds());
for (size_t i = 0; i < num_created_consumers; ++i)
{
@ -818,6 +818,9 @@ void StorageRabbitMQ::shutdown()
{
shutdown_called = true;
for (auto & consumer : consumers)
consumer->shutdown();
LOG_TRACE(log, "Deactivating background tasks");
/// In case it has not yet been able to setup connection;
@ -833,9 +836,6 @@ void StorageRabbitMQ::shutdown()
/// Just a paranoid try catch, it is not actually needed.
try
{
for (auto & consumer : consumers)
consumer->closeChannel();
if (drop_table)
cleanupRabbitMQ();
@ -943,8 +943,7 @@ RabbitMQConsumerPtr StorageRabbitMQ::popConsumer(std::chrono::milliseconds timeo
RabbitMQConsumerPtr StorageRabbitMQ::createConsumer()
{
return std::make_shared<RabbitMQConsumer>(
connection->getHandler(), queues, ++consumer_id,
unique_strbase, log, queue_size, shutdown_called);
connection->getHandler(), queues, ++consumer_id, unique_strbase, log, queue_size);
}
bool StorageRabbitMQ::hasDependencies(const StorageID & table_id)
@ -1081,7 +1080,7 @@ bool StorageRabbitMQ::tryStreamToViews()
uint64_t max_execution_time_ms = rabbitmq_settings->rabbitmq_flush_interval_ms.changed
? rabbitmq_settings->rabbitmq_flush_interval_ms
: (static_cast<UInt64>(getContext()->getSettingsRef().stream_flush_interval_ms) * 1000);
: static_cast<UInt64>(Poco::Timespan(getContext()->getSettingsRef().stream_flush_interval_ms).milliseconds());
for (size_t i = 0; i < num_created_consumers; ++i)
{