Add connection restore in insert, better confirms

This commit is contained in:
kssenii 2020-07-31 04:59:56 +00:00
parent 763c337be9
commit 5a934c079e
2 changed files with 104 additions and 37 deletions

View File

@ -25,7 +25,7 @@ static const auto LOOP_WAIT = 10;
static const auto BATCH = 10000; static const auto BATCH = 10000;
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
std::pair<String, UInt16> & parsed_address, std::pair<String, UInt16> & parsed_address_,
Context & global_context, Context & global_context,
const std::pair<String, String> & login_password_, const std::pair<String, String> & login_password_,
const Names & routing_keys_, const Names & routing_keys_,
@ -39,6 +39,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
size_t rows_per_message, size_t rows_per_message,
size_t chunk_size_) size_t chunk_size_)
: WriteBuffer(nullptr, 0) : WriteBuffer(nullptr, 0)
, parsed_address(parsed_address_)
, login_password(login_password_) , login_password(login_password_)
, routing_keys(routing_keys_) , routing_keys(routing_keys_)
, exchange_name(exchange_name_) , exchange_name(exchange_name_)
@ -55,11 +56,45 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
loop = std::make_unique<uv_loop_t>(); loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get()); uv_loop_init(loop.get());
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log); event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
/// New coonection for each publisher because cannot publish from different threads with the same connection.(https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086)
setupConnection(0);
setupChannel(0);
writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
writing_task->deactivate();
if (exchange_type == AMQP::ExchangeType::headers)
{
for (const auto & header : routing_keys)
{
std::vector<String> matching;
boost::split(matching, header, [](char c){ return c == '='; });
key_arguments[matching[0]] = matching[1];
}
}
}
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
writing_task->deactivate();
connection->close();
assert(rows == 0 && chunks.empty());
}
void WriteBufferToRabbitMQProducer::setupConnection(bool remove_prev_connection)
{
if (remove_prev_connection && connection)
{
connection->close();
connection.release();
}
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/")); connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
/// New coonection for each publisher because cannot publish from different threads.(https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086)
size_t cnt_retries = 0; size_t cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX) while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{ {
@ -71,8 +106,18 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
{ {
throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ); throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
} }
}
producer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
void WriteBufferToRabbitMQProducer::setupChannel(bool remove_prev_channel)
{
if (remove_prev_channel && producer_channel)
{
producer_channel->close();
producer_channel.release();
}
producer_channel = std::make_unique<AMQP::TcpChannel>(connection.get());
producer_channel->onError([&](const char * message) producer_channel->onError([&](const char * message)
{ {
LOG_ERROR(log, "Prodcuer error: {}", message); LOG_ERROR(log, "Prodcuer error: {}", message);
@ -84,38 +129,38 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
} }
else else
{ {
/// Same as here https://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
remove_confirmed_tag = [&](uint64_t received_delivery_tag, bool multiple)
{
std::lock_guard lock(mutex);
auto found_tag_pos = delivery_tags_record.find(received_delivery_tag);
if (found_tag_pos != delivery_tags_record.end())
{
if (multiple)
{
++found_tag_pos;
delivery_tags_record.erase(delivery_tags_record.begin(), found_tag_pos);
}
else
delivery_tags_record.erase(found_tag_pos);
}
};
/* if persistent == true, onAck is received when message is persisted to disk or when it is consumed on every queue. If fails, it
* will be requed in returned_callback. If persistent == false, message is confirmed the moment it is enqueued. If fails, it is
* not requeued. First option is two times slower than the second, so default is second and the first is turned on in table setting.
*/
producer_channel->confirmSelect() producer_channel->confirmSelect()
.onAck([&](uint64_t deliveryTag, bool /* multiple */) .onAck([&](uint64_t acked_delivery_tag, bool multiple)
{ {
if (deliveryTag > last_processed) remove_confirmed_tag(acked_delivery_tag, multiple);
last_processed = deliveryTag;
}) })
.onNack([&](uint64_t /* deliveryTag */, bool /* multiple */, bool /* requeue */) .onNack([&](uint64_t nacked_delivery_tag, bool multiple, bool /* requeue */)
{ {
if (!persistent)
remove_confirmed_tag(nacked_delivery_tag, multiple);
}); });
} }
writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
writing_task->deactivate();
if (exchange_type == AMQP::ExchangeType::headers)
{
std::vector<String> matching;
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
key_arguments[matching[0]] = matching[1];
matching.clear();
}
}
}
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
writing_task->deactivate();
connection->close();
assert(rows == 0 && chunks.empty());
} }
@ -143,6 +188,9 @@ void WriteBufferToRabbitMQProducer::countRow()
++delivery_tag; ++delivery_tag;
payloads.push(payload); payloads.push(payload);
std::lock_guard lock(mutex);
delivery_tags_record.insert(delivery_tags_record.end(), delivery_tag);
} }
} }
@ -180,7 +228,7 @@ void WriteBufferToRabbitMQProducer::writingFunc()
else if (exchange_type == AMQP::ExchangeType::headers) else if (exchange_type == AMQP::ExchangeType::headers)
{ {
envelope.setHeaders(key_arguments); envelope.setHeaders(key_arguments);
producer_channel->publish(exchange_name, "", envelope, key_arguments).onReturned(returned_callback); producer_channel->publish(exchange_name, "", envelope).onReturned(returned_callback);
} }
else else
{ {
@ -191,7 +239,7 @@ void WriteBufferToRabbitMQProducer::writingFunc()
iterateEventLoop(); iterateEventLoop();
} }
if (wait_num.load() && last_processed.load() >= wait_num.load()) if (wait_num.load() && delivery_tags_record.empty())
{ {
wait_all.store(false); wait_all.store(false);
LOG_DEBUG(log, "All messages are successfully published"); LOG_DEBUG(log, "All messages are successfully published");
@ -200,7 +248,22 @@ void WriteBufferToRabbitMQProducer::writingFunc()
{ {
iterateEventLoop(); iterateEventLoop();
} }
/// Most channel based errors result in channel closure, which is very likely to trigger connection closure.
if (connection->usable() && connection->ready() && !producer_channel->usable())
{
LOG_DEBUG(log, "Channel is not usable. Creating a new one");
setupChannel(1);
} }
else if (!connection->usable() || !connection->ready())
{
LOG_DEBUG(log, "Connection is not usable. Creating a new one");
setupConnection(1);
setupChannel(1);
}
}
LOG_DEBUG(log, "Delivered messages");
} }

View File

@ -14,13 +14,11 @@
namespace DB namespace DB
{ {
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
class WriteBufferToRabbitMQProducer : public WriteBuffer class WriteBufferToRabbitMQProducer : public WriteBuffer
{ {
public: public:
WriteBufferToRabbitMQProducer( WriteBufferToRabbitMQProducer(
std::pair<String, UInt16> & parsed_address, std::pair<String, UInt16> & parsed_address_,
Context & global_context, Context & global_context,
const std::pair<String, String> & login_password_, const std::pair<String, String> & login_password_,
const Names & routing_keys_, const Names & routing_keys_,
@ -46,7 +44,10 @@ private:
void nextImpl() override; void nextImpl() override;
void iterateEventLoop(); void iterateEventLoop();
void writingFunc(); void writingFunc();
void setupConnection(bool remove_prev_connection);
void setupChannel(bool remove_prev_channel);
std::pair<String, UInt16> parsed_address;
const std::pair<String, String> login_password; const std::pair<String, String> login_password;
const Names routing_keys; const Names routing_keys;
const String exchange_name; const String exchange_name;
@ -61,12 +62,15 @@ private:
std::unique_ptr<uv_loop_t> loop; std::unique_ptr<uv_loop_t> loop;
std::unique_ptr<RabbitMQHandler> event_handler; std::unique_ptr<RabbitMQHandler> event_handler;
std::unique_ptr<AMQP::TcpConnection> connection; std::unique_ptr<AMQP::TcpConnection> connection;
ChannelPtr producer_channel; std::unique_ptr<AMQP::TcpChannel> producer_channel;
ConcurrentBoundedQueue<String> payloads; ConcurrentBoundedQueue<String> payloads;
UInt64 delivery_tag = 0; UInt64 delivery_tag = 0;
std::atomic<bool> wait_all = true; std::atomic<bool> wait_all = true;
std::atomic<UInt64> wait_num = 0, last_processed = 0; std::atomic<UInt64> wait_num = 0;
std::set<UInt64> delivery_tags_record;
std::mutex mutex;
std::function<void(uint64_t received_delivery_tag, bool multiple)> remove_confirmed_tag;
Poco::Logger * log; Poco::Logger * log;
const std::optional<char> delim; const std::optional<char> delim;