Trying to do everything on top of libuv, add heartbeats

This commit is contained in:
alesapin 2020-06-25 00:14:49 +03:00
parent cb30dbfe28
commit addee61bcb
9 changed files with 86 additions and 55 deletions

View File

@ -11,9 +11,9 @@ enum
}; };
RabbitMQHandler::RabbitMQHandler(event_base * evbase_, Poco::Logger * log_) : RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
LibEventHandler(evbase_), AMQP::LibUvHandler(loop_),
evbase(evbase_), loop(loop_),
log(log_) log(log_)
{ {
tv.tv_sec = 0; tv.tv_sec = 0;
@ -44,7 +44,7 @@ void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
loop_started.store(true); loop_started.store(true);
stop_scheduled = false; stop_scheduled = false;
event_base_loop(evbase, EVLOOP_NONBLOCK); uv_run(loop, UV_RUN_NOWAIT);
mutex_before_event_loop.unlock(); mutex_before_event_loop.unlock();
} }
} }
@ -52,7 +52,7 @@ void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
void RabbitMQHandler::startProducerLoop() void RabbitMQHandler::startProducerLoop()
{ {
event_base_loop(evbase, EVLOOP_NONBLOCK); uv_run(loop, UV_RUN_NOWAIT);
} }
@ -60,7 +60,7 @@ void RabbitMQHandler::stop()
{ {
if (mutex_before_loop_stop.try_lock()) if (mutex_before_loop_stop.try_lock())
{ {
event_base_loopbreak(evbase); uv_stop(loop);
mutex_before_loop_stop.unlock(); mutex_before_loop_stop.unlock();
} }
} }
@ -69,7 +69,7 @@ void RabbitMQHandler::stop()
void RabbitMQHandler::stopWithTimeout() void RabbitMQHandler::stopWithTimeout()
{ {
stop_scheduled = true; stop_scheduled = true;
event_base_loopexit(evbase, &tv); uv_stop(loop);
} }
} }

View File

@ -7,16 +7,16 @@
#include <amqpcpp/libevent.h> #include <amqpcpp/libevent.h>
#include <amqpcpp/linux_tcp.h> #include <amqpcpp/linux_tcp.h>
#include <common/types.h> #include <common/types.h>
#include <event2/event.h> #include <amqpcpp/libuv.h>
namespace DB namespace DB
{ {
class RabbitMQHandler : public AMQP::LibEventHandler class RabbitMQHandler : public AMQP::LibUvHandler
{ {
public: public:
RabbitMQHandler(event_base * evbase_, Poco::Logger * log_); RabbitMQHandler(uv_loop_t * evbase_, Poco::Logger * log_);
void onError(AMQP::TcpConnection * connection, const char * message) override; void onError(AMQP::TcpConnection * connection, const char * message) override;
void startConsumerLoop(std::atomic<bool> & loop_started); void startConsumerLoop(std::atomic<bool> & loop_started);
@ -26,7 +26,7 @@ public:
std::atomic<bool> & checkStopIsScheduled() { return stop_scheduled; }; std::atomic<bool> & checkStopIsScheduled() { return stop_scheduled; };
private: private:
event_base * evbase; uv_loop_t * loop;
Poco::Logger * log; Poco::Logger * log;
timeval tv; timeval tv;

View File

@ -446,10 +446,7 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
} }
if (received.empty()) if (received.empty())
{
LOG_TRACE(log, "No more messages to be fetched");
return false; return false;
}
messages.clear(); messages.clear();

View File

@ -43,6 +43,8 @@ enum
Connection_setup_retries_max = 1000 Connection_setup_retries_max = 1000
}; };
static const auto RESCHEDULE_MS = 500;
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
@ -80,19 +82,21 @@ StorageRabbitMQ::StorageRabbitMQ(
rabbitmq_context.getConfigRef().getString("rabbitmq_username", "root"), rabbitmq_context.getConfigRef().getString("rabbitmq_username", "root"),
rabbitmq_context.getConfigRef().getString("rabbitmq_password", "clickhouse"))) rabbitmq_context.getConfigRef().getString("rabbitmq_password", "clickhouse")))
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
, evbase(event_base_new())
, eventHandler(evbase, log)
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), "/"))
{ {
loop = new uv_loop_t;
uv_loop_init(loop);
event_handler = std::make_unique<RabbitMQHandler>(loop, log);
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
size_t cnt_retries = 0; size_t cnt_retries = 0;
while (!connection.ready() && ++cnt_retries != Connection_setup_retries_max) while (!connection->ready() && ++cnt_retries != Connection_setup_retries_max)
{ {
event_base_loop(evbase, EVLOOP_NONBLOCK | EVLOOP_ONCE); uv_run(loop, UV_RUN_NOWAIT);
std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep)); std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
} }
if (!connection.ready()) if (!connection->ready())
{ {
LOG_ERROR(log, "Cannot set up connection for consumer"); LOG_ERROR(log, "Cannot set up connection for consumer");
} }
@ -102,8 +106,10 @@ StorageRabbitMQ::StorageRabbitMQ(
storage_metadata.setColumns(columns_); storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); }); streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ threadFunc(); });
task->deactivate(); streaming_task->deactivate();
heartbeat_task = global_context.getSchedulePool().createTask("RabbitMQHeartbeatTask", [this]{ heartbeatFunc(); });
heartbeat_task->deactivate();
bind_by_id = num_consumers > 1 || num_queues > 1; bind_by_id = num_consumers > 1 || num_queues > 1;
@ -115,6 +121,17 @@ StorageRabbitMQ::StorageRabbitMQ(
} }
void StorageRabbitMQ::heartbeatFunc()
{
if (!stream_cancelled)
{
LOG_DEBUG(log, "Sending RabbitMQ heartbeat");
connection->heartbeat();
heartbeat_task->scheduleAfter(RESCHEDULE_MS * 10);
}
}
Pipes StorageRabbitMQ::read( Pipes StorageRabbitMQ::read(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
@ -165,7 +182,8 @@ void StorageRabbitMQ::startup()
} }
} }
task->activateAndSchedule(); streaming_task->activateAndSchedule();
heartbeat_task->activateAndSchedule();
} }
@ -178,8 +196,10 @@ void StorageRabbitMQ::shutdown()
popReadBuffer(); popReadBuffer();
} }
connection.close(); streaming_task->deactivate();
task->deactivate(); heartbeat_task->deactivate();
connection->close();
} }
@ -223,16 +243,19 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
next_channel_id += num_queues; next_channel_id += num_queues;
update_channel_id = true; update_channel_id = true;
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(&connection); ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return std::make_shared<ReadBufferFromRabbitMQConsumer>(consumer_channel, eventHandler, exchange_name, routing_keys, return std::make_shared<ReadBufferFromRabbitMQConsumer>(
next_channel_id, log, row_delimiter, bind_by_id, num_queues, exchange_type, local_exchange_name, stream_cancelled); consumer_channel, *event_handler, exchange_name, routing_keys,
next_channel_id, log, row_delimiter, bind_by_id, num_queues,
exchange_type, local_exchange_name, stream_cancelled);
} }
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer() ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{ {
return std::make_shared<WriteBufferToRabbitMQProducer>(parsed_address, login_password, routing_keys[0], local_exchange_name, return std::make_shared<WriteBufferToRabbitMQProducer>(
parsed_address, login_password, routing_keys[0], local_exchange_name,
log, num_consumers * num_queues, bind_by_id, use_transactional_channel, log, num_consumers * num_queues, bind_by_id, use_transactional_channel,
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024); row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
} }
@ -296,7 +319,7 @@ void StorageRabbitMQ::threadFunc()
/// Wait for attached views /// Wait for attached views
if (!stream_cancelled) if (!stream_cancelled)
task->activateAndSchedule(); streaming_task->schedule();
} }

View File

@ -9,7 +9,8 @@
#include <atomic> #include <atomic>
#include <Storages/RabbitMQ/Buffer_fwd.h> #include <Storages/RabbitMQ/Buffer_fwd.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h> #include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <event2/event.h> #include <amqpcpp/libuv.h>
#include <uv.h>
namespace DB namespace DB
@ -53,7 +54,6 @@ public:
const String & getFormatName() const { return format_name; } const String & getFormatName() const { return format_name; }
NamesAndTypesList getVirtuals() const override; NamesAndTypesList getVirtuals() const override;
const void pingConnection() { connection.heartbeat(); }
protected: protected:
StorageRabbitMQ( StorageRabbitMQ(
@ -91,9 +91,9 @@ private:
std::pair<String, UInt16> parsed_address; std::pair<String, UInt16> parsed_address;
std::pair<String, String> login_password; std::pair<String, String> login_password;
event_base * evbase; uv_loop_t * loop;
RabbitMQHandler eventHandler; std::unique_ptr<RabbitMQHandler> event_handler;
AMQP::TcpConnection connection; /// Connection for all consumers std::unique_ptr<AMQP::TcpConnection> connection; /// Connection for all consumers
Poco::Semaphore semaphore; Poco::Semaphore semaphore;
std::mutex mutex; std::mutex mutex;
@ -102,12 +102,16 @@ private:
size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0 size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0
bool update_channel_id = false; bool update_channel_id = false;
BackgroundSchedulePool::TaskHolder task; BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder heartbeat_task;
std::atomic<bool> stream_cancelled{false}; std::atomic<bool> stream_cancelled{false};
ConsumerBufferPtr createReadBuffer(); ConsumerBufferPtr createReadBuffer();
void threadFunc(); void threadFunc();
void heartbeatFunc();
void pingConnection() { connection->heartbeat(); }
bool streamToViews(); bool streamToViews();
bool checkDependencies(const StorageID & table_id); bool checkDependencies(const StorageID & table_id);
}; };

View File

@ -4,6 +4,7 @@
#include "Columns/ColumnsNumber.h" #include "Columns/ColumnsNumber.h"
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <amqpcpp.h> #include <amqpcpp.h>
#include <uv.h>
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include <atomic> #include <atomic>
@ -43,28 +44,31 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
, delim(delimiter) , delim(delimiter)
, max_rows(rows_per_message) , max_rows(rows_per_message)
, chunk_size(chunk_size_) , chunk_size(chunk_size_)
, producerEvbase(event_base_new())
, eventHandler(producerEvbase, log)
, connection(&eventHandler, AMQP::Address(parsed_address.first, parsed_address.second,
AMQP::Login(login_password.first, login_password.second), "/"))
{ {
loop = new uv_loop_t;
uv_loop_init(loop);
event_handler = std::make_unique<RabbitMQHandler>(loop, log);
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
/* The reason behind making a separate connection for each concurrent producer is explained here: /* The reason behind making a separate connection for each concurrent producer is explained here:
* https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from * https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086 - publishing from
* different threads (as outputStreams are asynchronous) with the same connection leads to internal library errors. * different threads (as outputStreams are asynchronous) with the same connection leads to internal library errors.
*/ */
size_t cnt_retries = 0; size_t cnt_retries = 0;
while (!connection.ready() && ++cnt_retries != Loop_retries_max) while (!connection->ready() && ++cnt_retries != Loop_retries_max)
{ {
event_base_loop(producerEvbase, EVLOOP_NONBLOCK | EVLOOP_ONCE); uv_run(loop, UV_RUN_NOWAIT);
std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep)); std::this_thread::sleep_for(std::chrono::milliseconds(Connection_setup_sleep));
} }
if (!connection.ready()) if (!connection->ready())
{ {
LOG_ERROR(log, "Cannot set up connection for producer!"); LOG_ERROR(log, "Cannot set up connection for producer!");
} }
producer_channel = std::make_shared<AMQP::TcpChannel>(&connection); producer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
checkExchange(); checkExchange();
/// If publishing should be wrapped in transactions /// If publishing should be wrapped in transactions
@ -78,7 +82,9 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer() WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{ {
finilizeProducer(); finilizeProducer();
connection.close(); connection->close();
event_handler->stop();
assert(rows == 0 && chunks.empty()); assert(rows == 0 && chunks.empty());
} }
@ -195,7 +201,7 @@ void WriteBufferToRabbitMQProducer::nextImpl()
void WriteBufferToRabbitMQProducer::startEventLoop() void WriteBufferToRabbitMQProducer::startEventLoop()
{ {
eventHandler.startProducerLoop(); event_handler->startProducerLoop();
} }
} }

View File

@ -49,9 +49,9 @@ private:
const size_t num_queues; const size_t num_queues;
const bool use_transactional_channel; const bool use_transactional_channel;
event_base * producerEvbase; uv_loop_t * loop;
RabbitMQHandler eventHandler; std::unique_ptr<RabbitMQHandler> event_handler;
AMQP::TcpConnection connection; std::unique_ptr<AMQP::TcpConnection> connection;
ProducerPtr producer_channel; ProducerPtr producer_channel;
size_t next_queue = 0; size_t next_queue = 0;

View File

@ -551,6 +551,7 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
while True: while True:
result = instance.query('SELECT count() FROM test.view') result = instance.query('SELECT count() FROM test.view')
time.sleep(1) time.sleep(1)
print("Result", result, "Expected", messages_num * threads_num)
if int(result) == messages_num * threads_num: if int(result) == messages_num * threads_num:
break break
@ -641,7 +642,8 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
DROP TABLE IF EXISTS test.consumer; DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64) CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree ENGINE = MergeTree
ORDER BY key; ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq; SELECT * FROM test.rabbitmq;
''') ''')
@ -1522,4 +1524,3 @@ if __name__ == '__main__':
cluster.start() cluster.start()
raw_input("Cluster created, press any key to destroy...") raw_input("Cluster created, press any key to destroy...")
cluster.shutdown() cluster.shutdown()