More tests, better reconnect

This commit is contained in:
kssenii 2020-08-08 16:45:52 +00:00
parent eff0233184
commit 2ea32a710a
7 changed files with 295 additions and 64 deletions

View File

@ -11,7 +11,9 @@ namespace DB
RabbitMQHandler::RabbitMQHandler(uv_loop_t * loop_, Poco::Logger * log_) :
AMQP::LibUvHandler(loop_),
loop(loop_),
log(log_)
log(log_),
connection_running(false),
loop_state(Loop::STOP)
{
}
@ -27,15 +29,16 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
void RabbitMQHandler::onReady(AMQP::TcpConnection * /* connection */)
{
LOG_TRACE(log, "Connection is ready");
connection_running.store(true);
LOG_TRACE(log, "Connection is ready");
loop_state.store(Loop::RUN);
}
void RabbitMQHandler::startLoop()
{
std::lock_guard lock(startup_mutex);
/// stop_loop variable is updated in a separate thread
while (!stop_loop.load() && connection_running.load())
while (loop_state.load() == Loop::RUN)
uv_run(loop, UV_RUN_NOWAIT);
}

View File

@ -11,6 +11,12 @@
namespace DB
{
/// State values for the event loop run by RabbitMQHandler. They are kept in a std::atomic<UInt8>
/// and changed through RabbitMQHandler::updateLoopState().
namespace Loop
{
    /// startLoop() keeps calling uv_run() while the state equals RUN.
    static constexpr UInt8 RUN = 1;

    /// Any state other than RUN makes startLoop() leave its loop; STOP is also the initial state.
    static constexpr UInt8 STOP = 2;
}
class RabbitMQHandler : public AMQP::LibUvHandler
{
@ -19,16 +25,18 @@ public:
void onError(AMQP::TcpConnection * connection, const char * message) override;
void onReady(AMQP::TcpConnection * connection) override;
void stop() { stop_loop.store(true); }
void startLoop();
void iterateLoop();
bool connectionRunning() { return connection_running.load(); }
void updateLoopState(UInt8 state) { loop_state.store(state); }
UInt8 getLoopState() { return loop_state.load(); }
private:
uv_loop_t * loop;
Poco::Logger * log;
std::atomic<bool> stop_loop = false, connection_running = false;
std::atomic<bool> connection_running;
std::atomic<UInt8> loop_state;
std::mutex startup_mutex;
};

View File

@ -51,7 +51,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
{
consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Consumer {} error: {}", consumer_tag, message);
LOG_ERROR(log, "Consumer {} error: {}", channel_id, message);
channel_error.store(true);
});
@ -129,7 +129,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
{
if (consumer_tag.empty())
consumer_tag = consumer;
LOG_TRACE(log, "Consumer {} is subscribed to queue {}, consumer tag {}", channel_id, queue_name, consumer);
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
})
.onReceived([&](const AMQP::Message & message, uint64_t delivery_tag, bool redelivered)
{
@ -157,7 +157,7 @@ void ReadBufferFromRabbitMQConsumer::ackMessages()
{
prev_tag = delivery_tag;
consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all up to last tag staring from last acked.
LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag);
LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", channel_id, prev_tag);
}
}

View File

@ -111,7 +111,7 @@ StorageRabbitMQ::StorageRabbitMQ(
if (!connection->ready())
{
uv_loop_close(loop.get());
throw Exception("Cannot set up connection for consumers", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
throw Exception("Cannot connect to RabbitMQ", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
rabbitmq_context.makeQueryContext();
@ -161,7 +161,7 @@ StorageRabbitMQ::StorageRabbitMQ(
* at the same time sharding exchange is needed (if there are multiple shared queues), then those tables also need
* to share sharding exchange.
*/
sharding_exchange = exchange_name + queue_base;
sharding_exchange = exchange_name + "_" + queue_base;
}
bridge_exchange = sharding_exchange + "_bridge";
@ -319,7 +319,7 @@ void StorageRabbitMQ::unbindExchange()
event_handler->iterateLoop();
}
event_handler->stop();
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
heartbeat_task->deactivate();
});
@ -335,31 +335,40 @@ bool StorageRabbitMQ::restoreConnection()
if (!connection->usable() || !connection->ready())
{
LOG_TRACE(log, "Trying to restore consumer connection");
if (event_handler->getLoopState() == Loop::RUN)
{
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
heartbeat_task->deactivate();
}
/* connection->close() is called in onError() method (called by the AMQP library when a fatal error occurs on the connection)
* inside event_handler, but it is not closed immediately (firstly, all pending operations are completed, and then an AMQP
* closing-handshake is performed). But cannot open a new connection until the previous one is properly closed.
*/
size_t cnt_retries = 0;
while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
event_handler->iterateLoop();
/// This will force immediate closure if not yet closed.
if (!connection->closed())
connection->close();
connection->close(true);
LOG_TRACE(log, "Trying to restore consumer connection");
connection = std::make_shared<AMQP::TcpConnection>(event_handler.get(), AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
size_t cnt_retries = 0;
cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP));
}
}
if (event_handler->connectionRunning())
{
LOG_TRACE(log, "Connection restored");
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
looping_task->activateAndSchedule();
}
else
{
LOG_TRACE(log, "Connection refused");
if (event_handler->connectionRunning())
{
looping_task->activateAndSchedule();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
}
restore_connection.unlock();
@ -451,8 +460,7 @@ void StorageRabbitMQ::startup()
void StorageRabbitMQ::shutdown()
{
stream_cancelled = true;
event_handler->stop();
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
streaming_task->deactivate();

View File

@ -52,10 +52,10 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
uv_loop_init(loop.get());
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
/* New coonection for each publisher because cannot publish from different threads with the same connection.
* (See https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086)
/* New connection for each producer buffer because cannot publish from different threads with the same connection.
* (https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/128#issuecomment-300780086)
*/
if (setupConnection())
if (setupConnection(false))
setupChannel();
writing_task = global_context.getSchedulePool().createTask("RabbitMQWritingTask", [this]{ writingFunc(); });
@ -103,21 +103,41 @@ void WriteBufferToRabbitMQProducer::countRow()
chunks.clear();
set(nullptr, 0);
++payload_counter;
payloads.push(std::make_pair(payload_counter, payload));
if (!use_tx)
{
/// "publisher confirms" will be used, this is default.
++payload_counter;
payloads.push(std::make_pair(payload_counter, payload));
}
else
{
/// means channel->startTransaction() was called, not default, enabled only with table setting.
publish(payload);
}
}
}
bool WriteBufferToRabbitMQProducer::setupConnection()
bool WriteBufferToRabbitMQProducer::setupConnection(bool reconnecting)
{
/// Need to manually restore connection if it is lost.
size_t cnt_retries = 0;
if (reconnecting)
{
/* connection->close() is called in onError() method (called by the AMQP library when a fatal error occurs on the connection)
* inside event_handler, but it is not closed immediately (firstly, all pending operations are completed, and then an AMQP
* closing-handshake is performed). But cannot open a new connection until the previous one is properly closed.
*/
while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
event_handler->iterateLoop();
if (!connection->closed())
connection->close(true);
}
LOG_TRACE(log, "Trying to set up connection");
connection = std::make_unique<AMQP::TcpConnection>(event_handler.get(),
AMQP::Address(parsed_address.first, parsed_address.second, AMQP::Login(login_password.first, login_password.second), "/"));
LOG_TRACE(log, "Trying to set up connection");
size_t cnt_retries = 0;
cnt_retries = 0;
while (!connection->ready() && ++cnt_retries != RETRIES_MAX)
{
event_handler->iterateLoop();
@ -136,16 +156,12 @@ void WriteBufferToRabbitMQProducer::setupChannel()
{
LOG_ERROR(log, "Producer error: {}", message);
/* Means channel ends up in an error state and is not usable anymore.
* (See https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/36#issuecomment-125112236)
*/
/// Channel is not usable anymore. (https://github.com/CopernicaMarketingSoftware/AMQP-CPP/issues/36#issuecomment-125112236)
producer_channel->close();
if (use_tx)
return;
for (auto record = delivery_record.begin(); record != delivery_record.end(); record++)
returned.tryPush(record->second);
/// Records that have not received ack/nack from server before channel closure.
for (const auto & record : delivery_record)
returned.tryPush(record.second);
LOG_DEBUG(log, "Currently {} messages have not been confirmed yet, {} waiting to be published, {} will be republished",
delivery_record.size(), payloads.size(), returned.size());
@ -240,7 +256,7 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UIn
envelope.setHeaders(message_settings);
/* Adding here a message_id property to message metadata.
* (See https://stackoverflow.com/questions/59384305/rabbitmq-how-to-handle-unwanted-duplicate-un-ack-message-after-connection-lost)
* (https://stackoverflow.com/questions/59384305/rabbitmq-how-to-handle-unwanted-duplicate-un-ack-message-after-connection-lost)
*/
envelope.setMessageID(channel_id + "-" + std::to_string(payload.first));
@ -275,24 +291,29 @@ void WriteBufferToRabbitMQProducer::publish(ConcurrentBoundedQueue<std::pair<UIn
void WriteBufferToRabbitMQProducer::writingFunc()
{
if (use_tx)
return;
while (!payloads.empty() || wait_all)
{
/* Publish main paylods only when there are no returned messages. This way it is ensured that returned.queue never grows too big
* and returned messages are republished as fast as possible. Also payloads.queue is fixed size and push attemt would block thread
* in countRow() once there is no space - that is intended.
*/
if (!returned.empty() && producer_channel->usable())
publish(returned, true);
else if (!payloads.empty() && producer_channel->usable())
publish(payloads, false);
else if (use_tx)
break;
/// This check is to make sure that delivery_record.size() is never bigger than returned.size()
if (delivery_record.size() < (BATCH << 6))
{
/* Publish main payloads only when there are no returned messages. This way it is ensured that returned.queue never grows too
* big and returned messages are republished as fast as possible. Also payloads.queue is fixed size and push attempt would
* block thread in countRow() once there is no space - that is intended.
*/
if (!returned.empty() && producer_channel->usable())
publish(returned, true);
else if (!payloads.empty() && producer_channel->usable())
publish(payloads, false);
}
iterateEventLoop();
if (wait_num.load() && delivery_record.empty() && payloads.empty() && returned.empty())
wait_all = false;
else if ((!producer_channel->usable() && connection->usable()) || (!use_tx && !connection->usable() && setupConnection()))
else if ((!producer_channel->usable() && connection->usable()) || (!connection->usable() && setupConnection(true)))
setupChannel();
}
@ -300,9 +321,34 @@ void WriteBufferToRabbitMQProducer::writingFunc()
}
/* This publish is for the case when transaction is delcared on the channel with channel->startTransaction(). Here only publish
* once payload is available and then commitTransaction() is called, where a needed event loop will run.
*/
void WriteBufferToRabbitMQProducer::publish(const String & payload)
{
AMQP::Envelope envelope(payload.data(), payload.size());
if (persistent)
envelope.setDeliveryMode(2);
if (exchange_type == AMQP::ExchangeType::consistent_hash)
{
producer_channel->publish(exchange_name, std::to_string(delivery_tag), envelope);
}
else if (exchange_type == AMQP::ExchangeType::headers)
{
producer_channel->publish(exchange_name, "", envelope);
}
else
{
producer_channel->publish(exchange_name, routing_keys[0], envelope);
}
}
void WriteBufferToRabbitMQProducer::commit()
{
/* Actually have not yet found any information about how is it supposed work once any error occurs with a channel, because any channel
/* Actually have not yet found any information about how is it supposed work once any error occurs with a channel, because any channel
* error closes this channel and any operation on a closed channel will fail (but transaction is unique to channel).
* RabbitMQ transactions seem not trust-worthy at all - see https://www.rabbitmq.com/semantics.html. Seems like its best to always
* use "publisher confirms" rather than transactions (and by default it is so). Probably even need to delete this option.
@ -311,6 +357,7 @@ void WriteBufferToRabbitMQProducer::commit()
return;
std::atomic<bool> answer_received = false, wait_rollback = false;
producer_channel->commitTransaction()
.onSuccess([&]()
{
@ -320,9 +367,9 @@ void WriteBufferToRabbitMQProducer::commit()
.onError([&](const char * message1)
{
answer_received = true;
wait_rollback = true;
LOG_TRACE(log, "Publishing not successful: {}", message1);
wait_rollback = true;
producer_channel->rollbackTransaction()
.onSuccess([&]()
{
@ -330,8 +377,8 @@ void WriteBufferToRabbitMQProducer::commit()
})
.onError([&](const char * message2)
{
LOG_ERROR(log, "Failed to rollback transaction: {}", message2);
wait_rollback = false;
LOG_ERROR(log, "Failed to rollback transaction: {}", message2);
});
});

View File

@ -44,10 +44,11 @@ private:
void nextImpl() override;
void iterateEventLoop();
void writingFunc();
bool setupConnection();
bool setupConnection(bool reconnecting);
void setupChannel();
void removeConfirmed(UInt64 received_delivery_tag, bool multiple, bool republish);
void publish(ConcurrentBoundedQueue<std::pair<UInt64, String>> & message, bool republishing);
void publish(const String & payload);
std::pair<String, UInt16> parsed_address;
const std::pair<String, String> login_password;

View File

@ -86,6 +86,18 @@ def rabbitmq_check_result(result, check=False, ref_file='test_rabbitmq_json.refe
return TSV(result) == TSV(reference)
def kill_rabbitmq():
    """Run `docker stop` on the container named by `rabbitmq_id`.

    Returns True when the docker command exits with status 0, False otherwise.
    """
    docker_stop = subprocess.Popen(('docker', 'stop', rabbitmq_id), stdout=subprocess.PIPE)
    # communicate() waits for the process to exit and drains the captured stdout.
    docker_stop.communicate()
    exit_status = docker_stop.returncode
    return exit_status == 0
def revive_rabbitmq():
    """Run `docker start` on the container named by `rabbitmq_id`.

    Returns True when the docker command exits with status 0, False otherwise.
    """
    docker_start = subprocess.Popen(('docker', 'start', rabbitmq_id), stdout=subprocess.PIPE)
    # communicate() waits for the process to exit and drains the captured stdout.
    docker_start.communicate()
    exit_status = docker_start.returncode
    return exit_status == 0
# Fixtures
@pytest.fixture(scope="module")
@ -1684,7 +1696,7 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
while True:
result1 = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result1) > collected:
if int(result1) == messages_num * threads_num:
break
instance.query('''
@ -1693,7 +1705,7 @@ def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
DROP TABLE IF EXISTS test.view;
''')
assert int(result1) > collected, 'ClickHouse lost some messages: {}'.format(result)
assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)
@ -1868,6 +1880,158 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster):
assert int(result2) == 8
@pytest.mark.timeout(420)
def test_rabbitmq_consumer_restore_connection(rabbitmq_cluster):
    """Check that a RabbitMQ engine table keeps consuming after the broker container is restarted.

    20 threads publish 5000 messages each to the 'consumer_reconnect' exchange. Once the
    materialized view has received the first rows, the broker is stopped and started again, and
    the test waits until the view holds at least all published messages. Duplicates are
    tolerated, hence the `>=` checks. The pytest timeout bounds the polling loops.
    """
    instance.query('''
CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_reconnect',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')

    # One-element list so that the nested produce() can advance a counter shared by all threads.
    i = [0]
    messages_num = 5000

    credentials = pika.PlainCredentials('root', 'clickhouse')
    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

    def produce():
        # Each producer thread uses its own connection and channel.
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        messages = []
        for _ in range(messages_num):
            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
            i[0] += 1
        for message in messages:
            channel.basic_publish(exchange='consumer_reconnect', routing_key='', body=message, properties=pika.BasicProperties(delivery_mode = 2))
        connection.close()

    threads = []
    threads_num = 20
    for _ in range(threads_num):
        threads.append(threading.Thread(target=produce))
    for thread in threads:
        time.sleep(random.uniform(0, 1))
        thread.start()

    for thread in threads:
        thread.join()

    instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.consumer_reconnect;
''')

    # Restart the broker only after consumption has demonstrably started.
    while int(instance.query('SELECT count() FROM test.view')) == 0:
        time.sleep(1)

    # If the container is not actually stopped and started, the test would pass without
    # exercising the reconnect path, so the helper results are checked.
    assert kill_rabbitmq(), 'Failed to stop the RabbitMQ container'
    time.sleep(4)
    assert revive_rabbitmq(), 'Failed to start the RabbitMQ container'

    # Row count right after the restart; only reported in the progress output below.
    collected = int(instance.query('SELECT count() FROM test.view'))

    while True:
        result = instance.query('SELECT count() FROM test.view')
        time.sleep(1)
        print("received", result, "collected", collected)
        if int(result) >= messages_num * threads_num:
            break

    instance.query('''
DROP TABLE IF EXISTS test.consumer;
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer_reconnect;
''')

    # >= because at-least-once
    assert int(result) >= messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)
def test_rabbitmq_producer_restore_connection(rabbitmq_cluster):
    """Check that INSERTs into a RabbitMQ engine table are delivered across a broker restart.

    100000 rows are inserted into test.producer_reconnect. A second RabbitMQ table reads the same
    exchange and feeds test.destination through a materialized view. Once the first rows arrive,
    the broker container is stopped and started again, and the test waits until the destination
    holds at least all inserted rows. Duplicates are tolerated, hence the `>=` checks. The pytest
    timeout bounds the polling loops.
    """
    instance.query('''
DROP TABLE IF EXISTS test.destination;
CREATE TABLE test.destination(key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
''')

    instance.query('''
DROP TABLE IF EXISTS test.consume;
DROP TABLE IF EXISTS test.consume_mv;
CREATE TABLE test.consume (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'producer_reconnect',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
CREATE MATERIALIZED VIEW test.consume_mv TO test.destination AS
SELECT key, value FROM test.consume;
''')

    instance.query('''
DROP TABLE IF EXISTS test.producer_reconnect;
CREATE TABLE test.producer_reconnect (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'producer_reconnect',
rabbitmq_persistent_mode = '1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')

    # All publishing goes through the INSERT below; no client-side pika connection is needed,
    # so none is opened (the previous one was never used or closed).
    messages_num = 100000
    values = ','.join("({i}, {i})".format(i=i) for i in range(messages_num))

    while True:
        try:
            instance.query("INSERT INTO test.producer_reconnect VALUES {}".format(values))
            break
        except QueryRuntimeException as e:
            # NOTE(review): retry only on this specific timeout text; confirm that the RabbitMQ
            # engine can actually produce this message.
            if 'Local: Timed out.' in str(e):
                continue
            else:
                raise

    # Restart the broker only after delivery has demonstrably started.
    while int(instance.query('SELECT count() FROM test.destination')) == 0:
        time.sleep(0.1)

    # If the container is not actually stopped and started, the test would pass without
    # exercising the reconnect path, so the helper results are checked.
    assert kill_rabbitmq(), 'Failed to stop the RabbitMQ container'
    time.sleep(4)
    assert revive_rabbitmq(), 'Failed to start the RabbitMQ container'

    while True:
        result = instance.query('SELECT count() FROM test.destination')
        time.sleep(1)
        print(result, messages_num)
        if int(result) >= messages_num:
            break

    instance.query('''
DROP TABLE IF EXISTS test.consume_mv;
DROP TABLE IF EXISTS test.consume;
DROP TABLE IF EXISTS test.producer_reconnect;
DROP TABLE IF EXISTS test.destination;
''')

    assert int(result) >= messages_num, 'ClickHouse lost some messages: {}'.format(result)
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")