ClickHouse/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp

362 lines
12 KiB
C++
Raw Normal View History

#include <utility>
2020-05-26 17:34:57 +00:00
#include <chrono>
#include <thread>
2020-05-31 08:39:22 +00:00
#include <mutex>
#include <atomic>
#include <memory>
#include <Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
2020-06-13 21:37:37 +00:00
#include <boost/algorithm/string/split.hpp>
#include <common/logger_useful.h>
2020-06-07 11:14:05 +00:00
#include "Poco/Timer.h"
#include <amqpcpp.h>
namespace DB
{
namespace ExchangeType
{
    /// Suffix appended to the local exchange name to build the name of the local hash (sharding) exchange.
    static const String HASH_SUF{"_hash"};
}
2020-06-29 15:41:17 +00:00
/// Equals capacity of a single rabbitmq queue.
static constexpr auto QUEUE_SIZE = 50000;
/** Builds a consumer buffer on top of an already opened channel and immediately declares its queues.
  *
  * consumer_channel_ - channel used for all declare/bind/consume requests (ownership is moved in).
  * event_handler_    - handler whose iterateLoop() drives the callbacks.
  * exchange_name_    - client's exchange.
  * exchange_type_    - type of the client's exchange.
  * routing_keys_     - routing keys (or "key=value" header pairs for a headers exchange).
  * channel_id_       - id of this consumer, used in binding keys and log messages.
  * log_              - logger.
  * row_delimiter_    - appended to every received message unless it is '\0'.
  * hash_exchange_    - whether messages are distributed through a hash exchange.
  * num_queues_       - number of queues declared for this consumer.
  * local_exchange_   - base name for the local exchanges.
  * stopped_          - external flag, checked in nextImpl().
  */
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
        ChannelPtr consumer_channel_,
        HandlerPtr event_handler_,
        const String & exchange_name_,
        const AMQP::ExchangeType & exchange_type_,
        const Names & routing_keys_,
        size_t channel_id_,
        Poco::Logger * log_,
        char row_delimiter_,
        bool hash_exchange_,
        size_t num_queues_,
        const String & local_exchange_,
        const std::atomic<bool> & stopped_)
        : ReadBuffer(nullptr, 0)
        , consumer_channel(std::move(consumer_channel_))
        , event_handler(event_handler_)
        , exchange_name(exchange_name_)
        , exchange_type(exchange_type_)
        , routing_keys(routing_keys_)
        , channel_id(channel_id_)
        , hash_exchange(hash_exchange_)
        , num_queues(num_queues_)
        , local_exchange(local_exchange_)
        /// Initialised from the constructor argument (not from the member) so that the result
        /// does not depend on the order in which members are declared in the header.
        , local_hash_exchange(local_exchange_ + ExchangeType::HASH_SUF)
        , log(log_)
        , row_delimiter(row_delimiter_)
        , stopped(stopped_)
        /// Same reasoning: use the argument, not the member.
        , messages(QUEUE_SIZE * num_queues_)
{
    /* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
     * By default there is one queue per consumer.
     */
    for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
    {
        /// Queue bindings must be declared before any publishing => it must be done here and not in readPrefix()
        initQueueBindings(queue_id);
    }
}
/// Tears the consumer down: requests channel close, clears the queued messages and resets the working buffer.
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
/// Ask the channel to close first.
consumer_channel->close();
/// Clear whatever is still held in the queue of received messages.
messages.clear();
/// nextImpl() points the working buffer at `current`; reset it so it no longer refers to that string.
BufferBase::set(nullptr, 0, 0);
}
void ReadBufferFromRabbitMQConsumer::initExchange()
{
/* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
* will evenly distribute messages between all consumers.
2020-06-10 23:01:47 +00:00
*/
consumer_channel->declareExchange(exchange_name, exchange_type).onError([&](const char * message)
2020-06-10 23:01:47 +00:00
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare client's {} exchange. Reason: {}", exchange_type, message);
2020-06-10 23:01:47 +00:00
});
/// No need for declaring hash-exchange if there is only one consumer with one queue or exchange type is already hash
if (!hash_exchange || exchange_type == AMQP::ExchangeType::consistent_hash)
return;
2020-06-10 23:01:47 +00:00
{
/* By default hash exchange distributes messages based on a hash value of a routing key, which must be a string integer. But
* in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys
* of other types: headers, patterns and string-keys. This means that hash property must be changed.
*/
2020-07-13 01:11:35 +00:00
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
/// Declare exchange for sharding.
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
}
2020-06-10 23:01:47 +00:00
/// Then bind client's exchange to sharding exchange (by keys, specified by the client):
if (exchange_type == AMQP::ExchangeType::headers)
2020-06-10 23:01:47 +00:00
{
2020-06-13 21:37:37 +00:00
AMQP::Table binding_arguments;
std::vector<String> matching;
for (const auto & header : routing_keys)
2020-06-13 21:37:37 +00:00
{
boost::split(matching, header, [](char c){ return c == '='; });
binding_arguments[matching[0]] = matching[1];
matching.clear();
}
/// Routing key can be arbitrary here.
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0], binding_arguments)
2020-06-13 21:37:37 +00:00
.onError([&](const char * message)
2020-06-11 09:23:23 +00:00
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
2020-06-11 09:23:23 +00:00
});
}
else if (exchange_type == AMQP::ExchangeType::fanout)
{
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0]).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
2020-06-13 21:37:37 +00:00
else
{
for (const auto & routing_key : routing_keys)
2020-06-13 21:37:37 +00:00
{
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_key).onError([&](const char * message)
2020-06-13 21:37:37 +00:00
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
2020-06-13 21:37:37 +00:00
});
}
}
}
void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
{
2020-06-11 20:05:35 +00:00
/// These variables might be updated later from a separate thread in onError callbacks.
if (!local_exchange_declared || (hash_exchange && !local_hash_exchange_declared))
{
initExchange();
2020-06-10 23:01:47 +00:00
local_exchange_declared = true;
local_hash_exchange_declared = true;
}
bool bindings_created = false, bindings_error = false;
2020-06-07 11:14:05 +00:00
consumer_channel->declareQueue(AMQP::exclusive)
.onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */)
{
queues.emplace_back(queue_name_);
LOG_DEBUG(log, "Queue " + queue_name_ + " is declared");
subscribed_queue[queue_name_] = false;
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because
* if moved there, it must(!) be wrapped inside a channel->onSuccess callback or any other, otherwise
* consumer might fail to subscribe and no resubscription will help.
2020-05-26 17:34:57 +00:00
*/
subscribe(queues.back());
if (hash_exchange)
{
String binding_key;
if (queues.size() == 1)
{
2020-05-26 17:34:57 +00:00
binding_key = std::to_string(channel_id);
}
else
{
2020-05-26 17:34:57 +00:00
binding_key = std::to_string(channel_id + queue_id);
}
/* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor
* exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange.
*/
String current_hash_exchange = exchange_type == AMQP::ExchangeType::consistent_hash ? exchange_name : local_hash_exchange;
/// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary.
consumer_channel->bindQueue(current_hash_exchange, queue_name_, binding_key)
.onSuccess([&]
{
bindings_created = true;
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding. Reason: {}", message);
});
}
else if (exchange_type == AMQP::ExchangeType::fanout)
{
consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0])
.onSuccess([&]
{
bindings_created = true;
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to bind to key. Reason: {}", message);
});
}
else if (exchange_type == AMQP::ExchangeType::headers)
{
AMQP::Table binding_arguments;
std::vector<String> matching;
2020-06-10 23:01:47 +00:00
/// It is not parsed for the second time - if it was parsed above, then it would never end up here.
for (const auto & header : routing_keys)
2020-06-10 23:01:47 +00:00
{
boost::split(matching, header, [](char c){ return c == '='; });
binding_arguments[matching[0]] = matching[1];
matching.clear();
2020-06-11 09:23:23 +00:00
}
2020-06-13 21:37:37 +00:00
consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments)
.onSuccess([&]
{
bindings_created = true;
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to bind queue. Reason: {}", message);
});
}
else
{
/// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange.
for (const auto & routing_key : routing_keys)
{
/// Binding directly to exchange, specified by the client.
consumer_channel->bindQueue(exchange_name, queue_name_, routing_key)
2020-06-13 21:37:37 +00:00
.onSuccess([&]
{
bindings_created = true;
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to bind queue. Reason: {}", message);
2020-06-13 21:37:37 +00:00
});
}
2020-06-10 23:01:47 +00:00
}
})
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
});
2020-05-26 17:34:57 +00:00
/* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created.
* It is important at this moment to make sure that queue bindings are created before any publishing can happen because
* otherwise messages will be routed nowhere.
*/
while (!bindings_created && !bindings_error)
{
2020-07-02 16:44:04 +00:00
iterateEventLoop();
2020-06-08 01:11:48 +00:00
}
}
void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
{
2020-06-08 01:11:48 +00:00
if (subscribed_queue[queue_name])
return;
consumer_channel->consume(queue_name, AMQP::noack)
2020-05-26 17:34:57 +00:00
.onSuccess([&](const std::string & /* consumer */)
{
2020-06-09 21:52:06 +00:00
subscribed_queue[queue_name] = true;
2020-06-11 20:05:35 +00:00
consumer_error = false;
2020-06-08 01:11:48 +00:00
++count_subscribed;
2020-06-07 11:14:05 +00:00
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
})
.onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */)
{
size_t message_size = message.bodySize();
if (message_size && message.body() != nullptr)
{
String message_received = std::string(message.body(), message.body() + message_size);
if (row_delimiter != '\0')
2020-06-07 11:14:05 +00:00
{
message_received += row_delimiter;
2020-06-07 11:14:05 +00:00
}
2020-06-05 13:42:11 +00:00
messages.push(message_received);
}
})
.onError([&](const char * message)
{
2020-06-09 21:52:06 +00:00
consumer_error = true;
LOG_ERROR(log, "Consumer {} failed. Reason: {}", channel_id, message);
});
2020-06-09 21:52:06 +00:00
}
2020-06-08 01:11:48 +00:00
2020-06-09 21:52:06 +00:00
void ReadBufferFromRabbitMQConsumer::checkSubscription()
{
if (count_subscribed == num_queues)
return;
2020-06-08 01:11:48 +00:00
2020-06-09 21:52:06 +00:00
wait_subscribed = num_queues;
2020-06-08 01:11:48 +00:00
/// These variables are updated in a separate thread.
2020-06-09 21:52:06 +00:00
while (count_subscribed != wait_subscribed && !consumer_error)
{
2020-07-02 16:44:04 +00:00
iterateEventLoop();
2020-06-08 01:11:48 +00:00
}
2020-06-09 21:52:06 +00:00
LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);
/// Updated in callbacks which are run by the loop.
2020-06-11 20:05:35 +00:00
if (count_subscribed == num_queues)
return;
/// A case that should never normally happen.
2020-06-09 21:52:06 +00:00
for (auto & queue : queues)
2020-06-08 01:11:48 +00:00
{
2020-06-09 21:52:06 +00:00
subscribe(queue);
}
}
2020-07-02 16:44:04 +00:00
void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
2020-06-04 06:22:53 +00:00
{
2020-07-02 16:44:04 +00:00
event_handler->iterateLoop();
}
/** Makes the next received message the working buffer.
  * Returns false if the consumer is stopped, reading is currently not allowed, or no message could be popped.
  * On success `allowed` is reset to false.
  */
bool ReadBufferFromRabbitMQConsumer::nextImpl()
{
    const bool may_read = !stopped && allowed;
    if (!may_read || !messages.tryPop(current))
        return false;

    /// The buffer points directly into `current`, which holds the popped message.
    auto * new_position = const_cast<char *>(current.data());
    BufferBase::set(new_position, current.size(), 0);
    allowed = false;

    return true;
}
}