diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 83e3a02b478..7b1cdd11317 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -21,7 +21,8 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream( , context(context_) , column_names(columns) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) - , virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID())) + , virtual_header(metadata_snapshot->getSampleBlockForColumns( + {"_exchange_name", "_consumer_tag", "_delivery_tag", "_redelivered"}, storage.getVirtuals(), storage.getStorageID())) { } @@ -124,10 +125,16 @@ Block RabbitMQBlockInputStream::readImpl() auto new_rows = read_rabbitmq_message(); auto exchange_name = buffer->getExchange(); + auto consumer_tag = buffer->getConsumerTag(); + auto delivery_tag = buffer->getDeliveryTag(); + auto redelivered = buffer->getRedelivered(); for (size_t i = 0; i < new_rows; ++i) { virtual_columns[0]->insert(exchange_name); + virtual_columns[1]->insert(consumer_tag); + virtual_columns[2]->insert(delivery_tag); + virtual_columns[3]->insert(redelivered); } total_rows = total_rows + new_rows; diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index d59e9c9eade..0aff21f8a8e 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -48,7 +48,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( , log(log_) , row_delimiter(row_delimiter_) , stopped(stopped_) - , messages(QUEUE_SIZE * num_queues) + , received(QUEUE_SIZE * num_queues) { /* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added. * By default there is one queue per consumer. @@ -65,7 +65,7 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer() { consumer_channel->close(); - messages.clear(); + received.clear(); BufferBase::set(nullptr, 0, 0); } @@ -278,15 +278,16 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) return; consumer_channel->consume(queue_name, AMQP::noack) - .onSuccess([&](const std::string & /* consumer */) + .onSuccess([&](const std::string & consumer) { subscribed_queue[queue_name] = true; consumer_error = false; ++count_subscribed; + consumer_tag = consumer; LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name); }) - .onReceived([&](const AMQP::Message & message, uint64_t /* deliveryTag */, bool /* redelivered */) + .onReceived([&](const AMQP::Message & message, uint64_t deliveryTag, bool redelivered) { size_t message_size = message.bodySize(); if (message_size && message.body() != nullptr) @@ -297,7 +298,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) message_received += row_delimiter; } - messages.push(message_received); + received.push({deliveryTag, message_received, redelivered}); } }) .onError([&](const char * message) @@ -346,10 +347,10 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl() if (stopped || !allowed) return false; - if (messages.tryPop(current)) + if (received.tryPop(current)) { - auto * new_position = const_cast(current.data()); - BufferBase::set(new_position, current.size(), 0); + auto * new_position = const_cast(current.message.data()); + BufferBase::set(new_position, current.message.size(), 0); allowed = false; return true; diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 82dc3f55248..85644562d0c 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -38,10 +38,20 @@ public: ~ReadBufferFromRabbitMQConsumer() override; + struct MessageData + { + UInt64 delivery_tag; + String message; + bool redelivered; + }; + void allowNext() { allowed = true; } // Allow to read next message. void checkSubscription(); auto getExchange() const { return exchange_name; } + auto getConsumerTag() const { return consumer_tag; } + auto getDeliveryTag() const { return current.delivery_tag; } + auto getRedelivered() const { return current.redelivered; } private: ChannelPtr consumer_channel; @@ -69,8 +79,9 @@ private: std::atomic consumer_error = false; std::atomic count_subscribed = 0, wait_subscribed; - ConcurrentBoundedQueue messages; - String current; + String consumer_tag; + ConcurrentBoundedQueue received; + MessageData current; std::vector queues; std::unordered_map subscribed_queue; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 0d6cf95f39c..66af7dc3f56 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -597,7 +597,10 @@ void registerStorageRabbitMQ(StorageFactory & factory) NamesAndTypesList StorageRabbitMQ::getVirtuals() const { return NamesAndTypesList{ - {"_exchange", std::make_shared()} + {"_exchange_name", std::make_shared()}, + {"_consumer_tag", std::make_shared()}, + {"_delivery_tag", std::make_shared()}, + {"_redelivered", std::make_shared()} }; } diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index a044eba805c..eaaa8613b5f 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -1520,6 +1520,140 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): assert int(result) == messages_num * num_tables_to_receive, 'ClickHouse lost some messages: {}'.format(result) +@pytest.mark.timeout(420) +def test_rabbitmq_virtual_columns(rabbitmq_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.view; + CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'virtuals', + rabbitmq_format = 'JSONEachRow'; + CREATE MATERIALIZED VIEW test.view Engine=Log AS + SELECT value, key, _exchange_name, _consumer_tag, _delivery_tag, _redelivered FROM test.rabbitmq; + ''') + + credentials = pika.PlainCredentials('root', 'clickhouse') + parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.exchange_declare(exchange='virtuals', exchange_type='fanout') + + message_num = 10 + i = [0] + messages = [] + for _ in range(message_num): + messages.append(json.dumps({'key': i[0], 'value': i[0]})) + i[0] += 1 + + for message in messages: + channel.basic_publish(exchange='virtuals', routing_key='', body=message) + + while True: + result = instance.query('SELECT count() FROM test.view') + time.sleep(1) + if int(result) == message_num: + break + + connection.close() + + result = instance.query("SELECT count(DISTINCT _delivery_tag) FROM test.view") + assert int(result) == 10 + + result = instance.query("SELECT count(DISTINCT _consumer_tag) FROM test.view") + assert int(result) == 1 + + result = instance.query(''' + SELECT key, value, _exchange_name, SUBSTRING(_consumer_tag, 1, 8), _delivery_tag, _redelivered + FROM test.view + ORDER BY key + ''') + + expected = '''\ +0 0 virtuals amq.ctag 1 0 +1 1 virtuals amq.ctag 2 0 +2 2 virtuals amq.ctag 3 0 +3 3 virtuals amq.ctag 4 0 +4 4 virtuals amq.ctag 5 0 +5 5 virtuals amq.ctag 6 0 +6 6 virtuals amq.ctag 7 0 +7 7 virtuals amq.ctag 8 0 +8 8 virtuals amq.ctag 9 0 +9 9 virtuals amq.ctag 10 0 +''' + assert TSV(result) == TSV(expected) + + +@pytest.mark.timeout(420) +def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.consumer; + CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'virtuals_mv', + rabbitmq_format = 'JSONEachRow'; + CREATE TABLE test.view (key UInt64, value UInt64, + exchange_name String, consumer_tag String, delivery_tag UInt64, redelivered UInt8) ENGINE = MergeTree() + ORDER BY key; + CREATE MATERIALIZED VIEW test.consumer TO test.view AS + SELECT *, _exchange_name as exchange_name, _consumer_tag as consumer_tag, _delivery_tag as delivery_tag, _redelivered as redelivered + FROM test.rabbitmq; + ''') + + credentials = pika.PlainCredentials('root', 'clickhouse') + parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.exchange_declare(exchange='virtuals_mv', exchange_type='fanout') + + message_num = 10 + i = [0] + messages = [] + for _ in range(message_num): + messages.append(json.dumps({'key': i[0], 'value': i[0]})) + i[0] += 1 + + for message in messages: + channel.basic_publish(exchange='virtuals_mv', routing_key='', body=message) + + while True: + result = instance.query('SELECT count() FROM test.view') + time.sleep(1) + if int(result) == message_num: + break + + connection.close() + + result = instance.query("SELECT count(DISTINCT delivery_tag) FROM test.view") + assert int(result) == 10 + + result = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view") + assert int(result) == 1 + + result = instance.query("SELECT key, value, exchange_name, SUBSTRING(consumer_tag, 1, 8), delivery_tag, redelivered FROM test.view") + expected = '''\ +0 0 virtuals_mv amq.ctag 1 0 +1 1 virtuals_mv amq.ctag 2 0 +2 2 virtuals_mv amq.ctag 3 0 +3 3 virtuals_mv amq.ctag 4 0 +4 4 virtuals_mv amq.ctag 5 0 +5 5 virtuals_mv amq.ctag 6 0 +6 6 virtuals_mv amq.ctag 7 0 +7 7 virtuals_mv amq.ctag 8 0 +8 8 virtuals_mv amq.ctag 9 0 +9 9 virtuals_mv amq.ctag 10 0 +''' + + instance.query(''' + DROP TABLE test.consumer; + DROP TABLE test.view; + ''') + + assert TSV(result) == TSV(expected) + + if __name__ == '__main__': cluster.start() raw_input("Cluster created, press any key to destroy...")