ClickHouse/src/Storages/RabbitMQ/RabbitMQConsumer.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

178 lines
5.3 KiB
C++
Raw Normal View History

#include <utility>
2020-05-26 17:34:57 +00:00
#include <chrono>
#include <thread>
2020-05-31 08:39:22 +00:00
#include <atomic>
#include <memory>
#include <Storages/RabbitMQ/RabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <IO/ReadBufferFromMemory.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2020-06-07 11:14:05 +00:00
#include "Poco/Timer.h"
#include <amqpcpp.h>
namespace DB
{
2021-10-08 08:48:08 +00:00
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions are provided elsewhere.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Builds a consumer; every argument is simply stored into the member of the same name.
///
/// @param event_handler_    Handler whose loop is driven by iterateEventLoop().
/// @param queues_           Names of the queues that subscribe() will consume from.
/// @param channel_id_base_  First component of the generated channel id (see setupChannel()).
/// @param channel_base_     Last component of the generated channel id (see setupChannel()).
/// @param log_              Logger used for all trace/error messages of this consumer.
/// @param queue_size_       Passed to the constructor of `received` (presumably its capacity -- confirm in the header).
/// @param stopped_          External flag; consume() returns nullptr once it is set.
RabbitMQConsumer::RabbitMQConsumer(
    RabbitMQHandler & event_handler_,
    std::vector<String> & queues_,
    size_t channel_id_base_,
    const String & channel_base_,
    Poco::Logger * log_,
    uint32_t queue_size_,
    const std::atomic<bool> & stopped_)
    : event_handler(event_handler_)
    , queues(queues_)
    , channel_base(channel_base_)
    , channel_id_base(channel_id_base_)
    , log(log_)
    , stopped(stopped_)
    , received(queue_size_)
{
}
void RabbitMQConsumer::closeChannel()
2022-06-23 18:43:35 +00:00
{
2023-04-02 16:21:45 +00:00
cv.notify_one();
2022-06-23 18:43:35 +00:00
if (consumer_channel)
consumer_channel->close();
}
void RabbitMQConsumer::subscribe()
{
2020-07-29 19:45:18 +00:00
for (const auto & queue_name : queues)
{
2020-07-29 19:45:18 +00:00
consumer_channel->consume(queue_name)
2020-08-15 06:50:53 +00:00
.onSuccess([&](const std::string & /* consumer_tag */)
2020-07-29 19:45:18 +00:00
{
2020-08-15 06:50:53 +00:00
LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name);
2020-08-28 08:52:02 +00:00
if (++subscribed == queues.size())
wait_subscription.store(false);
2020-07-29 19:45:18 +00:00
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
2020-07-24 12:33:07 +00:00
{
2020-07-29 19:45:18 +00:00
if (message.bodySize())
{
String message_received = std::string(message.body(), message.body() + message.bodySize());
2023-04-01 18:52:22 +00:00
std::unique_lock lock(mutex);
2021-10-07 22:06:33 +00:00
if (!received.push({message_received, message.hasMessageID() ? message.messageID() : "",
2020-10-27 20:28:52 +00:00
message.hasTimestamp() ? message.timestamp() : 0,
2021-10-07 22:06:33 +00:00
redelivered, AckTracker(delivery_tag, channel_id)}))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue");
2023-04-01 16:41:42 +00:00
cv.notify_one();
2020-07-29 19:45:18 +00:00
}
})
.onError([&](const char * message)
{
2020-08-28 08:52:02 +00:00
/* End up here either if channel ends up in an error state (then there will be resubscription) or consume call error, which
2020-10-27 11:04:03 +00:00
* arises from queue settings mismatch or queue level error, which should not happen as no one else is supposed to touch them
2020-08-28 08:52:02 +00:00
*/
2020-08-15 06:50:53 +00:00
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
2020-08-28 08:52:02 +00:00
wait_subscription.store(false);
2020-07-29 19:45:18 +00:00
});
}
2020-06-09 21:52:06 +00:00
}
2020-06-08 01:11:48 +00:00
bool RabbitMQConsumer::ackMessages()
2020-07-24 12:33:07 +00:00
{
2020-09-07 09:32:45 +00:00
AckTracker record_info = last_inserted_record_info;
2020-08-15 06:50:53 +00:00
2020-08-28 08:52:02 +00:00
/* Do not send ack to server if message's channel is not the same as current running channel because delivery tags are scoped per
* channel, so if channel fails, all previous delivery tags become invalid
*/
2020-09-07 09:32:45 +00:00
if (record_info.channel_id == channel_id && record_info.delivery_tag && record_info.delivery_tag > prev_tag)
2020-07-24 12:33:07 +00:00
{
2020-10-27 11:04:03 +00:00
/// Commit all received messages with delivery tags from last committed to last inserted
2020-09-07 09:32:45 +00:00
if (!consumer_channel->ack(record_info.delivery_tag, AMQP::multiple))
2020-08-31 16:34:16 +00:00
{
2020-10-27 11:04:03 +00:00
LOG_ERROR(log, "Failed to commit messages with delivery tags from last committed to {} on channel {}",
2020-09-07 09:32:45 +00:00
record_info.delivery_tag, channel_id);
2020-08-28 08:52:02 +00:00
return false;
2020-08-31 16:34:16 +00:00
}
2020-08-15 06:50:53 +00:00
2020-09-07 09:32:45 +00:00
prev_tag = record_info.delivery_tag;
2020-10-27 11:04:03 +00:00
LOG_TRACE(log, "Consumer committed messages with deliveryTags up to {} on channel {}", record_info.delivery_tag, channel_id);
2020-07-24 12:33:07 +00:00
}
2020-08-28 08:52:02 +00:00
return true;
2020-07-24 12:33:07 +00:00
}
/// Remembers `record_info` as the last inserted record.
///
/// - A record with a non-zero delivery tag is ignored while `channel_error` is set.
/// - A record with a zero delivery tag additionally resets `prev_tag` to 0.
void RabbitMQConsumer::updateAckTracker(AckTracker record_info)
{
    const bool has_delivery_tag = static_cast<bool>(record_info.delivery_tag);

    if (has_delivery_tag)
    {
        if (channel_error.load())
            return;
    }
    else
    {
        prev_tag = 0;
    }

    last_inserted_record_info = record_info;
}
void RabbitMQConsumer::setupChannel()
2020-08-15 06:50:53 +00:00
{
2021-09-16 10:46:43 +00:00
if (!consumer_channel)
return;
2020-08-28 08:52:02 +00:00
wait_subscription.store(true);
2020-08-15 06:50:53 +00:00
consumer_channel->onReady([&]()
{
/* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer,
* i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that
2020-08-28 08:52:02 +00:00
* channel_id is unique for each table
*/
channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
2020-08-15 06:50:53 +00:00
LOG_TRACE(log, "Channel {} is created", channel_id);
2020-08-28 08:52:02 +00:00
subscribed = 0;
2020-08-15 06:50:53 +00:00
subscribe();
channel_error.store(false);
});
2020-08-28 08:52:02 +00:00
consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
channel_error.store(true);
wait_subscription.store(false);
});
2020-08-15 06:50:53 +00:00
}
bool RabbitMQConsumer::needChannelUpdate()
2020-12-03 14:11:35 +00:00
{
if (wait_subscription)
return false;
return channel_error || !consumer_channel || !consumer_channel->usable();
}
void RabbitMQConsumer::iterateEventLoop()
2020-06-04 06:22:53 +00:00
{
2021-09-10 10:28:09 +00:00
event_handler.iterateLoop();
}
/// Pops the next received message into `current` and exposes its payload as a read buffer.
///
/// @return nullptr if the consumer is stopped or no message could be popped; otherwise a
///         buffer over `current.message` (the buffer refers to that memory, it does not copy it).
ReadBufferPtr RabbitMQConsumer::consume()
{
    if (stopped)
        return nullptr;

    if (!received.tryPop(current))
        return nullptr;

    const auto & payload = current.message;
    return std::make_shared<ReadBufferFromMemory>(payload.data(), payload.size());
}
}