Fix, add test

This commit is contained in:
kssenii 2020-12-02 18:34:01 +00:00
parent 8d3a538629
commit ebab21178e
5 changed files with 57 additions and 3 deletions

View File

@ -58,7 +58,10 @@ void RabbitMQBlockInputStream::readPrefixImpl()
buffer = storage.popReadBuffer(timeout); buffer = storage.popReadBuffer(timeout);
if (!buffer->getChannel()) if (!buffer->getChannel())
{
storage.updateQueues(buffer->getQueues());
updateChannel(); updateChannel();
}
} }

View File

@ -57,6 +57,7 @@ public:
bool isChannelError() { return channel_error; } bool isChannelError() { return channel_error; }
/// Do not allow to update channel if current channel is not properly set up and subscribed /// Do not allow to update channel if current channel is not properly set up and subscribed
bool isChannelUpdateAllowed() { return !wait_subscription; } bool isChannelUpdateAllowed() { return !wait_subscription; }
std::vector<String> & getQueues() { return queues; }
ChannelPtr & getChannel() { return consumer_channel; } ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel(); void setupChannel();

View File

@ -47,7 +47,6 @@ namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int CANNOT_CONNECT_RABBITMQ;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE; extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE; extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
@ -335,7 +334,6 @@ void StorageRabbitMQ::initExchange()
} }
void StorageRabbitMQ::bindExchange() void StorageRabbitMQ::bindExchange()
{ {
std::atomic<bool> binding_created = false; std::atomic<bool> binding_created = false;
@ -499,6 +497,7 @@ bool StorageRabbitMQ::restoreConnection(bool reconnecting)
void StorageRabbitMQ::updateChannel(ChannelPtr & channel) void StorageRabbitMQ::updateChannel(ChannelPtr & channel)
{ {
std::lock_guard lock(conn_mutex);
if (event_handler->connectionRunning()) if (event_handler->connectionRunning())
channel = std::make_shared<AMQP::TcpChannel>(connection.get()); channel = std::make_shared<AMQP::TcpChannel>(connection.get());
else else

View File

@ -63,6 +63,7 @@ public:
bool exchangeRemoved() { return exchange_removed.load(); } bool exchangeRemoved() { return exchange_removed.load(); }
void updateChannel(ChannelPtr & channel); void updateChannel(ChannelPtr & channel);
void updateQueues(std::vector<String> & queues_) { queues_ = queues; }
protected: protected:
StorageRabbitMQ( StorageRabbitMQ(
@ -100,7 +101,7 @@ private:
size_t num_created_consumers = 0; size_t num_created_consumers = 0;
Poco::Semaphore semaphore; Poco::Semaphore semaphore;
std::mutex buffers_mutex; std::mutex buffers_mutex, conn_mutex;
std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers std::vector<ConsumerBufferPtr> buffers; /// available buffers for RabbitMQ consumers
String unique_strbase; /// to make unique consumer channel id String unique_strbase; /// to make unique consumer channel id

View File

@ -1862,6 +1862,56 @@ def test_rabbitmq_commit_on_block_write(rabbitmq_cluster):
assert result == 1, 'Messages from RabbitMQ get duplicated!' assert result == 1, 'Messages from RabbitMQ get duplicated!'
@pytest.mark.timeout(420)
def test_rabbitmq_no_connection_at_startup(rabbitmq_cluster):
# no connection when table is initialized
rabbitmq_cluster.pause_container('rabbitmq1')
instance.query('''
CREATE TABLE test.cs (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'cs',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = '5',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.cs;
''')
time.sleep(5)
rabbitmq_cluster.unpause_container('rabbitmq1')
# need to make sure rabbit table made all rabbit setup
time.sleep(10)
messages_num = 1000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
for i in range(messages_num):
message = json.dumps({'key': i, 'value': i})
channel.basic_publish(exchange='cs', routing_key='', body=message,
properties=pika.BasicProperties(delivery_mode=2, message_id=str(i)))
connection.close()
while True:
result = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result) == messages_num:
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.cs;
''')
assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result)
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")