This commit is contained in:
kssenii 2021-11-02 15:05:10 +03:00
parent 085cb275de
commit f662c1829d
2 changed files with 146 additions and 1 deletions

View File

@ -645,7 +645,10 @@ Pipe StorageRabbitMQ::read(
} }
if (!connection->getHandler().loopRunning() && connection->isConnected()) if (!connection->getHandler().loopRunning() && connection->isConnected())
{
connection->getHandler().updateLoopState(Loop::RUN);
looping_task->activateAndSchedule(); looping_task->activateAndSchedule();
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size()); LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
auto united_pipe = Pipe::unitePipes(std::move(pipes)); auto united_pipe = Pipe::unitePipes(std::move(pipes));

View File

@ -250,7 +250,7 @@ def test_rabbitmq_macros(rabbitmq_cluster):
for i in range(50): for i in range(50):
message += json.dumps({'key': i, 'value': i}) + '\n' message += json.dumps({'key': i, 'value': i}) + '\n'
channel.basic_publish(exchange='macro', routing_key='', body=message) channel.basic_publish(exchange='macro', routing_key='', body=message)
connection.close() connection.close()
time.sleep(1) time.sleep(1)
@ -2042,6 +2042,148 @@ def test_rabbitmq_bad_args(rabbitmq_cluster):
''') ''')
def test_rabbitmq_issue_30691(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_drop (json String)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = '30691',
rabbitmq_row_delimiter = '\\n', -- Works only if adding this setting
rabbitmq_format = 'LineAsString',
rabbitmq_queue_base = '30691';
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters(rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_publish(exchange='30691', routing_key='', body=json.dumps({"event_type": "purge", "as_src": 1234, "as_dst": 0, "as_path": "",
"local_pref": 100, "med": 0, "peer_as_dst": 0,
"ip_src": "<redacted ipv6>", "ip_dst": "<redacted ipv6>",
"port_src": 443, "port_dst": 41930, "ip_proto": "tcp",
"tos": 0, "stamp_inserted": "2021-10-26 15:20:00",
"stamp_updated": "2021-10-26 15:23:14", "packets": 2, "bytes": 1216, "writer_id": "default_amqp/449206"}))
result = ''
while True:
result = instance.query('SELECT * FROM test.rabbitmq_drop', ignore_error=True)
print(result)
if result != "":
break
assert(result.strip() =="""{"event_type": "purge", "as_src": 1234, "as_dst": 0, "as_path": "", "local_pref": 100, "med": 0, "peer_as_dst": 0, "ip_src": "<redacted ipv6>", "ip_dst": "<redacted ipv6>", "port_src": 443, "port_dst": 41930, "ip_proto": "tcp", "tos": 0, "stamp_inserted": "2021-10-26 15:20:00", "stamp_updated": "2021-10-26 15:23:14", "packets": 2, "bytes": 1216, "writer_id": "default_amqp/449206"}""")
def test_rabbitmq_drop_mv(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mv',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'drop_mv';
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
''')
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters(rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for i in range(20):
channel.basic_publish(exchange='mv', routing_key='', body=json.dumps({'key': i, 'value': i}))
instance.query('DROP VIEW test.consumer')
for i in range(20, 40):
channel.basic_publish(exchange='mv', routing_key='', body=json.dumps({'key': i, 'value': i}))
instance.query('''
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
''')
for i in range(40, 50):
channel.basic_publish(exchange='mv', routing_key='', body=json.dumps({'key': i, 'value': i}))
while True:
result = instance.query('SELECT * FROM test.view ORDER BY key')
if (rabbitmq_check_result(result)):
break
rabbitmq_check_result(result, True)
instance.query('DROP VIEW test.consumer')
for i in range(50, 60):
channel.basic_publish(exchange='mv', routing_key='', body=json.dumps({'key': i, 'value': i}))
connection.close()
count = 0
while True:
count = int(instance.query('SELECT count() FROM test.rabbitmq'))
if (count):
break
assert(count > 0)
def test_rabbitmq_random_detach(rabbitmq_cluster):
NUM_CONSUMERS = 2
NUM_QUEUES = 2
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'random',
rabbitmq_queue_base = 'random',
rabbitmq_num_queues = 2,
rabbitmq_num_consumers = 2,
rabbitmq_format = 'JSONEachRow';
CREATE TABLE test.view (key UInt64, value UInt64, channel_id String)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _channel_id AS channel_id FROM test.rabbitmq;
''')
i = [0]
messages_num = 10000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters(rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for i in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
mes_id = str(i)
channel.basic_publish(exchange='test_sharding', routing_key='', properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
threads = []
threads_num = 20
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
time.sleep(5)
kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
instance.query("detach table test.rabbitmq")
revive_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id)
for thread in threads:
thread.join()
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")