Use ConcurentBoundedQueue instead of vector

This commit is contained in:
kssenii 2020-06-29 06:33:53 +00:00
parent 649eb8e348
commit 5fc0b93400
2 changed files with 12 additions and 29 deletions

View File

@ -30,6 +30,7 @@ namespace ExchangeType
static const String HEADERS = "headers";
}
static const auto QUEUE_SIZE = 50000; /// Equals capacity of single rabbitmq queue
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
@ -59,10 +60,8 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT)
, local_hash_exchange(local_exchange + "_" + ExchangeType::HASH)
, stopped(stopped_)
, messages(QUEUE_SIZE)
{
messages.clear();
current = messages.begin();
exchange_type_set = exchange_type != ExchangeType::DEFAULT;
/* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
@ -81,7 +80,6 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
consumer_channel->close();
messages.clear();
current = messages.begin();
BufferBase::set(nullptr, 0, 0);
}
@ -354,17 +352,12 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
if (message_size && message.body() != nullptr)
{
String message_received = std::string(message.body(), message.body() + message_size);
if (row_delimiter != '\0')
{
message_received += row_delimiter;
}
/// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl().
{
std::lock_guard lock(mutex);
received.push_back(message_received);
}
messages.push(message_received);
}
})
.onError([&](const char * message)
@ -419,25 +412,16 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
if (stopped || !allowed)
return false;
if (current == messages.end())
if (messages.tryPop(current))
{
if (received.empty())
return false;
/// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback.
std::lock_guard lock(mutex);
messages.clear();
messages.swap(received);
current = messages.begin();
}
auto * new_position = const_cast<char *>(current->data());
BufferBase::set(new_position, current->size(), 0);
++current;
auto * new_position = const_cast<char *>(current.data());
BufferBase::set(new_position, current.size(), 0);
allowed = false;
return true;
}
return false;
}
}

View File

@ -5,7 +5,7 @@
#include <IO/ReadBuffer.h>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <event2/event.h>
namespace Poco
@ -74,10 +74,9 @@ private:
std::atomic<bool> loop_started = false, consumer_error = false;
std::atomic<size_t> count_subscribed = 0, wait_subscribed;
ConcurrentBoundedQueue<String> messages;
String current;
std::vector<String> queues;
Messages received;
Messages messages;
Messages::iterator current;
std::unordered_map<String, bool> subscribed_queue;
/* Note: as all consumers share the same connection => they also share the same