ClickHouse/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp

389 lines
13 KiB
C++
Raw Normal View History

#include <utility>
2020-05-26 17:34:57 +00:00
#include <chrono>
#include <thread>
2020-05-31 08:39:22 +00:00
#include <mutex>
#include <atomic>
#include <memory>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <common/logger_useful.h>
2020-06-07 11:14:05 +00:00
#include "Poco/Timer.h"
#include <amqpcpp.h>
namespace DB
{
2020-06-08 01:11:48 +00:00
/** Stores the consumer settings and immediately declares/binds `num_queues_` queues on the given channel.
  *
  * exchange_type_ equal to "default" means that no custom exchange type was requested.
  * Queue bindings are created here (through initQueueBindings) rather than lazily.
  */
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
        ChannelPtr consumer_channel_,
        RabbitMQHandler & eventHandler_,
        const String & exchange_name_,
        const Names & routing_keys_,
        const size_t channel_id_,
        Poco::Logger * log_,
        char row_delimiter_,
        const bool bind_by_id_,
        const size_t num_queues_,
        const String & exchange_type_,
        const String table_name_,
        const std::atomic<bool> & stopped_)
        : ReadBuffer(nullptr, 0)
        , consumer_channel(std::move(consumer_channel_))
        , eventHandler(eventHandler_)
        , exchange_name(exchange_name_)
        , routing_keys(routing_keys_)
        , channel_id(channel_id_)
        , log(log_)
        , row_delimiter(row_delimiter_)
        , bind_by_id(bind_by_id_)
        , num_queues(num_queues_)
        , exchange_type(exchange_type_)
        , table_name(table_name_)
        , stopped(stopped_)
{
    /// Start with no buffered messages; `current` == end() makes nextImpl() fetch first.
    messages.clear();
    current = messages.begin();

    /// Any value other than "default" selects a user-specified exchange type.
    exchange_type_set = (exchange_type != "default");

    /* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
     * By default there is one queue per consumer.
     */
    size_t next_queue_id = 0;
    while (next_queue_id < num_queues)
    {
        /// Queue bindings must be declared before any publishing => it must be done here and not in readPrefix()
        initQueueBindings(next_queue_id);
        ++next_queue_id;
    }
}
/// Closes the consumer channel and releases everything the read buffer was pointing at.
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
/// Ask the channel to close first, then drop the buffered messages.
consumer_channel->close();
messages.clear();
/// Keep the iterator valid with respect to the (now empty) container.
current = messages.begin();
/// The working buffer pointed into `messages` (see nextImpl()), so it must not outlive them.
BufferBase::set(nullptr, 0, 0);
}
void ReadBufferFromRabbitMQConsumer::initExchange()
{
2020-06-11 09:23:23 +00:00
/* If exchange_type is not set - then direct-exchange is used - this type of exchange is the fastest (also due to different
* binding algorithm this default behaviuor is much faster). It is also used in INSERT query.
*/
2020-06-10 23:01:47 +00:00
String producer_exchange = exchange_type_set ? exchange_name + "_default" : exchange_name;
consumer_channel->declareExchange(producer_exchange, AMQP::fanout).onError([&](const char * message)
{
2020-06-10 23:01:47 +00:00
internal_exchange_declared = false;
LOG_ERROR(log, "Failed to declare exchange: {}", message);
});
2020-06-10 23:01:47 +00:00
internal_exchange_name = producer_exchange + "_direct";
consumer_channel->declareExchange(internal_exchange_name, AMQP::direct).onError([&](const char * message)
{
2020-06-10 23:01:47 +00:00
internal_exchange_declared = false;
LOG_ERROR(log, "Failed to declare exchange: {}", message);
});
2020-06-11 09:23:23 +00:00
/// With fanout exchange the binding key is ignored - a parameter might be arbitrary
consumer_channel->bindExchange(producer_exchange, internal_exchange_name, routing_keys[0]).onError([&](const char * message)
{
2020-06-10 23:01:47 +00:00
internal_exchange_declared = false;
LOG_ERROR(log, "Failed to bind exchange: {}", message);
});
2020-06-10 23:01:47 +00:00
if (!exchange_type_set)
return;
2020-06-11 09:23:23 +00:00
/// For special purposes to use the flexibility of routing provided by rabbitmq - choosing exchange types is supported.
2020-06-10 23:01:47 +00:00
AMQP::ExchangeType type;
if (exchange_type == "fanout") type = AMQP::ExchangeType::fanout;
else if (exchange_type == "direct") type = AMQP::ExchangeType::direct;
else if (exchange_type == "topic") type = AMQP::ExchangeType::topic;
else if (exchange_type == "consistent_hash") type = AMQP::ExchangeType::consistent_hash;
else return;
/* Declare exchange of the specified type and bind it to hash-exchange, which will evenly distribute messages
* between all consumers. (This enables better scaling as without hash-echange - the only oprion to avoid getting the same
* messages more than once - is having only one consumer with one queue, which is not good.)
*/
consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
hash_exchange = true;
/// No need for declaring hash-exchange if there is only one consumer with one queue and exchange type is not hash
if (!bind_by_id && exchange_type != "consistent_hash")
return;
AMQP::Table exchange_arguments;
exchange_arguments["hash-property"] = "message_id";
local_exchange_name = exchange_name + "_" + table_name;
consumer_channel->declareExchange(local_exchange_name, AMQP::consistent_hash, exchange_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
2020-06-11 09:23:23 +00:00
for (auto & routing_key : routing_keys)
2020-06-10 23:01:47 +00:00
{
2020-06-11 09:23:23 +00:00
consumer_channel->bindExchange(exchange_name, local_exchange_name, routing_key).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind {} exchange to {} exchange: {}", local_exchange_name, exchange_name, message);
});
}
}
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
{
2020-06-10 23:01:47 +00:00
/// These variables might be updated later from a separate thread in onError callbacks
if (!internal_exchange_declared || (exchange_type_set && !local_exchange_declared))
{
initExchange();
2020-06-10 23:01:47 +00:00
local_exchange_declared = true;
internal_exchange_declared = true;
}
2020-06-10 23:01:47 +00:00
bool internal_bindings_created = false, internal_bindings_error = false;
bool local_bindings_created = false, local_bindings_error = false;
2020-06-07 11:14:05 +00:00
consumer_channel->declareQueue(AMQP::exclusive)
.onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */)
{
queues.emplace_back(queue_name_);
2020-06-08 01:11:48 +00:00
subscribed_queue[queue_name_] = false;
2020-06-11 09:23:23 +00:00
String binding_key = routing_keys[0];
2020-05-26 17:34:57 +00:00
/* Every consumer has at least one unique queue. Bind the queues to exchange based on the consumer_channel_id
* in case there is one queue per consumer and bind by queue_id in case there is more than 1 queue per consumer.
* (queue_id is based on channel_id)
*/
if (bind_by_id || hash_exchange)
{
if (queues.size() == 1)
{
2020-05-26 17:34:57 +00:00
binding_key = std::to_string(channel_id);
}
else
{
2020-05-26 17:34:57 +00:00
binding_key = std::to_string(channel_id + queue_id);
}
}
2020-06-10 23:01:47 +00:00
consumer_channel->bindQueue(internal_exchange_name, queue_name_, binding_key)
.onSuccess([&]
{
2020-06-10 23:01:47 +00:00
internal_bindings_created = true;
})
.onError([&](const char * message)
{
2020-06-10 23:01:47 +00:00
internal_bindings_error = true;
2020-06-11 09:23:23 +00:00
LOG_ERROR(log, "Failed to bind to key {}, the reason is: {}", binding_key, message);
});
2020-06-10 23:01:47 +00:00
2020-06-11 09:23:23 +00:00
/// Must be done here and not in readPrefix() because library might fail to handle async subscription on the same connection
subscribe(queues.back());
LOG_TRACE(log, "Queue " + queue_name_ + " is bound by key " + binding_key);
2020-06-10 23:01:47 +00:00
if (exchange_type_set)
{
2020-06-11 09:23:23 +00:00
/// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary
if (hash_exchange)
2020-06-10 23:01:47 +00:00
{
2020-06-11 09:23:23 +00:00
consumer_channel->bindQueue(local_exchange_name, queue_name_, binding_key)
.onSuccess([&]
{
local_bindings_created = true;
})
.onError([&](const char * message)
{
local_bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding: {}", message);
});
}
else
2020-06-10 23:01:47 +00:00
{
2020-06-11 09:23:23 +00:00
/// means there is only one queue with one consumer - no even distribution needed - no hash-exchange
for (auto & routing_key : routing_keys)
{
consumer_channel->bindQueue(local_exchange_name, queue_name_, routing_key)
.onSuccess([&]
{
local_bindings_created = true;
})
.onError([&](const char * message)
{
local_bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding: {}", message);
});
}
}
2020-06-10 23:01:47 +00:00
}
})
.onError([&](const char * message)
{
2020-06-10 23:01:47 +00:00
internal_bindings_error = true;
2020-05-26 20:43:20 +00:00
LOG_ERROR(log, "Failed to declare queue on the channel: {}", message);
});
2020-05-26 17:34:57 +00:00
/* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created.
* It is important at this moment to make sure that queue bindings are created before any publishing can happen because
* otherwise messages will be routed nowhere.
*/
2020-06-10 23:01:47 +00:00
while (!internal_bindings_created && !internal_bindings_error
|| (exchange_type_set && !local_bindings_created && !local_bindings_error))
{
2020-06-09 21:52:06 +00:00
startEventLoop(loop_started);
2020-06-08 01:11:48 +00:00
}
}
void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
{
2020-06-08 01:11:48 +00:00
if (subscribed_queue[queue_name])
return;
consumer_channel->consume(queue_name, AMQP::noack)
2020-05-26 17:34:57 +00:00
.onSuccess([&](const std::string & /* consumer */)
{
2020-06-09 21:52:06 +00:00
subscribed_queue[queue_name] = true;
2020-06-08 01:11:48 +00:00
++count_subscribed;
2020-06-07 11:14:05 +00:00
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
})
.onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */)
{
size_t message_size = message.bodySize();
if (message_size && message.body() != nullptr)
{
String message_received = std::string(message.body(), message.body() + message_size);
if (row_delimiter != '\0')
2020-06-07 11:14:05 +00:00
{
message_received += row_delimiter;
2020-06-07 11:14:05 +00:00
}
2020-06-05 13:42:11 +00:00
2020-06-04 06:22:53 +00:00
bool stop_loop = false;
2020-06-09 21:52:06 +00:00
/// Needed to avoid data race because this vector can be used at the same time by another thread in nextImpl().
2020-06-04 06:22:53 +00:00
{
std::lock_guard lock(mutex);
received.push_back(message_received);
/* As event loop is blocking to the thread that started it and a single thread should not be blocked while
2020-06-07 11:14:05 +00:00
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
* loop_started == 1 if current consumer is started the loop and not another.
2020-06-04 06:22:53 +00:00
*/
2020-06-07 11:14:05 +00:00
if (!loop_started)
2020-06-04 06:22:53 +00:00
{
stop_loop = true;
}
}
if (stop_loop)
{
2020-06-07 11:14:05 +00:00
stopEventLoopWithTimeout();
2020-06-04 06:22:53 +00:00
}
}
})
.onError([&](const char * message)
{
2020-06-09 21:52:06 +00:00
consumer_error = true;
2020-06-04 06:22:53 +00:00
LOG_ERROR(log, "Consumer {} failed: {}", channel_id, message);
});
2020-06-09 21:52:06 +00:00
}
2020-06-08 01:11:48 +00:00
2020-06-09 21:52:06 +00:00
void ReadBufferFromRabbitMQConsumer::checkSubscription()
{
/// In general this condition will always be true and looping/resubscribing would not happen
if (count_subscribed == num_queues)
return;
2020-06-08 01:11:48 +00:00
2020-06-09 21:52:06 +00:00
wait_subscribed = num_queues;
2020-06-08 01:11:48 +00:00
2020-06-09 21:52:06 +00:00
/// These variables are updated in a separate thread
while (count_subscribed != wait_subscribed && !consumer_error)
{
startEventLoop(loop_started);
2020-06-08 01:11:48 +00:00
}
2020-06-09 21:52:06 +00:00
LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);
/// A case that would not normally happen
for (auto & queue : queues)
2020-06-08 01:11:48 +00:00
{
2020-06-09 21:52:06 +00:00
subscribe(queue);
}
}
2020-06-04 06:22:53 +00:00
/// Forwards to eventHandler.stop(); the exact stop semantics are defined by RabbitMQHandler.
void ReadBufferFromRabbitMQConsumer::stopEventLoop()
{
eventHandler.stop();
}
2020-06-07 11:14:05 +00:00
/// Forwards to eventHandler.stopWithTimeout(); called from the onReceived callback in subscribe()
/// when this consumer is not the one that started the loop. The timeout itself is defined by RabbitMQHandler.
void ReadBufferFromRabbitMQConsumer::stopEventLoopWithTimeout()
{
eventHandler.stopWithTimeout();
}
2020-06-09 21:52:06 +00:00
void ReadBufferFromRabbitMQConsumer::startEventLoop(std::atomic<bool> & loop_started)
{
2020-06-09 21:52:06 +00:00
eventHandler.startConsumerLoop(loop_started);
}
bool ReadBufferFromRabbitMQConsumer::nextImpl()
{
if (stopped || !allowed)
return false;
if (current == messages.end())
{
if (received.empty())
{
2020-06-07 11:14:05 +00:00
/// Run the onReceived callbacks to save the messages that have been received by now, blocks current thread
2020-06-09 21:52:06 +00:00
startEventLoop(loop_started);
2020-06-07 11:14:05 +00:00
loop_started = false;
}
if (received.empty())
{
2020-05-26 17:34:57 +00:00
LOG_TRACE(log, "No more messages to be fetched");
return false;
}
messages.clear();
2020-06-09 21:52:06 +00:00
/// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback.
std::lock_guard lock(mutex);
messages.swap(received);
current = messages.begin();
}
2020-05-26 17:34:57 +00:00
auto * new_position = const_cast<char *>(current->data());
BufferBase::set(new_position, current->size(), 0);
++current;
allowed = false;
return true;
}
}