Fix races

This commit is contained in:
kssenii 2020-12-05 21:55:00 +00:00
parent 448463ebe0
commit 8f1ed36897
2 changed files with 17 additions and 11 deletions

View File

@ -24,6 +24,7 @@ public:
String getName() const override { return storage.getName(); }
Block getHeader() const override;
ConsumerBufferPtr getBuffer() { return buffer; }
void readPrefixImpl() override;
Block readImpl() override;

View File

@ -635,6 +635,7 @@ void StorageRabbitMQ::shutdown()
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
deactivateTask(connection_task, true, false);
connection->close();
@ -681,17 +682,6 @@ ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeo
auto buffer = buffers.back();
buffers.pop_back();
if (buffer->needChannelUpdate())
{
if (buffer->queuesCount() != queues.size())
buffer->updateQueues(queues);
buffer->updateAckTracker();
if (updateChannel(buffer->getChannel()))
buffer->setupChannel();
}
return buffer;
}
@ -884,6 +874,21 @@ bool StorageRabbitMQ::streamToViews()
if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
++queue_empty;
if (stream->as<RabbitMQBlockInputStream>()->needChannelUpdate())
{
auto buffer = stream->as<RabbitMQBlockInputStream>()->getBuffer();
if (buffer)
{
if (buffer->queuesCount() != queues.size())
buffer->updateQueues(queues);
buffer->updateAckTracker();
if (updateChannel(buffer->getChannel()))
buffer->setupChannel();
}
}
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is