ClickHouse/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp

251 lines
8.3 KiB
C++
Raw Normal View History

#include <utility>
2020-05-26 17:34:57 +00:00
#include <chrono>
#include <thread>
2020-05-31 08:39:22 +00:00
#include <mutex>
#include <atomic>
#include <memory>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
2020-06-13 21:37:37 +00:00
#include <boost/algorithm/string/split.hpp>
#include <common/logger_useful.h>
2020-06-07 11:14:05 +00:00
#include "Poco/Timer.h"
#include <amqpcpp.h>
namespace DB
{
/// Error codes referenced in this translation unit; the definition lives elsewhere (declared `extern` here).
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Per-queue capacity factor: the constructor multiplies it by the number of queues
/// to size the `received` message queue.
static const auto QUEUE_SIZE = 50000;
/** Builds a consumer buffer on top of the given channels.
  *
  * Steps performed here:
  *  1. All configuration is stored in members; the buffer itself starts empty (`ReadBuffer(nullptr, 0)`).
  *  2. `num_queues` queues are declared and bound via bindQueue(); each call returns only after its binding
  *     has been confirmed.
  *  3. An onReady callback is installed on the consumer channel. When it fires it assigns a new `channel_id`,
  *     installs the channel error handler, resets ack tracking, subscribes to every declared queue and only
  *     then clears `channel_error`.
  *
  * @param consumer_channel_    channel used for consuming and acknowledging messages
  * @param setup_channel_       channel used for declaring and binding queues
  * @param event_handler_       handler whose loop is iterated while waiting for bindings
  * @param exchange_name_       exchange the queues are bound to
  * @param channel_id_base_     numeric prefix of channel ids and of hash-exchange queue names; also used as binding key
  * @param channel_base_        suffix of channel ids
  * @param queue_base_          queue name (or queue name suffix when `hash_exchange_` is set)
  * @param log_                 logger for all messages of this buffer
  * @param row_delimiter_       appended to every received message body unless it is '\0'
  * @param hash_exchange_       selects the per-queue naming scheme in bindQueue()
  * @param num_queues_          number of queues to declare; also scales the capacity of `received`
  * @param deadletter_exchange_ if non-empty, set as "x-dead-letter-exchange" on declared queues
  * @param stopped_             external stop flag checked in nextImpl()
  */
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
        ChannelPtr consumer_channel_,
        ChannelPtr setup_channel_,
        HandlerPtr event_handler_,
        const String & exchange_name_,
        size_t channel_id_base_,
        const String & channel_base_,
        const String & queue_base_,
        Poco::Logger * log_,
        char row_delimiter_,
        bool hash_exchange_,
        size_t num_queues_,
        const String & deadletter_exchange_,
        const std::atomic<bool> & stopped_)
        : ReadBuffer(nullptr, 0)
        , consumer_channel(std::move(consumer_channel_))
        /// Parameters taken by value are moved into place, same as consumer_channel_ above.
        , setup_channel(std::move(setup_channel_))
        , event_handler(std::move(event_handler_))
        , exchange_name(exchange_name_)
        , channel_base(channel_base_)
        , channel_id_base(channel_id_base_)
        , queue_base(queue_base_)
        , hash_exchange(hash_exchange_)
        , num_queues(num_queues_)
        , deadletter_exchange(deadletter_exchange_)
        , log(log_)
        , row_delimiter(row_delimiter_)
        , stopped(stopped_)
        /// Use the parameter rather than the member so the capacity does not depend on member declaration order.
        , received(QUEUE_SIZE * num_queues_)
{
    for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
        bindQueue(queue_id);

    /// The callback is stored by the channel and runs after the constructor has returned,
    /// so it captures only `this` and never the constructor arguments.
    consumer_channel->onReady([this]()
    {
        channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base;
        LOG_TRACE(log, "Channel {} is created", channel_id);

        consumer_channel->onError([this](const char * message)
        {
            LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
            channel_error.store(true);
        });

        /// A default-constructed tracker resets ack state for the new channel.
        updateAckTracker(AckTracker());
        subscribe();

        /// Cleared last: ackMessages() does nothing while the flag is set.
        channel_error.store(false);
    });
}
/// Resets the working buffer to a null, zero-length range on destruction.
/// nextImpl() points the buffer at the storage of `current.message`, which this object owns;
/// presumably the reset keeps the base class from referring to that storage afterwards.
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
    BufferBase::set(nullptr, 0, 0);
}
2020-07-29 19:45:18 +00:00
void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
{
2020-08-15 06:50:53 +00:00
std::atomic<bool> binding_created = false;
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);
2020-07-23 11:45:01 +00:00
if (msgcount)
LOG_TRACE(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
2020-07-23 11:45:01 +00:00
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary.
*/
2020-08-15 06:50:53 +00:00
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base))
2020-07-29 19:45:18 +00:00
.onSuccess([&]
{
2020-08-15 06:50:53 +00:00
binding_created = true;
2020-07-29 19:45:18 +00:00
})
.onError([&](const char * message)
{
2020-08-15 06:50:53 +00:00
throw Exception("Failed to create queue binding. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
2020-07-29 19:45:18 +00:00
});
2020-07-23 11:45:01 +00:00
};
auto error_callback([&](const char * message)
{
2020-08-15 06:50:53 +00:00
throw Exception("Failed to declare queue. Reason: " + std::string(message), ErrorCodes::LOGICAL_ERROR);
});
2020-07-24 12:33:07 +00:00
AMQP::Table queue_settings;
if (!deadletter_exchange.empty())
{
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
}
/* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one
* specific queue when its name is specified in queue_base setting.
*/
2020-08-26 08:54:29 +00:00
const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
2020-07-23 11:45:01 +00:00
2020-08-15 06:50:53 +00:00
while (!binding_created)
{
2020-07-02 16:44:04 +00:00
iterateEventLoop();
2020-06-08 01:11:48 +00:00
}
}
2020-07-29 19:45:18 +00:00
void ReadBufferFromRabbitMQConsumer::subscribe()
{
2020-07-29 19:45:18 +00:00
for (const auto & queue_name : queues)
{
2020-07-29 19:45:18 +00:00
consumer_channel->consume(queue_name)
2020-08-15 06:50:53 +00:00
.onSuccess([&](const std::string & /* consumer_tag */)
2020-07-29 19:45:18 +00:00
{
2020-08-15 06:50:53 +00:00
LOG_TRACE(log, "Consumer on channel {} is subscribed to queue {}", channel_id, queue_name);
2020-07-29 19:45:18 +00:00
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
2020-07-24 12:33:07 +00:00
{
2020-07-29 19:45:18 +00:00
if (message.bodySize())
{
String message_received = std::string(message.body(), message.body() + message.bodySize());
if (row_delimiter != '\0')
message_received += row_delimiter;
if (message.hasMessageID())
received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)});
else
received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)});
2020-07-29 19:45:18 +00:00
}
})
.onError([&](const char * message)
{
2020-08-15 06:50:53 +00:00
LOG_ERROR(log, "Consumer failed on channel {}. Reason: {}", channel_id, message);
2020-07-29 19:45:18 +00:00
});
}
2020-06-09 21:52:06 +00:00
}
2020-06-08 01:11:48 +00:00
void ReadBufferFromRabbitMQConsumer::ackMessages()
2020-07-24 12:33:07 +00:00
{
2020-08-15 06:50:53 +00:00
/* Delivery tags are scoped per channel, so if channel fails, then all previous delivery tags become invalid. Also this check ensures
* that there is no data race with onReady callback in restoreChannel() (they can be called at the same time from different threads).
* And there is no need to synchronize this method with updateAckTracker() as they are not supposed to be called at the same time.
*/
if (channel_error.load())
return;
AckTracker record = last_inserted_record;
/// Do not send ack to server if message's channel is not the same as current running channel.
2020-08-26 08:54:29 +00:00
if (record.channel_id == channel_id && record.delivery_tag && record.delivery_tag > prev_tag && event_handler->connectionRunning())
2020-07-24 12:33:07 +00:00
{
2020-08-15 06:50:53 +00:00
consumer_channel->ack(record.delivery_tag, AMQP::multiple); /// Will ack all up to last tag starting from last acked.
prev_tag = record.delivery_tag;
LOG_TRACE(log, "Consumer acknowledged messages with deliveryTags up to {} on the channel {}", record.delivery_tag, channel_id);
2020-07-24 12:33:07 +00:00
}
}
2020-08-15 06:50:53 +00:00
/** Remembers `record` as the last inserted record.
  *
  * A record with a zero delivery tag acts as a reset: it is always stored and also clears `prev_tag`.
  * A record with a non-zero tag is dropped while the channel is in error state.
  *
  * Per the original author's note, callers are readImpl and the channel callbacks, and the `channel_error`
  * check keeps them from running this at the same time, so no extra synchronization is used.
  */
void ReadBufferFromRabbitMQConsumer::updateAckTracker(AckTracker record)
{
    const bool is_reset = !record.delivery_tag;

    if (!is_reset && channel_error.load())
        return;

    if (is_reset)
        prev_tag = 0;

    last_inserted_record = record;
}
/** Replaces the consumer channel with `new_channel` and arranges for it to be set up once it is ready.
  *
  * When the new channel becomes ready, a fresh `channel_id` is generated, the error handler is installed,
  * ack tracking is reset, all queues are re-subscribed and finally `channel_error` is cleared.
  */
void ReadBufferFromRabbitMQConsumer::restoreChannel(ChannelPtr new_channel)
{
    consumer_channel = std::move(new_channel);

    consumer_channel->onReady([this]()
    {
        /* channel_id = <buffer number>_<channel serial number>_<channel_base>.
         * The serial number grows each time a channel is created for this buffer (e.g. after a failure);
         * channel_base is there to keep channel_id unique for each table.
         */
        const auto channel_serial = channel_id_counter++;
        channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_serial) + "_" + channel_base;
        LOG_TRACE(log, "Channel {} is created", channel_id);

        consumer_channel->onError([this](const char * message)
        {
            LOG_ERROR(log, "Channel {} error: {}", channel_id, message);
            channel_error.store(true);
        });

        /// A default-constructed tracker resets ack state for the new channel.
        updateAckTracker(AckTracker());
        subscribe();

        /// Cleared last: ackMessages() does nothing while the flag is set.
        channel_error.store(false);
    });
}
2020-07-02 16:44:04 +00:00
void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
2020-06-04 06:22:53 +00:00
{
2020-07-02 16:44:04 +00:00
event_handler->iterateLoop();
}
bool ReadBufferFromRabbitMQConsumer::nextImpl()
{
if (stopped || !allowed)
return false;
2020-07-20 10:05:00 +00:00
if (received.tryPop(current))
{
2020-07-20 10:05:00 +00:00
auto * new_position = const_cast<char *>(current.message.data());
BufferBase::set(new_position, current.message.size(), 0);
allowed = false;
return true;
}
return false;
}
}