mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-26 19:50:51 +00:00
Rewrite StorageRabbitMQ a bit
This commit is contained in:
parent
24010689e6
commit
f26f64993a
88
src/Storages/RabbitMQ/RabbitMQConnection.cpp
Normal file
88
src/Storages/RabbitMQ/RabbitMQConnection.cpp
Normal file
@ -0,0 +1,88 @@
|
||||
#include "RabbitMQConnection.h"
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static const auto CONNECT_SLEEP = 200;
|
||||
static const auto RETRIES_MAX = 20;
|
||||
|
||||
|
||||
RabbitMQConnection::RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_)
|
||||
: configuration(configuration_)
|
||||
, log(log_)
|
||||
, event_handler(loop.getLoop(), log)
|
||||
{
|
||||
}
|
||||
|
||||
String RabbitMQConnection::connectionInfoForLog() const
|
||||
{
|
||||
return configuration.host + ':' + toString(configuration.port);
|
||||
}
|
||||
|
||||
bool RabbitMQConnection::isConnected()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return event_handler.connectionRunning() && connection->usable();
|
||||
}
|
||||
|
||||
bool RabbitMQConnection::connect()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (configuration.connection_string.empty())
|
||||
{
|
||||
AMQP::Login login(configuration.username, configuration.password);
|
||||
AMQP::Address address(configuration.host, configuration.port, login, configuration.vhost, configuration.secure);
|
||||
connection = std::make_unique<AMQP::TcpConnection>(&event_handler, address);
|
||||
}
|
||||
else
|
||||
{
|
||||
AMQP::Address address(configuration.connection_string);
|
||||
connection = std::make_unique<AMQP::TcpConnection>(&event_handler, address);
|
||||
}
|
||||
|
||||
auto cnt_retries = 0;
|
||||
while (!connection->ready() && cnt_retries++ != RETRIES_MAX)
|
||||
{
|
||||
event_handler.iterateLoop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
|
||||
}
|
||||
return event_handler.connectionRunning();
|
||||
}
|
||||
|
||||
bool RabbitMQConnection::reconnect()
|
||||
{
|
||||
disconnect();
|
||||
{
|
||||
/// This will force immediate closure if not yet closed
|
||||
std::lock_guard lock(mutex);
|
||||
if (!connection->closed())
|
||||
connection->close(true);
|
||||
}
|
||||
LOG_TRACE(log, "Trying to restore connection to {}", connectionInfoForLog());
|
||||
return connect();
|
||||
}
|
||||
|
||||
ChannelPtr RabbitMQConnection::createChannel()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return std::make_unique<AMQP::TcpChannel>(connection.get());
|
||||
}
|
||||
|
||||
void RabbitMQConnection::disconnect(bool immediately)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
connection->close(immediately);
|
||||
|
||||
/** Connection is not closed immediately (firstly, all pending operations are completed, and then
|
||||
* an AMQP closing-handshake is performed). But cannot open a new connection until previous one is properly closed
|
||||
*/
|
||||
size_t cnt_retries = 0;
|
||||
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
||||
event_handler.iterateLoop();
|
||||
}
|
||||
|
||||
}
|
59
src/Storages/RabbitMQ/RabbitMQConnection.h
Normal file
59
src/Storages/RabbitMQ/RabbitMQConnection.h
Normal file
@ -0,0 +1,59 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/RabbitMQ/UVLoop.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct RabbitMQConfiguration
|
||||
{
|
||||
String host;
|
||||
UInt16 port;
|
||||
String username;
|
||||
String password;
|
||||
String vhost;
|
||||
|
||||
bool secure;
|
||||
String connection_string;
|
||||
};
|
||||
|
||||
class RabbitMQConnection
|
||||
{
|
||||
public:
|
||||
RabbitMQConnection(const RabbitMQConfiguration & configuration_, Poco::Logger * log_);
|
||||
|
||||
bool isConnected();
|
||||
|
||||
bool connect();
|
||||
|
||||
bool reconnect();
|
||||
|
||||
void disconnect(bool immediately = false);
|
||||
|
||||
void heartbeat() { connection->heartbeat(); }
|
||||
|
||||
bool closed() { return connection->closed(); }
|
||||
|
||||
ChannelPtr createChannel();
|
||||
|
||||
/// RabbitMQHandler is thread safe. Any public methods can be called concurrently.
|
||||
RabbitMQHandler & getHandler() { return event_handler; }
|
||||
|
||||
String connectionInfoForLog() const;
|
||||
|
||||
private:
|
||||
RabbitMQConfiguration configuration;
|
||||
Poco::Logger * log;
|
||||
|
||||
UVLoop loop;
|
||||
RabbitMQHandler event_handler;
|
||||
|
||||
std::unique_ptr<AMQP::TcpConnection> connection;
|
||||
std::mutex mutex;
|
||||
};
|
||||
|
||||
using RabbitMQConnectionPtr = std::unique_ptr<RabbitMQConnection>;
|
||||
|
||||
}
|
@ -32,13 +32,6 @@ void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */)
|
||||
loop_state.store(Loop::RUN);
|
||||
}
|
||||
|
||||
bool RabbitMQHandler::connectionRunning(const AMQP::TcpConnection * connection)
|
||||
{
|
||||
if (connection_running.load() && !connection->usable())
|
||||
LOG_ERROR(log, "Logical error: mismatch in connection flags");
|
||||
return connection_running.load() && connection->usable();
|
||||
}
|
||||
|
||||
void RabbitMQHandler::startLoop()
|
||||
{
|
||||
std::lock_guard lock(startup_mutex);
|
||||
|
@ -17,6 +17,7 @@ namespace Loop
|
||||
static const UInt8 STOP = 2;
|
||||
}
|
||||
|
||||
using ChannelPtr = std::unique_ptr<AMQP::TcpChannel>;
|
||||
|
||||
class RabbitMQHandler : public AMQP::LibUvHandler
|
||||
{
|
||||
@ -40,7 +41,7 @@ public:
|
||||
|
||||
void stopLoop();
|
||||
|
||||
bool connectionRunning(const AMQP::TcpConnection * connection);
|
||||
bool connectionRunning() { return connection_running.load(); }
|
||||
bool loopRunning() { return loop_running.load(); }
|
||||
|
||||
void updateLoopState(UInt8 state) { loop_state.store(state); }
|
||||
@ -55,4 +56,6 @@ private:
|
||||
std::mutex startup_mutex;
|
||||
};
|
||||
|
||||
using RabbitMQHandlerPtr = std::shared_ptr<RabbitMQHandler>;
|
||||
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ namespace DB
|
||||
|
||||
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
|
||||
ChannelPtr consumer_channel_,
|
||||
HandlerPtr event_handler_,
|
||||
RabbitMQHandler & event_handler_,
|
||||
std::vector<String> & queues_,
|
||||
size_t channel_id_base_,
|
||||
const String & channel_base_,
|
||||
@ -159,7 +159,7 @@ bool ReadBufferFromRabbitMQConsumer::needChannelUpdate()
|
||||
|
||||
void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
event_handler.iterateLoop();
|
||||
}
|
||||
|
||||
|
||||
|
@ -15,16 +15,13 @@ namespace Poco
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
|
||||
using HandlerPtr = std::shared_ptr<RabbitMQHandler>;
|
||||
|
||||
class ReadBufferFromRabbitMQConsumer : public ReadBuffer
|
||||
{
|
||||
|
||||
public:
|
||||
ReadBufferFromRabbitMQConsumer(
|
||||
ChannelPtr consumer_channel_,
|
||||
HandlerPtr event_handler_,
|
||||
RabbitMQHandler & event_handler_,
|
||||
std::vector<String> & queues_,
|
||||
size_t channel_id_base_,
|
||||
const String & channel_base_,
|
||||
@ -85,7 +82,7 @@ private:
|
||||
void iterateEventLoop();
|
||||
|
||||
ChannelPtr consumer_channel;
|
||||
HandlerPtr event_handler;
|
||||
RabbitMQHandler & event_handler; /// Used concurrently, but is thread safe.
|
||||
std::vector<String> queues;
|
||||
const String channel_base;
|
||||
const size_t channel_id_base;
|
||||
|
@ -37,8 +37,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static const auto CONNECT_SLEEP = 200;
|
||||
static const auto RETRIES_MAX = 20;
|
||||
static const uint32_t QUEUE_SIZE = 100000;
|
||||
static const auto MAX_FAILED_READ_ATTEMPTS = 10;
|
||||
static const auto RESCHEDULE_MS = 500;
|
||||
@ -91,23 +89,28 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
, use_user_setup(rabbitmq_settings->rabbitmq_queue_consume.value)
|
||||
, hash_exchange(num_consumers > 1 || num_queues > 1)
|
||||
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
|
||||
, address(rabbitmq_settings->rabbitmq_host_port.value)
|
||||
, parsed_address(parseAddress(address, 5672))
|
||||
, login_password(std::make_pair(
|
||||
getContext()->getConfigRef().getString("rabbitmq.username"),
|
||||
getContext()->getConfigRef().getString("rabbitmq.password")))
|
||||
, vhost(getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value))
|
||||
, connection_string(rabbitmq_settings->rabbitmq_address)
|
||||
, secure(rabbitmq_settings->rabbitmq_secure.value)
|
||||
, semaphore(0, num_consumers)
|
||||
, unique_strbase(getRandomName())
|
||||
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
|
||||
, milliseconds_to_wait(RESCHEDULE_MS)
|
||||
{
|
||||
event_handler = std::make_shared<RabbitMQHandler>(loop.getLoop(), log);
|
||||
if (secure)
|
||||
auto parsed_address = parseAddress(rabbitmq_settings->rabbitmq_host_port.value, 5672);
|
||||
configuration =
|
||||
{
|
||||
.host = parsed_address.first,
|
||||
.port = parsed_address.second,
|
||||
.username = getContext()->getConfigRef().getString("rabbitmq.username"),
|
||||
.password = getContext()->getConfigRef().getString("rabbitmq.password"),
|
||||
.vhost = getContext()->getConfigRef().getString("rabbitmq.vhost", rabbitmq_settings->rabbitmq_vhost.value),
|
||||
.secure = rabbitmq_settings->rabbitmq_secure.value,
|
||||
.connection_string = rabbitmq_settings->rabbitmq_address
|
||||
};
|
||||
|
||||
if (configuration.secure)
|
||||
SSL_library_init();
|
||||
restoreConnection(false);
|
||||
|
||||
connection = std::make_unique<RabbitMQConnection>(configuration, log);
|
||||
connection->connect();
|
||||
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(columns_);
|
||||
@ -117,7 +120,6 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
rabbitmq_context->makeQueryContext();
|
||||
|
||||
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
|
||||
event_handler->updateLoopState(Loop::STOP);
|
||||
looping_task = getContext()->getMessageBrokerSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
|
||||
looping_task->deactivate();
|
||||
|
||||
@ -222,14 +224,14 @@ std::shared_ptr<Context> StorageRabbitMQ::addSettings(ContextPtr local_context)
|
||||
|
||||
void StorageRabbitMQ::loopingFunc()
|
||||
{
|
||||
if (event_handler->connectionRunning(connection.get()))
|
||||
event_handler->startLoop();
|
||||
if (connection->isConnected())
|
||||
connection->getHandler().startLoop();
|
||||
}
|
||||
|
||||
|
||||
void StorageRabbitMQ::connectionFunc()
|
||||
{
|
||||
if (restoreConnection(true))
|
||||
if (connection->reconnect())
|
||||
initRabbitMQ();
|
||||
else
|
||||
connection_task->scheduleAfter(RESCHEDULE_MS);
|
||||
@ -242,7 +244,9 @@ void StorageRabbitMQ::connectionFunc()
|
||||
void StorageRabbitMQ::deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop)
|
||||
{
|
||||
if (stop_loop)
|
||||
event_handler->updateLoopState(Loop::STOP);
|
||||
{
|
||||
connection->getHandler().updateLoopState(Loop::STOP);
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(task_mutex, std::defer_lock);
|
||||
if (lock.try_lock())
|
||||
@ -278,19 +282,19 @@ void StorageRabbitMQ::initRabbitMQ()
|
||||
return;
|
||||
}
|
||||
|
||||
AMQP::TcpChannel rabbit_channel(connection.get());
|
||||
auto rabbit_channel = connection->createChannel();
|
||||
|
||||
/// Main exchange -> Bridge exchange -> ( Sharding exchange ) -> Queues -> Consumers
|
||||
|
||||
initExchange(rabbit_channel);
|
||||
bindExchange(rabbit_channel);
|
||||
initExchange(*rabbit_channel);
|
||||
bindExchange(*rabbit_channel);
|
||||
|
||||
for (const auto i : collections::range(0, num_queues))
|
||||
bindQueue(i + 1, rabbit_channel);
|
||||
bindQueue(i + 1, *rabbit_channel);
|
||||
|
||||
LOG_TRACE(log, "RabbitMQ setup completed");
|
||||
rabbit_is_ready = true;
|
||||
rabbit_channel.close();
|
||||
rabbit_channel->close();
|
||||
}
|
||||
|
||||
|
||||
@ -380,7 +384,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
}
|
||||
|
||||
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers)
|
||||
.onSuccess([&]() { event_handler->stopLoop(); })
|
||||
.onSuccess([&]() { connection->getHandler().stopLoop(); })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
@ -392,7 +396,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash)
|
||||
{
|
||||
rabbit_channel.bindExchange(exchange_name, bridge_exchange, routing_keys[0])
|
||||
.onSuccess([&]() { event_handler->stopLoop(); })
|
||||
.onSuccess([&]() { connection->getHandler().stopLoop(); })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
@ -410,7 +414,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
{
|
||||
++bound_keys;
|
||||
if (bound_keys == routing_keys.size())
|
||||
event_handler->stopLoop();
|
||||
connection->getHandler().stopLoop();
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
@ -422,7 +426,7 @@ void StorageRabbitMQ::bindExchange(AMQP::TcpChannel & rabbit_channel)
|
||||
}
|
||||
}
|
||||
|
||||
event_handler->startBlockingLoop();
|
||||
connection->getHandler().startBlockingLoop();
|
||||
}
|
||||
|
||||
|
||||
@ -441,7 +445,7 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann
|
||||
* fanout exchange it can be arbitrary
|
||||
*/
|
||||
rabbit_channel.bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
|
||||
.onSuccess([&] { event_handler->stopLoop(); })
|
||||
.onSuccess([&] { connection->getHandler().stopLoop(); })
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
throw Exception(
|
||||
@ -507,52 +511,15 @@ void StorageRabbitMQ::bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_chann
|
||||
/// AMQP::autodelete setting is not allowed, because in case of server restart there will be no consumers
|
||||
/// and deleting queues should not take place.
|
||||
rabbit_channel.declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
|
||||
event_handler->startBlockingLoop();
|
||||
}
|
||||
|
||||
|
||||
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
|
||||
{
|
||||
size_t cnt_retries = 0;
|
||||
|
||||
if (reconnecting)
|
||||
{
|
||||
connection->close(); /// Connection might be unusable, but not closed
|
||||
|
||||
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
|
||||
* an AMQP closing-handshake is performed). But cannot open a new connection until previous one is properly closed
|
||||
*/
|
||||
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
||||
event_handler->iterateLoop();
|
||||
|
||||
/// This will force immediate closure if not yet closed
|
||||
if (!connection->closed())
|
||||
connection->close(true);
|
||||
|
||||
LOG_TRACE(log, "Trying to restore connection to " + address);
|
||||
}
|
||||
|
||||
auto amqp_address = connection_string.empty() ? AMQP::Address(parsed_address.first, parsed_address.second,
|
||||
AMQP::Login(login_password.first, login_password.second), vhost, secure)
|
||||
: AMQP::Address(connection_string);
|
||||
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), amqp_address);
|
||||
|
||||
cnt_retries = 0;
|
||||
while (!connection->ready() && !stream_cancelled && cnt_retries++ != RETRIES_MAX)
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
|
||||
}
|
||||
|
||||
return event_handler->connectionRunning(connection.get());
|
||||
connection->getHandler().startBlockingLoop();
|
||||
}
|
||||
|
||||
|
||||
bool StorageRabbitMQ::updateChannel(ChannelPtr & channel)
|
||||
{
|
||||
if (event_handler->connectionRunning(connection.get()))
|
||||
if (connection->isConnected())
|
||||
{
|
||||
channel = std::make_shared<AMQP::TcpChannel>(connection.get());
|
||||
channel = connection->createChannel();
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -576,11 +543,11 @@ void StorageRabbitMQ::unbindExchange()
|
||||
std::call_once(flag, [&]()
|
||||
{
|
||||
streaming_task->deactivate();
|
||||
event_handler->updateLoopState(Loop::STOP);
|
||||
connection->getHandler().updateLoopState(Loop::STOP);
|
||||
looping_task->deactivate();
|
||||
|
||||
AMQP::TcpChannel rabbit_channel(connection.get());
|
||||
rabbit_channel.removeExchange(bridge_exchange)
|
||||
auto rabbit_channel = connection->createChannel();
|
||||
rabbit_channel->removeExchange(bridge_exchange)
|
||||
.onSuccess([&]()
|
||||
{
|
||||
exchange_removed.store(true);
|
||||
@ -592,19 +559,13 @@ void StorageRabbitMQ::unbindExchange()
|
||||
|
||||
while (!exchange_removed.load())
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
connection->getHandler().iterateLoop();
|
||||
}
|
||||
rabbit_channel.close();
|
||||
rabbit_channel->close();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
String StorageRabbitMQ::formatConnectionInfoForLogs() const
|
||||
{
|
||||
return parsed_address.first + ':' + toString(parsed_address.second);
|
||||
}
|
||||
|
||||
|
||||
Pipe StorageRabbitMQ::read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
@ -624,12 +585,12 @@ Pipe StorageRabbitMQ::read(
|
||||
auto modified_context = addSettings(local_context);
|
||||
auto block_size = getMaxBlockSize();
|
||||
|
||||
if (!event_handler->connectionRunning(connection.get()))
|
||||
if (!connection->isConnected())
|
||||
{
|
||||
if (event_handler->loopRunning())
|
||||
if (connection->getHandler().loopRunning())
|
||||
deactivateTask(looping_task, false, true);
|
||||
if (!restoreConnection(true))
|
||||
throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "No connection to {}", formatConnectionInfoForLogs());
|
||||
if (!connection->reconnect())
|
||||
throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "No connection to {}", connection->connectionInfoForLog());
|
||||
}
|
||||
|
||||
Pipes pipes;
|
||||
@ -645,7 +606,7 @@ Pipe StorageRabbitMQ::read(
|
||||
pipes.emplace_back(std::make_shared<SourceFromInputStream>(converting_stream));
|
||||
}
|
||||
|
||||
if (!event_handler->loopRunning() && event_handler->connectionRunning(connection.get()))
|
||||
if (!connection->getHandler().loopRunning() && connection->isConnected())
|
||||
looping_task->activateAndSchedule();
|
||||
|
||||
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
|
||||
@ -663,7 +624,7 @@ SinkToStoragePtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr
|
||||
|
||||
void StorageRabbitMQ::startup()
|
||||
{
|
||||
if (event_handler->connectionRunning(connection.get()))
|
||||
if (connection->isConnected())
|
||||
initRabbitMQ();
|
||||
else
|
||||
connection_task->activateAndSchedule();
|
||||
@ -682,7 +643,7 @@ void StorageRabbitMQ::startup()
|
||||
}
|
||||
}
|
||||
|
||||
event_handler->updateLoopState(Loop::RUN);
|
||||
connection->getHandler().updateLoopState(Loop::RUN);
|
||||
streaming_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
@ -713,13 +674,7 @@ void StorageRabbitMQ::shutdown()
|
||||
|
||||
/// It is important to close connection here - before removing consumer buffers, because
|
||||
/// it will finish and clean callbacks, which might use those buffers data.
|
||||
connection->close();
|
||||
|
||||
/// Connection is not closed immediately - it requires the loop to shutdown it properly and to
|
||||
/// finish all callbacks.
|
||||
size_t cnt_retries = 0;
|
||||
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
||||
event_handler->iterateLoop();
|
||||
connection->disconnect();
|
||||
|
||||
for (size_t i = 0; i < num_created_consumers; ++i)
|
||||
popReadBuffer();
|
||||
@ -739,7 +694,7 @@ void StorageRabbitMQ::cleanupRabbitMQ() const
|
||||
return;
|
||||
|
||||
connection->heartbeat();
|
||||
if (!event_handler->connectionRunning(connection.get()))
|
||||
if (!connection->isConnected())
|
||||
{
|
||||
String queue_names;
|
||||
for (const auto & queue : queues)
|
||||
@ -755,27 +710,27 @@ void StorageRabbitMQ::cleanupRabbitMQ() const
|
||||
return;
|
||||
}
|
||||
|
||||
AMQP::TcpChannel rabbit_channel(connection.get());
|
||||
auto rabbit_channel = connection->createChannel();
|
||||
for (const auto & queue : queues)
|
||||
{
|
||||
/// AMQP::ifunused is needed, because it is possible to share queues between multiple tables and dropping
|
||||
/// on of them should not affect others.
|
||||
/// AMQP::ifempty is not used on purpose.
|
||||
|
||||
rabbit_channel.removeQueue(queue, AMQP::ifunused)
|
||||
rabbit_channel->removeQueue(queue, AMQP::ifunused)
|
||||
.onSuccess([&](uint32_t num_messages)
|
||||
{
|
||||
LOG_TRACE(log, "Successfully deleted queue {}, messages contained {}", queue, num_messages);
|
||||
event_handler->stopLoop();
|
||||
connection->getHandler().stopLoop();
|
||||
})
|
||||
.onError([&](const char * message)
|
||||
{
|
||||
LOG_ERROR(log, "Failed to delete queue {}. Error message: {}", queue, message);
|
||||
event_handler->stopLoop();
|
||||
connection->getHandler().stopLoop();
|
||||
});
|
||||
}
|
||||
event_handler->startBlockingLoop();
|
||||
rabbit_channel.close();
|
||||
connection->getHandler().startBlockingLoop();
|
||||
rabbit_channel->close();
|
||||
|
||||
/// Also there is no need to cleanup exchanges as they were created with AMQP::autodelete option. Once queues
|
||||
/// are removed, exchanges will also be cleaned.
|
||||
@ -819,11 +774,11 @@ ConsumerBufferPtr StorageRabbitMQ::popReadBuffer(std::chrono::milliseconds timeo
|
||||
ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
|
||||
{
|
||||
ChannelPtr consumer_channel;
|
||||
if (event_handler->connectionRunning(connection.get()))
|
||||
consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
|
||||
if (connection->isConnected())
|
||||
consumer_channel = connection->createChannel();
|
||||
|
||||
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
|
||||
consumer_channel, event_handler, queues, ++consumer_id,
|
||||
std::move(consumer_channel), connection->getHandler(), queues, ++consumer_id,
|
||||
unique_strbase, log, row_delimiter, queue_size, stream_cancelled);
|
||||
}
|
||||
|
||||
@ -831,7 +786,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
|
||||
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
|
||||
{
|
||||
return std::make_shared<WriteBufferToRabbitMQProducer>(
|
||||
parsed_address, getContext(), login_password, vhost, routing_keys, exchange_name, exchange_type,
|
||||
configuration, getContext(), routing_keys, exchange_name, exchange_type,
|
||||
producer_id.fetch_add(1), persistent, wait_confirm, log,
|
||||
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
|
||||
}
|
||||
@ -867,7 +822,7 @@ bool StorageRabbitMQ::checkDependencies(const StorageID & table_id)
|
||||
|
||||
void StorageRabbitMQ::streamingToViewsFunc()
|
||||
{
|
||||
if (rabbit_is_ready && (event_handler->connectionRunning(connection.get()) || restoreConnection(true)))
|
||||
if ((rabbit_is_ready && connection->isConnected()) || connection->reconnect())
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -893,7 +848,7 @@ void StorageRabbitMQ::streamingToViewsFunc()
|
||||
/// Reschedule with backoff.
|
||||
if (milliseconds_to_wait < BACKOFF_TRESHOLD)
|
||||
milliseconds_to_wait *= 2;
|
||||
event_handler->updateLoopState(Loop::STOP);
|
||||
connection->getHandler().updateLoopState(Loop::STOP);
|
||||
break;
|
||||
}
|
||||
else
|
||||
@ -905,7 +860,7 @@ void StorageRabbitMQ::streamingToViewsFunc()
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
|
||||
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
|
||||
{
|
||||
event_handler->updateLoopState(Loop::STOP);
|
||||
connection->getHandler().updateLoopState(Loop::STOP);
|
||||
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
|
||||
break;
|
||||
}
|
||||
@ -975,9 +930,9 @@ bool StorageRabbitMQ::streamToViews()
|
||||
|
||||
std::atomic<bool> stub = {false};
|
||||
|
||||
if (!event_handler->loopRunning())
|
||||
if (!connection->getHandler().loopRunning())
|
||||
{
|
||||
event_handler->updateLoopState(Loop::RUN);
|
||||
connection->getHandler().updateLoopState(Loop::RUN);
|
||||
looping_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
@ -989,12 +944,12 @@ bool StorageRabbitMQ::streamToViews()
|
||||
deactivateTask(looping_task, false, true);
|
||||
size_t queue_empty = 0;
|
||||
|
||||
if (!event_handler->connectionRunning(connection.get()))
|
||||
if (!connection->isConnected())
|
||||
{
|
||||
if (stream_cancelled)
|
||||
return true;
|
||||
|
||||
if (restoreConnection(true))
|
||||
if (connection->reconnect())
|
||||
{
|
||||
for (auto & stream : streams)
|
||||
stream->as<RabbitMQBlockInputStream>()->updateChannel();
|
||||
@ -1043,12 +998,12 @@ bool StorageRabbitMQ::streamToViews()
|
||||
if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
|
||||
{
|
||||
/// Iterate loop to activate error callbacks if they happened
|
||||
event_handler->iterateLoop();
|
||||
if (!event_handler->connectionRunning(connection.get()))
|
||||
connection->getHandler().iterateLoop();
|
||||
if (!connection->isConnected())
|
||||
break;
|
||||
}
|
||||
|
||||
event_handler->iterateLoop();
|
||||
connection->getHandler().iterateLoop();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1061,7 +1016,7 @@ bool StorageRabbitMQ::streamToViews()
|
||||
}
|
||||
else
|
||||
{
|
||||
event_handler->updateLoopState(Loop::RUN);
|
||||
connection->getHandler().updateLoopState(Loop::RUN);
|
||||
looping_task->activateAndSchedule();
|
||||
}
|
||||
|
||||
|
@ -7,9 +7,8 @@
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <Storages/RabbitMQ/Buffer_fwd.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQSettings.h>
|
||||
#include <Storages/RabbitMQ/UVLoop.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQConnection.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <amqpcpp/libuv.h>
|
||||
#include <uv.h>
|
||||
@ -19,8 +18,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
|
||||
|
||||
class StorageRabbitMQ final: public shared_ptr_helper<StorageRabbitMQ>, public IStorage, WithContext
|
||||
{
|
||||
friend struct shared_ptr_helper<StorageRabbitMQ>;
|
||||
@ -103,16 +100,9 @@ private:
|
||||
|
||||
bool hash_exchange;
|
||||
Poco::Logger * log;
|
||||
String address;
|
||||
std::pair<String, UInt16> parsed_address;
|
||||
std::pair<String, String> login_password;
|
||||
String vhost;
|
||||
String connection_string;
|
||||
bool secure;
|
||||
|
||||
UVLoop loop;
|
||||
std::shared_ptr<RabbitMQHandler> event_handler;
|
||||
std::unique_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers
|
||||
RabbitMQConnectionPtr connection; /// Connection for all consumers
|
||||
RabbitMQConfiguration configuration;
|
||||
|
||||
size_t num_created_consumers = 0;
|
||||
Poco::Semaphore semaphore;
|
||||
@ -154,7 +144,6 @@ private:
|
||||
static Names parseSettings(String settings_list);
|
||||
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
|
||||
static String getTableBasedName(String name, const StorageID & table_id);
|
||||
String formatConnectionInfoForLogs() const;
|
||||
|
||||
std::shared_ptr<Context> addSettings(ContextPtr context) const;
|
||||
size_t getMaxBlockSize() const;
|
||||
@ -167,7 +156,6 @@ private:
|
||||
void bindExchange(AMQP::TcpChannel & rabbit_channel);
|
||||
void bindQueue(size_t queue_id, AMQP::TcpChannel & rabbit_channel);
|
||||
|
||||
bool restoreConnection(bool reconnecting);
|
||||
bool streamToViews();
|
||||
bool checkDependencies(const StorageID & table_id);
|
||||
|
||||
|
@ -26,10 +26,8 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
std::pair<String, UInt16> & parsed_address_,
|
||||
const RabbitMQConfiguration & configuration_,
|
||||
ContextPtr global_context,
|
||||
const std::pair<String, String> & login_password_,
|
||||
const String & vhost_,
|
||||
const Names & routing_keys_,
|
||||
const String & exchange_name_,
|
||||
const AMQP::ExchangeType exchange_type_,
|
||||
@ -41,9 +39,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
size_t rows_per_message,
|
||||
size_t chunk_size_)
|
||||
: WriteBuffer(nullptr, 0)
|
||||
, parsed_address(parsed_address_)
|
||||
, login_password(login_password_)
|
||||
, vhost(vhost_)
|
||||
, connection(configuration_, log_)
|
||||
, routing_keys(routing_keys_)
|
||||
, exchange_name(exchange_name_)
|
||||
, exchange_type(exchange_type_)
|
||||
@ -57,20 +53,10 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
, max_rows(rows_per_message)
|
||||
, chunk_size(chunk_size_)
|
||||
{
|
||||
event_handler = std::make_unique<RabbitMQHandler>(loop.getLoop(), log);
|
||||
|
||||
if (setupConnection(false))
|
||||
{
|
||||
if (connection.connect())
|
||||
setupChannel();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!connection->closed())
|
||||
connection->close(true);
|
||||
|
||||
throw Exception("Cannot connect to RabbitMQ host: " + parsed_address.first + ", port: " + std::to_string(parsed_address.second),
|
||||
ErrorCodes::CANNOT_CONNECT_RABBITMQ);
|
||||
}
|
||||
throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "Cannot connect to RabbitMQ {}", connection.connectionInfoForLog());
|
||||
|
||||
writing_task = global_context->getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
|
||||
writing_task->deactivate();
|
||||
@ -92,12 +78,12 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
|
||||
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
|
||||
{
|
||||
writing_task->deactivate();
|
||||
connection->close();
|
||||
connection.disconnect();
|
||||
|
||||
size_t cnt_retries = 0;
|
||||
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
|
||||
while (!connection.closed() && cnt_retries++ != RETRIES_MAX)
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
connection.getHandler().iterateLoop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
|
||||
}
|
||||
|
||||
@ -131,42 +117,9 @@ void WriteBufferToRabbitMQProducer::countRow()
|
||||
}
|
||||
|
||||
|
||||
bool WriteBufferToRabbitMQProducer::setupConnection(bool reconnecting)
|
||||
{
|
||||
size_t cnt_retries = 0;
|
||||
|
||||
if (reconnecting)
|
||||
{
|
||||
connection->close();
|
||||
|
||||
while (!connection->closed() && ++cnt_retries != RETRIES_MAX)
|
||||
event_handler->iterateLoop();
|
||||
|
||||
if (!connection->closed())
|
||||
connection->close(true);
|
||||
|
||||
LOG_TRACE(log, "Trying to set up connection");
|
||||
}
|
||||
|
||||
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
|
||||
AMQP::Address(
|
||||
parsed_address.first, parsed_address.second,
|
||||
AMQP::Login(login_password.first, login_password.second), vhost));
|
||||
|
||||
cnt_retries = 0;
|
||||
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
|
||||
}
|
||||
|
||||
return event_handler->connectionRunning(connection.get());
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferToRabbitMQProducer::setupChannel()
|
||||
{
|
||||
producer_channel = std::make_unique<AMQP::TcpChannel>(connection.get());
|
||||
producer_channel = connection.createChannel();
|
||||
|
||||
producer_channel->onError([&](const char * message)
|
||||
{
|
||||
@ -322,7 +275,7 @@ void WriteBufferToRabbitMQProducer::writingFunc()
|
||||
|
||||
if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
|
||||
wait_all = false;
|
||||
else if ((!producer_channel->usable() && event_handler->connectionRunning(connection.get())) || (!event_handler->connectionRunning(connection.get()) && setupConnection(true)))
|
||||
else if ((!producer_channel->usable() && connection.isConnected()) || (!connection.isConnected() && connection.reconnect()))
|
||||
setupChannel();
|
||||
}
|
||||
|
||||
@ -355,7 +308,7 @@ void WriteBufferToRabbitMQProducer::reinitializeChunks()
|
||||
|
||||
void WriteBufferToRabbitMQProducer::iterateEventLoop()
|
||||
{
|
||||
event_handler->iterateLoop();
|
||||
connection.getHandler().iterateLoop();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -6,8 +6,7 @@
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <amqpcpp.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQHandler.h>
|
||||
#include <Storages/RabbitMQ/UVLoop.h>
|
||||
#include <Storages/RabbitMQ/RabbitMQConnection.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <Core/Names.h>
|
||||
@ -19,14 +18,12 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer
|
||||
{
|
||||
public:
|
||||
WriteBufferToRabbitMQProducer(
|
||||
std::pair<String, UInt16> & parsed_address_,
|
||||
const RabbitMQConfiguration & configuratin_,
|
||||
ContextPtr global_context,
|
||||
const std::pair<String, String> & login_password_,
|
||||
const String & vhost_,
|
||||
const Names & routing_keys_,
|
||||
const String & exchange_name_,
|
||||
const AMQP::ExchangeType exchange_type_,
|
||||
const size_t channel_id_,
|
||||
const size_t channel_id_base_,
|
||||
const bool persistent_,
|
||||
std::atomic<bool> & wait_confirm_,
|
||||
Poco::Logger * log_,
|
||||
@ -48,14 +45,12 @@ private:
|
||||
|
||||
void iterateEventLoop();
|
||||
void writingFunc();
|
||||
bool setupConnection(bool reconnecting);
|
||||
void setupChannel();
|
||||
void removeRecord(UInt64 received_delivery_tag, bool multiple, bool republish);
|
||||
void publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & message, bool republishing);
|
||||
|
||||
std::pair<String, UInt16> parsed_address;
|
||||
const std::pair<String, String> login_password;
|
||||
const String vhost;
|
||||
RabbitMQConnection connection;
|
||||
|
||||
const Names routing_keys;
|
||||
const String exchange_name;
|
||||
AMQP::ExchangeType exchange_type;
|
||||
@ -70,9 +65,6 @@ private:
|
||||
AMQP::Table key_arguments;
|
||||
BackgroundSchedulePool::TaskHolder writing_task;
|
||||
|
||||
UVLoop loop;
|
||||
std::unique_ptr<RabbitMQHandler> event_handler;
|
||||
std::unique_ptr<AMQP::TcpConnection> connection;
|
||||
std::unique_ptr<AMQP::TcpChannel> producer_channel;
|
||||
bool producer_ready = false;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user