2020-05-20 09:40:49 +00:00
|
|
|
#include <utility>
|
2020-05-26 17:34:57 +00:00
|
|
|
#include <chrono>
|
|
|
|
#include <thread>
|
2020-05-31 08:39:22 +00:00
|
|
|
#include <mutex>
|
|
|
|
#include <atomic>
|
|
|
|
#include <memory>
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
|
|
|
|
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
2020-06-13 21:37:37 +00:00
|
|
|
#include <boost/algorithm/string/split.hpp>
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <common/logger_useful.h>
|
2020-06-07 11:14:05 +00:00
|
|
|
#include "Poco/Timer.h"
|
2020-05-20 09:40:49 +00:00
|
|
|
#include <amqpcpp.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2020-07-28 08:22:45 +00:00
|
|
|
/// Multiplied by the number of queues and passed to the constructor of the `received` message queue.
static constexpr auto QUEUE_SIZE = 50000;
|
2020-06-11 10:56:40 +00:00
|
|
|
|
2020-05-20 09:40:49 +00:00
|
|
|
/** Stores the channel/handler handles and the exchange settings, then declares and binds
  * `num_queues_` queues by calling initQueueBindings() for every queue id in [0, num_queues_).
  * The read buffer itself starts empty; nextImpl() points it at received messages later.
  */
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
        ChannelPtr consumer_channel_,
        ChannelPtr setup_channel_,
        HandlerPtr event_handler_,
        const String & exchange_name_,
        const AMQP::ExchangeType & exchange_type_,
        const Names & routing_keys_,
        size_t channel_id_,
        const String & queue_base_,
        Poco::Logger * log_,
        char row_delimiter_,
        bool hash_exchange_,
        size_t num_queues_,
        const String & local_exchange_,
        const String & deadletter_exchange_,
        const std::atomic<bool> & stopped_)
        : ReadBuffer(nullptr, 0)
        /// All three handles are received by value, so they are moved into the members rather than copied again.
        , consumer_channel(std::move(consumer_channel_))
        , setup_channel(std::move(setup_channel_))
        , event_handler(std::move(event_handler_))
        , exchange_name(exchange_name_)
        , exchange_type(exchange_type_)
        , routing_keys(routing_keys_)
        , channel_id(channel_id_)
        , queue_base(queue_base_)
        , hash_exchange(hash_exchange_)
        , num_queues(num_queues_)
        , log(log_)
        , row_delimiter(row_delimiter_)
        , stopped(stopped_)
        , local_exchange(local_exchange_)
        , deadletter_exchange(deadletter_exchange_)
        /// Sized from the parameter, not from the `num_queues` member: members are initialized in declaration
        /// order, so reading the member here would only be valid if it is declared before `received`.
        , received(QUEUE_SIZE * num_queues_)
{
    for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
        initQueueBindings(queue_id);
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Closes the consumer channel, discards every message still stored in `received`
/// and resets the working buffer to an empty (null) range.
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
    consumer_channel->close();

    received.clear();

    /// nextImpl() points the buffer at `current.message`; drop that pointer here.
    BufferBase::set(nullptr, 0, 0);
}
|
|
|
|
|
|
|
|
|
|
|
|
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
|
|
|
|
{
|
2020-07-28 08:22:45 +00:00
|
|
|
std::atomic<bool> bindings_created = false, bindings_error = false;
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-07-23 11:45:01 +00:00
|
|
|
auto success_callback = [&](const std::string & queue_name_, int msgcount, int /* consumercount */)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
|
|
|
queues.emplace_back(queue_name_);
|
2020-07-20 06:21:18 +00:00
|
|
|
LOG_DEBUG(log, "Queue " + queue_name_ + " is declared");
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-07-23 11:45:01 +00:00
|
|
|
if (msgcount)
|
|
|
|
LOG_TRACE(log, "Queue " + queue_name_ + " is non-empty. Non-consumed messaged will also be delivered.");
|
|
|
|
|
2020-07-20 06:21:18 +00:00
|
|
|
subscribed_queue[queue_name_] = false;
|
|
|
|
subscribe(queues.back());
|
|
|
|
|
|
|
|
if (hash_exchange)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-20 06:21:18 +00:00
|
|
|
String binding_key;
|
2020-05-20 09:40:49 +00:00
|
|
|
if (queues.size() == 1)
|
2020-05-26 17:34:57 +00:00
|
|
|
binding_key = std::to_string(channel_id);
|
2020-05-20 09:40:49 +00:00
|
|
|
else
|
2020-05-26 17:34:57 +00:00
|
|
|
binding_key = std::to_string(channel_id + queue_id);
|
2020-07-21 15:47:39 +00:00
|
|
|
|
2020-07-20 06:21:18 +00:00
|
|
|
/* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor
|
|
|
|
* exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange.
|
|
|
|
*/
|
2020-07-21 15:47:39 +00:00
|
|
|
String current_hash_exchange = exchange_type == AMQP::ExchangeType::consistent_hash ? exchange_name : local_exchange;
|
2020-07-20 06:21:18 +00:00
|
|
|
|
2020-07-21 15:47:39 +00:00
|
|
|
setup_channel->bindQueue(current_hash_exchange, queue_name_, binding_key)
|
2020-07-20 06:21:18 +00:00
|
|
|
.onSuccess([&]
|
|
|
|
{
|
|
|
|
bindings_created = true;
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
|
|
|
bindings_error = true;
|
|
|
|
LOG_ERROR(log, "Failed to create queue binding. Reason: {}", message);
|
|
|
|
});
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
2020-07-20 06:21:18 +00:00
|
|
|
else if (exchange_type == AMQP::ExchangeType::fanout)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-21 15:47:39 +00:00
|
|
|
setup_channel->bindQueue(exchange_name, queue_name_, routing_keys[0])
|
2020-07-20 06:21:18 +00:00
|
|
|
.onSuccess([&]
|
|
|
|
{
|
|
|
|
bindings_created = true;
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
|
|
|
bindings_error = true;
|
|
|
|
LOG_ERROR(log, "Failed to bind to key. Reason: {}", message);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
else if (exchange_type == AMQP::ExchangeType::headers)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-20 06:21:18 +00:00
|
|
|
AMQP::Table binding_arguments;
|
|
|
|
std::vector<String> matching;
|
2020-06-10 23:01:47 +00:00
|
|
|
|
2020-07-20 06:21:18 +00:00
|
|
|
for (const auto & header : routing_keys)
|
2020-06-10 23:01:47 +00:00
|
|
|
{
|
2020-07-20 06:21:18 +00:00
|
|
|
boost::split(matching, header, [](char c){ return c == '='; });
|
|
|
|
binding_arguments[matching[0]] = matching[1];
|
|
|
|
matching.clear();
|
2020-06-11 09:23:23 +00:00
|
|
|
}
|
2020-06-13 21:37:37 +00:00
|
|
|
|
2020-07-21 15:47:39 +00:00
|
|
|
setup_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments)
|
2020-07-20 06:21:18 +00:00
|
|
|
.onSuccess([&]
|
|
|
|
{
|
|
|
|
bindings_created = true;
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
|
|
|
bindings_error = true;
|
|
|
|
LOG_ERROR(log, "Failed to bind queue. Reason: {}", message);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange.
|
|
|
|
for (const auto & routing_key : routing_keys)
|
|
|
|
{
|
|
|
|
/// Binding directly to exchange, specified by the client.
|
2020-07-21 15:47:39 +00:00
|
|
|
setup_channel->bindQueue(exchange_name, queue_name_, routing_key)
|
2020-06-13 21:37:37 +00:00
|
|
|
.onSuccess([&]
|
|
|
|
{
|
|
|
|
bindings_created = true;
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
|
|
|
bindings_error = true;
|
2020-07-20 06:21:18 +00:00
|
|
|
LOG_ERROR(log, "Failed to bind queue. Reason: {}", message);
|
2020-06-13 21:37:37 +00:00
|
|
|
});
|
|
|
|
}
|
2020-06-10 23:01:47 +00:00
|
|
|
}
|
2020-07-23 11:45:01 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
auto error_callback([&](const char * message)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-20 06:21:18 +00:00
|
|
|
bindings_error = true;
|
2020-06-14 16:26:37 +00:00
|
|
|
LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
|
2020-05-20 09:40:49 +00:00
|
|
|
});
|
|
|
|
|
2020-07-24 12:33:07 +00:00
|
|
|
AMQP::Table queue_settings;
|
|
|
|
if (!deadletter_exchange.empty())
|
|
|
|
{
|
|
|
|
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
|
|
|
|
}
|
|
|
|
|
2020-07-23 11:45:01 +00:00
|
|
|
if (!queue_base.empty())
|
|
|
|
{
|
|
|
|
const String queue_name = !hash_exchange ? queue_base : queue_base + "_" + std::to_string(channel_id) + "_" + std::to_string(queue_id);
|
2020-07-24 12:33:07 +00:00
|
|
|
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
|
2020-07-23 11:45:01 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2020-07-24 12:33:07 +00:00
|
|
|
setup_channel->declareQueue(AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
|
2020-07-23 11:45:01 +00:00
|
|
|
}
|
|
|
|
|
2020-07-20 06:21:18 +00:00
|
|
|
while (!bindings_created && !bindings_error)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-02 16:44:04 +00:00
|
|
|
iterateEventLoop();
|
2020-06-08 01:11:48 +00:00
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
|
|
|
|
{
|
2020-06-08 01:11:48 +00:00
|
|
|
if (subscribed_queue[queue_name])
|
|
|
|
return;
|
|
|
|
|
2020-07-24 12:33:07 +00:00
|
|
|
consumer_channel->consume(queue_name)
|
2020-07-20 10:05:00 +00:00
|
|
|
.onSuccess([&](const std::string & consumer)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-06-09 21:52:06 +00:00
|
|
|
subscribed_queue[queue_name] = true;
|
2020-06-08 01:11:48 +00:00
|
|
|
++count_subscribed;
|
2020-07-24 12:33:07 +00:00
|
|
|
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
|
|
|
|
|
|
|
|
consumer_error = false;
|
2020-07-20 10:05:00 +00:00
|
|
|
consumer_tag = consumer;
|
2020-06-08 01:11:48 +00:00
|
|
|
|
2020-07-24 12:33:07 +00:00
|
|
|
consumer_channel->onError([&](const char * message)
|
|
|
|
{
|
|
|
|
LOG_ERROR(log, "Consumer {} error: {}", consumer_tag, message);
|
|
|
|
});
|
2020-05-20 09:40:49 +00:00
|
|
|
})
|
2020-07-20 10:05:00 +00:00
|
|
|
.onReceived([&](const AMQP::Message & message, uint64_t deliveryTag, bool redelivered)
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
|
|
|
size_t message_size = message.bodySize();
|
|
|
|
if (message_size && message.body() != nullptr)
|
|
|
|
{
|
|
|
|
String message_received = std::string(message.body(), message.body() + message_size);
|
|
|
|
if (row_delimiter != '\0')
|
|
|
|
message_received += row_delimiter;
|
2020-06-05 13:42:11 +00:00
|
|
|
|
2020-07-20 10:05:00 +00:00
|
|
|
received.push({deliveryTag, message_received, redelivered});
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.onError([&](const char * message)
|
|
|
|
{
|
2020-06-09 21:52:06 +00:00
|
|
|
consumer_error = true;
|
2020-06-14 16:26:37 +00:00
|
|
|
LOG_ERROR(log, "Consumer {} failed. Reason: {}", channel_id, message);
|
2020-05-20 09:40:49 +00:00
|
|
|
});
|
2020-06-09 21:52:06 +00:00
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-08 01:11:48 +00:00
|
|
|
|
2020-06-09 21:52:06 +00:00
|
|
|
void ReadBufferFromRabbitMQConsumer::checkSubscription()
|
|
|
|
{
|
2020-07-28 08:22:45 +00:00
|
|
|
if (count_subscribed == num_queues || !consumer_channel->usable())
|
2020-06-09 21:52:06 +00:00
|
|
|
return;
|
2020-06-08 01:11:48 +00:00
|
|
|
|
2020-06-09 21:52:06 +00:00
|
|
|
wait_subscribed = num_queues;
|
2020-06-08 01:11:48 +00:00
|
|
|
|
2020-06-14 16:26:37 +00:00
|
|
|
/// These variables are updated in a separate thread.
|
2020-06-09 21:52:06 +00:00
|
|
|
while (count_subscribed != wait_subscribed && !consumer_error)
|
|
|
|
{
|
2020-07-02 16:44:04 +00:00
|
|
|
iterateEventLoop();
|
2020-06-08 01:11:48 +00:00
|
|
|
}
|
|
|
|
|
2020-06-09 21:52:06 +00:00
|
|
|
LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);
|
|
|
|
|
2020-06-14 16:26:37 +00:00
|
|
|
/// Updated in callbacks which are run by the loop.
|
2020-06-11 20:05:35 +00:00
|
|
|
if (count_subscribed == num_queues)
|
|
|
|
return;
|
|
|
|
|
2020-06-14 16:26:37 +00:00
|
|
|
/// A case that should never normally happen.
|
2020-06-09 21:52:06 +00:00
|
|
|
for (auto & queue : queues)
|
2020-06-08 01:11:48 +00:00
|
|
|
{
|
2020-06-09 21:52:06 +00:00
|
|
|
subscribe(queue);
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-07-28 08:22:45 +00:00
|
|
|
void ReadBufferFromRabbitMQConsumer::ackMessages()
|
2020-07-24 12:33:07 +00:00
|
|
|
{
|
2020-07-28 08:22:45 +00:00
|
|
|
UInt64 delivery_tag = last_inserted_delivery_tag;
|
|
|
|
if (delivery_tag && delivery_tag > prev_tag)
|
2020-07-24 12:33:07 +00:00
|
|
|
{
|
2020-07-28 08:22:45 +00:00
|
|
|
prev_tag = delivery_tag;
|
|
|
|
consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all up to last tag staring from last acked.
|
|
|
|
LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag);
|
2020-07-24 12:33:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-07-02 16:44:04 +00:00
|
|
|
void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
|
2020-06-04 06:22:53 +00:00
|
|
|
{
|
2020-07-02 16:44:04 +00:00
|
|
|
event_handler->iterateLoop();
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
bool ReadBufferFromRabbitMQConsumer::nextImpl()
|
|
|
|
{
|
|
|
|
if (stopped || !allowed)
|
|
|
|
return false;
|
|
|
|
|
2020-07-20 10:05:00 +00:00
|
|
|
if (received.tryPop(current))
|
2020-05-20 09:40:49 +00:00
|
|
|
{
|
2020-07-20 10:05:00 +00:00
|
|
|
auto * new_position = const_cast<char *>(current.message.data());
|
|
|
|
BufferBase::set(new_position, current.message.size(), 0);
|
2020-06-29 06:33:53 +00:00
|
|
|
allowed = false;
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-29 06:33:53 +00:00
|
|
|
return true;
|
|
|
|
}
|
2020-05-20 09:40:49 +00:00
|
|
|
|
2020-06-29 06:33:53 +00:00
|
|
|
return false;
|
2020-05-20 09:40:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|