2020-05-20 09:40:49 +00:00
|
|
|
#include <utility>
|
2020-05-26 17:34:57 +00:00
|
|
|
#include <chrono>
|
|
|
|
#include <thread>
|
2020-05-31 08:39:22 +00:00
|
|
|
#include <mutex>
|
|
|
|
#include <atomic>
|
|
|
|
#include <memory>
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
|
|
|
|
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
2020-06-13 21:37:37 +00:00
|
|
|
#include <boost/algorithm/string/split.hpp>
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <common/logger_useful.h>
|
2020-06-07 11:14:05 +00:00
|
|
|
#include "Poco/Timer.h"
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <amqpcpp.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/** Creates a consumer buffer that starts out empty (no underlying memory attached).
  *
  * consumer_channel_ - AMQP channel to consume from; may be null, in which case no channel setup is done here.
  * event_handler_    - handler whose loop is driven by iterateEventLoop().
  * queues_           - names of the queues this consumer subscribes to.
  * channel_id_base_  - numeric prefix used when building channel_id.
  * channel_base_     - suffix used when building channel_id.
  * log_              - logger used for all diagnostics of this consumer.
  * row_delimiter_    - appended to every received message unless it is '\0'.
  * queue_size_       - passed to the `received` queue constructor (presumably its capacity - TODO confirm).
  * stopped_          - external flag; when it is set nextImpl() returns no data.
  */
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
        ChannelPtr consumer_channel_,
        HandlerPtr event_handler_,
        std::vector<String> & queues_,
        size_t channel_id_base_,
        const String & channel_base_,
        Poco::Logger * log_,
        char row_delimiter_,
        uint32_t queue_size_,
        const std::atomic<bool> & stopped_)
        : ReadBuffer(nullptr, 0)
        , consumer_channel(std::move(consumer_channel_))
        /// Taken by value, so move it into place exactly like consumer_channel_ instead of copying.
        , event_handler(std::move(event_handler_))
        , queues(queues_)
        , channel_base(channel_base_)
        , channel_id_base(channel_id_base_)
        , log(log_)
        , row_delimiter(row_delimiter_)
        , stopped(stopped_)
        , received(queue_size_)
{
    /// Callbacks can only be installed on an existing channel.
    if (consumer_channel)
        setupChannel();
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Detaches the buffer from whatever memory it was pointing to before the object goes away.
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
    /// Reset position/size so the base class no longer references message data.
    BufferBase::set(nullptr, 0, 0);
}
|
|
|
|
|
|
|
|
|
2020-07-29 19:45:18 +00:00
|
|
|
void ReadBufferFromRabbitMQConsumer::subscribe()
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-29 19:45:18 +00:00
|
|
|
for (const auto & queue_name : queues)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-29 19:45:18 +00:00
|
|
|
consumer_channel->consume(queue_name)
|
2020-08-15 06:50:53 +00:00
|
|
|
.onSuccess([&](const std::string & /* consumer_tag */)
|
2020-07-29 19:45:18 +00:00
|
|
|
{
|
2020-08-15 06:50:53 +00:00
|
|
|
LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name);
|
2020-08-28 08:52:02 +00:00
|
|
|
|
|
|
|
if (++subscribed == queues.size())
|
|
|
|
wait_subscription.store(false);
|
2020-07-29 19:45:18 +00:00
|
|
|
})
|
|
|
|
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
|
2020-07-24 12:33:07 +00:00
|
|
|
{
|
2020-07-29 19:45:18 +00:00
|
|
|
if (message.bodySize())
|
|
|
|
{
|
|
|
|
String message_received = std::string(message.body(), message.body() + message.bodySize());
|
|
|
|
if (row_delimiter != '\0')
|
|
|
|
message_received += row_delimiter;
|
|
|
|
|
2020-10-27 20:28:52 +00:00
|
|
|
received.push({message_received, message.hasMessageID() ? message.messageID() : "",
|
|
|
|
message.hasTimestamp() ? message.timestamp() : 0,
|
|
|
|
redelivered, AckTracker(delivery_tag, channel_id)});
|
2020-07-29 19:45:18 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-08-28 08:52:02 +00:00
|
|
|
/* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which
|
2020-10-27 11:04:03 +00:00
|
|
|
* arises from queue settings mismatch or queue level error, which should not happen as no one else is supposed to touch them
|
2020-08-28 08:52:02 +00:00
|
|
|
*/
|
2020-08-15 06:50:53 +00:00
|
|
|
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
|
2020-08-28 08:52:02 +00:00
|
|
|
wait_subscription.store(false);
|
2020-07-29 19:45:18 +00:00
|
|
|
});
|
|
|
|
}
|
2020-06-09 21:52:06 +00:00
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-08 01:11:48 +00:00
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
bool ReadBufferFromRabbitMQConsumer::ackMessages()
|
2020-07-24 12:33:07 +00:00
|
|
|
{
|
2020-09-07 09:32:45 +00:00
|
|
|
AckTracker record_info = last_inserted_record_info;
|
2020-08-15 06:50:53 +00:00
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
/* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per
|
|
|
|
* channel, so if channel fails, all previous delivery tags become invalid
|
|
|
|
*/
|
2020-09-07 09:32:45 +00:00
|
|
|
if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag)
|
2020-07-24 12:33:07 +00:00
|
|
|
{
|
2020-10-27 11:04:03 +00:00
|
|
|
/// Commit all received messages with delivery tags from last committed to last inserted
|
2020-09-07 09:32:45 +00:00
|
|
|
if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple))
|
2020-08-31 16:34:16 +00:00
|
|
|
{
|
2020-10-27 11:04:03 +00:00
|
|
|
LOG_ERROR(log, "Failed to commit messages with delivery tags from last committed to {} on channel {}",
|
2020-09-07 09:32:45 +00:00
|
|
|
record_info.delivery_tag, channel_id);
|
2020-08-28 08:52:02 +00:00
|
|
|
return false;
|
2020-08-31 16:34:16 +00:00
|
|
|
}
|
2020-08-15 06:50:53 +00:00
|
|
|
|
2020-09-07 09:32:45 +00:00
|
|
|
prev_tag = record_info.delivery_tag;
|
2020-10-27 11:04:03 +00:00
|
|
|
LOG_TRACE(log, "Consumer committed messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id);
|
2020-07-24 12:33:07 +00:00
|
|
|
}
|
2020-08-28 08:52:02 +00:00
|
|
|
|
|
|
|
return true;
|
2020-07-24 12:33:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-09-07 09:32:45 +00:00
|
|
|
/** Records `record_info` as the last inserted record, which ackMessages() later reads.
  *  - A record with a non-zero delivery tag is ignored while the channel is in an error state.
  *  - A record with a zero delivery tag resets prev_tag to 0 before being stored.
  */
void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record_info)
{
    if (record_info.delivery_tag)
    {
        /// channel_error is consulted only for non-zero tags, same as a short-circuit `&&`.
        if (channel_error.load())
            return;
    }
    else
    {
        prev_tag = 0;
    }

    /// record_info is a by-value sink parameter, so move it into place instead of copying.
    last_inserted_record_info = std::move(record_info);
}
|
|
|
|
|
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
void ReadBufferFromRabbitMQConsumer::setupChannel()
|
2020-08-15 06:50:53 +00:00
|
|
|
{
|
2020-08-28 08:52:02 +00:00
|
|
|
wait_subscription.store(true);
|
|
|
|
|
2020-08-15 06:50:53 +00:00
|
|
|
consumer_channel->onReady([&]()
|
|
|
|
{
|
2020-08-15 14:38:29 +00:00
|
|
|
/* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer,
|
|
|
|
* i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that
|
2020-08-28 08:52:02 +00:00
|
|
|
* channel_id is unique for each table
|
2020-08-15 14:38:29 +00:00
|
|
|
*/
|
|
|
|
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
|
2020-08-15 06:50:53 +00:00
|
|
|
LOG_TRACE(log, "Channel {} is created", channel_id);
|
|
|
|
|
2020-08-28 08:52:02 +00:00
|
|
|
subscribed = 0;
|
2020-08-15 06:50:53 +00:00
|
|
|
subscribe();
|
|
|
|
channel_error.store(false);
|
|
|
|
});
|
2020-08-28 08:52:02 +00:00
|
|
|
|
|
|
|
consumer_channel->onError([&](const char * message)
|
|
|
|
{
|
|
|
|
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
|
|
|
|
|
|
|
|
channel_error.store(true);
|
|
|
|
wait_subscription.store(false);
|
|
|
|
});
|
2020-08-15 06:50:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-12-03 14:11:35 +00:00
|
|
|
bool ReadBufferFromRabbitMQConsumer::needChannelUpdate()
|
|
|
|
{
|
|
|
|
if (wait_subscription)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
return channel_error || !consumer_channel || !consumer_channel->usable();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-07-02 16:44:04 +00:00
|
|
|
void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
|
2020-06-04 06:22:53 +00:00
|
|
|
{
|
2020-07-02 16:44:04 +00:00
|
|
|
event_handler->iterateLoop();
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
bool ReadBufferFromRabbitMQConsumer::nextImpl()
|
|
|
|
{
|
|
|
|
if (stopped || !allowed)
|
|
|
|
return false;
|
|
|
|
|
2020-07-20 10:05:00 +00:00
|
|
|
if (received.tryPop(current))
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-20 10:05:00 +00:00
|
|
|
auto * new_position = const_cast<char *>(current.message.data());
|
|
|
|
BufferBase::set(new_position, current.message.size(), 0);
|
2020-06-29 06:33:53 +00:00
|
|
|
allowed = false;
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-29 06:33:53 +00:00
|
|
|
return true;
|
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-29 06:33:53 +00:00
|
|
|
return false;
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|