Move reading from RabbitMQ into background task

This commit is contained in:
kssenii 2020-06-27 17:26:00 +00:00
parent 36eb2c3028
commit 649eb8e348
7 changed files with 75 additions and 78 deletions

View File

@ -496,6 +496,7 @@ namespace ErrorCodes
extern const int NO_SUITABLE_FUNCTION_IMPLEMENTATION = 527;
extern const int CASSANDRA_INTERNAL_ERROR = 528;
extern const int NOT_A_LEADER = 529;
extern const int CANNOT_CONNECT_RABBITMQ = 530;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -4,20 +4,16 @@
namespace DB
{
enum
{
Lock_timeout = 50,
Loop_stop_timeout = 200
};
static const auto Lock_timeout = 50;
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop and handler).
*/
RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
AMQP::LibUvHandler(loop_),
loop(loop_),
log(log_)
{
tv.tv_sec = 0;
tv.tv_usec = Loop_stop_timeout;
}
@ -34,15 +30,18 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
}
/// Drives the libuv event loop until a stop has been requested.
/// Every pass re-reads the atomic stop_loop flag and, while it is still unset,
/// performs exactly one non-blocking iteration of the loop.
/// NOTE(review): UV_RUN_NOWAIT does not block when there is nothing to process, so this
/// presumably spins without sleeping while the connection is idle - confirm the CPU cost is acceptable.
void RabbitMQHandler::startLoop()
{
    for (;;)
    {
        /// Leave as soon as the stop flag is observed.
        if (stop_loop.load())
            break;

        uv_run(loop, UV_RUN_NOWAIT);
    }
}
void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
{
/* The object of this class is shared between concurrent consumers (who share the same connection == share the same
* event loop and handler). But the loop should not be attempted to start if it is already running.
*/
std::lock_guard lock(mutex_before_event_loop);
loop_started.store(true);
stop_scheduled = false;
uv_run(loop, UV_RUN_NOWAIT);
}
@ -55,15 +54,9 @@ void RabbitMQHandler::startProducerLoop()
void RabbitMQHandler::stop()
{
std::lock_guard lock(mutex_before_loop_stop);
uv_stop(loop);
}
void RabbitMQHandler::stopWithTimeout()
{
stop_scheduled = true;
uv_stop(loop);
//std::lock_guard lock(mutex_before_loop_stop);
//uv_stop(loop);
stop_loop = true;
}
}

View File

@ -21,16 +21,16 @@ public:
void onError(AMQP::TcpConnection * connection, const char * message) override;
void startConsumerLoop(std::atomic<bool> & loop_started);
void startProducerLoop();
void stopWithTimeout();
void stop();
std::atomic<bool> & checkStopIsScheduled() { return stop_scheduled; };
void startLoop();
private:
uv_loop_t * loop;
Poco::Logger * log;
timeval tv;
std::atomic<bool> stop_scheduled = false;
std::atomic<bool> stop_loop = false, running_loop = false;
std::timed_mutex starting_loop;
std::mutex mutex_before_event_loop;
std::mutex mutex_before_loop_stop;
};

View File

@ -33,7 +33,7 @@ namespace ExchangeType
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
RabbitMQHandler & eventHandler_,
HandlerPtr eventHandler_,
const String & exchange_name_,
const Names & routing_keys_,
const size_t channel_id_,
@ -117,8 +117,6 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
return;
}
/// For special purposes to use the flexibility of routing provided by rabbitmq - choosing exchange types is supported.
AMQP::ExchangeType type;
if (exchange_type == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout;
else if (exchange_type == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct;
@ -247,7 +245,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
});
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because
* if moved there, it must(!) be wrapped inside a channel->onReady callback or any other (and the looping), otherwise
* if moved there, it must(!) be wrapped inside a channel->onReady callback or any other, otherwise
* consumer might fail to subscribe and no resubscription will help.
*/
subscribe(queues.back());
@ -280,7 +278,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
AMQP::Table binding_arguments;
std::vector<String> matching;
/// It is not parsed for the second time - if it was parsed above, then it would go to the first if statement, not here.
/// It is not parsed for the second time - if it was parsed above, then it would never end up here.
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
@ -367,15 +365,6 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
std::lock_guard lock(mutex);
received.push_back(message_received);
}
/* As event loop is blocking to the thread that started it and a single thread should not be blocked while
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
* loop_started == 1 if current consumer is started the loop and not another.
*/
if (!loop_started.load() && !event_handler.checkStopIsScheduled())
{
stopEventLoopWithTimeout();
}
}
})
.onError([&](const char * message)
@ -415,19 +404,13 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
void ReadBufferFromRabbitMQConsumer::stopEventLoop()
{
event_handler.stop();
}
void ReadBufferFromRabbitMQConsumer::stopEventLoopWithTimeout()
{
event_handler.stopWithTimeout();
event_handler->stop();
}
void ReadBufferFromRabbitMQConsumer::startEventLoop(std::atomic<bool> & loop_started)
{
event_handler.startConsumerLoop(loop_started);
event_handler->startConsumerLoop(loop_started);
}
@ -438,21 +421,12 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
if (current == messages.end())
{
if (received.empty())
{
/// Run the onReceived callbacks to save the messages that have been received by now, blocks current thread.
startEventLoop(loop_started);
loop_started.store(false);
}
if (received.empty())
return false;
messages.clear();
/// Needed to avoid data race because this vector can be used at the same time by another thread in onReceived callback.
std::lock_guard lock(mutex);
messages.clear();
messages.swap(received);
current = messages.begin();
}

View File

@ -5,6 +5,7 @@
#include <IO/ReadBuffer.h>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Core/BackgroundSchedulePool.h>
#include <event2/event.h>
namespace Poco
@ -16,6 +17,7 @@ namespace DB
{
using ChannelPtr = std::shared_ptr<AMQP::TcpChannel>;
using HandlerPtr = std::shared_ptr<RabbitMQHandler>;
class ReadBufferFromRabbitMQConsumer : public ReadBuffer
{
@ -23,7 +25,7 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
public:
ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
RabbitMQHandler & event_handler_,
HandlerPtr event_handler_,
const String & exchange_name_,
const Names & routing_keys_,
const size_t channel_id_,
@ -46,7 +48,7 @@ private:
using Messages = std::vector<String>;
ChannelPtr consumer_channel;
RabbitMQHandler & event_handler;
HandlerPtr event_handler;
const String & exchange_name;
const Names & routing_keys;
@ -92,7 +94,6 @@ private:
void initQueueBindings(const size_t queue_id);
void subscribe(const String & queue_name);
void startEventLoop(std::atomic<bool> & loop_started);
void stopEventLoopWithTimeout();
void stopEventLoop();
};

View File

@ -39,16 +39,17 @@ namespace DB
enum
{
Connection_setup_sleep = 200,
Connection_setup_retries_max = 1000
};
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 1000;
static const auto RESCHEDULE_MS = 500;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_CONNECT_RABBITMQ;
}
@ -79,27 +80,25 @@ StorageRabbitMQ::StorageRabbitMQ(
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
, semaphore(0, num_consumers_)
, login_password(std::make_pair(
rabbitmq_context.getConfigRef().getString("rabbitmq_username", "root"),
rabbitmq_context.getConfigRef().getString("rabbitmq_password", "clickhouse")))
global_context.getConfigRef().getString("rabbitmq_username", "root"),
global_context.getConfigRef().getString("rabbitmq_password", "clickhouse")))
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
{
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
event_handler = std::make_shared<RabbitMQHandler>(loop.get(), log);
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
size_t cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != Connection_setup_retries_max)
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{
uv_run(loop.get(), UV_RUN_NOWAIT);
std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
if (!connection->ready())
{
LOG_ERROR(log, "Cannot set up connection for consumer");
}
throw Exception("Cannot set up connection for consumers", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
rabbitmq_context.makeQueryContext();
StorageInMemoryMetadata storage_metadata;
@ -118,6 +117,10 @@ StorageRabbitMQ::StorageRabbitMQ(
/// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
local_exchange_name = exchange_name + "_" + table_name;
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
looping_task->deactivate();
}
@ -132,6 +135,13 @@ void StorageRabbitMQ::heartbeatFunc()
}
/// Body of the background "RabbitMQLoopingTask": logs that looping is starting and then
/// hands control to RabbitMQHandler::startLoop(), which keeps iterating the event loop
/// until the handler's stop flag is set. This call does not return before that happens.
void StorageRabbitMQ::loopingFunc()
{
LOG_DEBUG(log, "Starting event looping iterations");
event_handler->startLoop();
}
Pipes StorageRabbitMQ::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -154,8 +164,13 @@ Pipes StorageRabbitMQ::read(
*this, metadata_snapshot, context, column_names, log)));
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
if (!loop_started)
{
loop_started = true;
looping_task->activateAndSchedule();
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
return pipes;
}
@ -199,6 +214,9 @@ void StorageRabbitMQ::shutdown()
streaming_task->deactivate();
heartbeat_task->deactivate();
event_handler->stop();
looping_task->deactivate();
connection->close();
}
@ -246,7 +264,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
consumer_channel, *event_handler, exchange_name, routing_keys,
consumer_channel, event_handler, exchange_name, routing_keys,
next_channel_id, log, row_delimiter, bind_by_id, num_queues,
exchange_type, local_exchange_name, stream_cancelled);
}
@ -354,6 +372,12 @@ bool StorageRabbitMQ::streamToViews()
stream->setLimits(limits);
}
if (!loop_started)
{
loop_started = true;
looping_task->activateAndSchedule();
}
// Join multiple streams if necessary
BlockInputStreamPtr in;
if (streams.size() > 1)

View File

@ -91,9 +91,9 @@ private:
std::pair<String, UInt16> parsed_address;
std::pair<String, String> login_password;
std::unique_ptr<uv_loop_t> loop;
std::unique_ptr<RabbitMQHandler> event_handler;
std::unique_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers
std::shared_ptr<uv_loop_t> loop;
std::shared_ptr<RabbitMQHandler> event_handler;
std::shared_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers
Poco::Semaphore semaphore;
std::mutex mutex;
@ -101,15 +101,19 @@ private:
size_t next_channel_id = 1; /// Must be >= 1, because it is used as a binding key, which has to be > 0
bool update_channel_id = false;
std::atomic<bool> loop_started = false;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder heartbeat_task;
BackgroundSchedulePool::TaskHolder looping_task;
std::atomic<bool> stream_cancelled{false};
ConsumerBufferPtr createReadBuffer();
void threadFunc();
void heartbeatFunc();
void loopingFunc();
void pingConnection() { connection->heartbeat(); }
bool streamToViews();