diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index e26645a1168..16ba14094ac 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -22,8 +22,10 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream( , column_names(columns) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) , virtual_header(metadata_snapshot->getSampleBlockForColumns( - {"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered"}, storage.getVirtuals(), storage.getStorageID())) + {"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"}, storage.getVirtuals(), storage.getStorageID())) { + if (!storage.getSchemaName().empty()) + context.setSetting("format_schema", storage.getSchemaName()); } @@ -131,6 +133,7 @@ Block RabbitMQBlockInputStream::readImpl() auto channel_id = buffer->getChannelID(); auto delivery_tag = buffer->getDeliveryTag(); auto redelivered = buffer->getRedelivered(); + auto message_id = buffer->getMessageID(); buffer->updateAckTracker({delivery_tag, channel_id}); @@ -140,6 +143,7 @@ Block RabbitMQBlockInputStream::readImpl() virtual_columns[1]->insert(channel_id); virtual_columns[2]->insert(delivery_tag); virtual_columns[3]->insert(redelivered); + virtual_columns[4]->insert(message_id); } total_rows = total_rows + new_rows; diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.h b/src/Storages/RabbitMQ/RabbitMQSettings.h index 5d15bd5b77d..2416a15f65a 100644 --- a/src/Storages/RabbitMQ/RabbitMQSettings.h +++ b/src/Storages/RabbitMQ/RabbitMQSettings.h @@ -14,6 +14,7 @@ namespace DB M(String, rabbitmq_exchange_name, "clickhouse-exchange", "The exchange name, to which messages are sent.", 0) \ M(String, rabbitmq_format, "", "The message format.", 0) \ M(Char, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \ + M(String, rabbitmq_schema, "", "Schema identifier (used by schema-based formats) for RabbitMQ engine", 0) \ M(String, rabbitmq_exchange_type, "default", "The exchange type.", 0) \ M(UInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \ M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \ diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 833382f354b..197b9f7e057 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -14,6 +14,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + static const auto QUEUE_SIZE = 50000; ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( @@ -51,7 +56,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( consumer_channel->onReady([&]() { - channel_id = channel_base + "_" + std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++); + channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base; LOG_TRACE(log, "Channel {} is created", channel_id); consumer_channel->onError([&](const char * message) @@ -142,7 +147,10 @@ void ReadBufferFromRabbitMQConsumer::subscribe() if (row_delimiter != '\0') message_received += row_delimiter; - received.push({message_received, redelivered, AckTracker(delivery_tag, channel_id)}); + if (message.hasMessageID()) + received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)}); + else + received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)}); } }) .onError([&](const char * message) @@ -195,7 +203,11 @@ void ReadBufferFromRabbitMQConsumer::restoreChannel(ChannelPtr new_channel) consumer_channel = std::move(new_channel); consumer_channel->onReady([&]() { - channel_id = channel_base + "_" + std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++); + /* First number indicates current consumer buffer; second number indicates serial number of created channel for current buffer, + * i.e. if channel fails - another one is created and its serial number is incremented; channel_base is to guarantee that + * channel_id is unique for each table. + */ + channel_id = std::to_string(channel_id_base) + "_" + std::to_string(channel_id_counter++) + "_" + channel_base; LOG_TRACE(log, "Channel {} is created", channel_id); consumer_channel->onError([&](const char * message) diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index c5643cb59f4..e00e8172509 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -51,6 +51,7 @@ public: struct MessageData { String message; + String message_id; bool redelivered; AckTracker track; }; @@ -65,6 +66,7 @@ public: auto getChannelID() const { return current.track.channel_id; } auto getDeliveryTag() const { return current.track.delivery_tag; } auto getRedelivered() const { return current.redelivered; } + auto getMessageID() const { return current.message_id; } private: bool nextImpl() override; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index f85f7d6b59c..f82773ed367 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -69,6 +69,7 @@ StorageRabbitMQ::StorageRabbitMQ( const String & exchange_name_, const String & format_name_, char row_delimiter_, + const String & schema_name_, const String & exchange_type_, size_t num_consumers_, size_t num_queues_, @@ -83,6 +84,7 @@ StorageRabbitMQ::StorageRabbitMQ( , exchange_name(exchange_name_) , format_name(global_context.getMacros()->expand(format_name_)) , row_delimiter(row_delimiter_) + , schema_name(global_context.getMacros()->expand(schema_name_)) , num_consumers(num_consumers_) , num_queues(num_queues_) , use_transactional_channel(use_transactional_channel_) @@ -785,13 +787,29 @@ void registerStorageRabbitMQ(StorageFactory & factory) } } - String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value; + String schema = rabbitmq_settings.rabbitmq_schema.value; if (args_count >= 6) { engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context); const auto * ast = engine_args[5]->as(); if (ast && ast->value.getType() == Field::Types::String) + { + schema = safeGet(ast->value); + } + else + { + throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS); + } + } + + String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value; + if (args_count >= 7) + { + engine_args[6] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[6], args.local_context); + + const auto * ast = engine_args[6]->as(); + if (ast && ast->value.getType() == Field::Types::String) { exchange_type = safeGet(ast->value); } @@ -802,9 +820,9 @@ void registerStorageRabbitMQ(StorageFactory & factory) } UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers; - if (args_count >= 7) + if (args_count >= 8) { - const auto * ast = engine_args[6]->as(); + const auto * ast = engine_args[7]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { num_consumers = safeGet(ast->value); @@ -816,9 +834,9 @@ void registerStorageRabbitMQ(StorageFactory & factory) } UInt64 num_queues = rabbitmq_settings.rabbitmq_num_queues; - if (args_count >= 8) + if (args_count >= 9) { - const auto * ast = engine_args[7]->as(); + const auto * ast = engine_args[8]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { num_consumers = safeGet(ast->value); @@ -830,9 +848,9 @@ void registerStorageRabbitMQ(StorageFactory & factory) } bool use_transactional_channel = static_cast(rabbitmq_settings.rabbitmq_transactional_channel); - if (args_count >= 9) + if (args_count >= 10) { - const auto * ast = engine_args[8]->as(); + const auto * ast = engine_args[9]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { use_transactional_channel = static_cast(safeGet(ast->value)); @@ -844,33 +862,33 @@ void registerStorageRabbitMQ(StorageFactory & factory) } String queue_base = rabbitmq_settings.rabbitmq_queue_base.value; - if (args_count >= 10) - { - engine_args[9] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[9], args.local_context); - - const auto * ast = engine_args[9]->as(); - if (ast && ast->value.getType() == Field::Types::String) - { - queue_base = safeGet(ast->value); - } - } - - String deadletter_exchange = rabbitmq_settings.rabbitmq_deadletter_exchange.value; if (args_count >= 11) { engine_args[10] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[10], args.local_context); const auto * ast = engine_args[10]->as(); if (ast && ast->value.getType() == Field::Types::String) + { + queue_base = safeGet(ast->value); + } + } + + String deadletter_exchange = rabbitmq_settings.rabbitmq_deadletter_exchange.value; + if (args_count >= 12) + { + engine_args[11] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[11], args.local_context); + + const auto * ast = engine_args[11]->as(); + if (ast && ast->value.getType() == Field::Types::String) { deadletter_exchange = safeGet(ast->value); } } bool persistent = static_cast(rabbitmq_settings.rabbitmq_persistent_mode); - if (args_count >= 12) + if (args_count >= 13) { - const auto * ast = engine_args[11]->as(); + const auto * ast = engine_args[12]->as(); if (ast && ast->value.getType() == Field::Types::UInt64) { persistent = static_cast(safeGet(ast->value)); @@ -883,7 +901,7 @@ void registerStorageRabbitMQ(StorageFactory & factory) return StorageRabbitMQ::create( args.table_id, args.context, args.columns, - host_port, routing_keys, exchange, format, row_delimiter, exchange_type, num_consumers, + host_port, routing_keys, exchange, format, row_delimiter, schema, exchange_type, num_consumers, num_queues, use_transactional_channel, queue_base, deadletter_exchange, persistent); }; @@ -898,7 +916,8 @@ NamesAndTypesList StorageRabbitMQ::getVirtuals() const {"_exchange_name", std::make_shared()}, {"_channel_id", std::make_shared()}, {"_delivery_tag", std::make_shared()}, - {"_redelivered", std::make_shared()} + {"_redelivered", std::make_shared()}, + {"_message_id", std::make_shared()} }; } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 0960e35d3bf..60bc1aa7157 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -55,6 +55,7 @@ public: const String & getFormatName() const { return format_name; } NamesAndTypesList getVirtuals() const override; + const auto & getSchemaName() const { return schema_name; } const String getExchange() const { return exchange_name; } bool checkBridge() const { return !exchange_removed.load(); } @@ -74,6 +75,7 @@ protected: const String & exchange_name_, const String & format_name_, char row_delimiter_, + const String & schema_name_, const String & exchange_type_, size_t num_consumers_, size_t num_queues_, @@ -92,6 +94,7 @@ private: const String format_name; char row_delimiter; + const String schema_name; size_t num_consumers; size_t num_created_consumers = 0; bool hash_exchange; diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 2b818f0341f..8cd769e792f 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -186,7 +186,7 @@ void WriteBufferToRabbitMQProducer::setupChannel() producer_channel->onReady([&]() { - channel_id = channel_base + "_" + channel_id_base + std::to_string(channel_id_counter++); + channel_id = channel_id_base + std::to_string(channel_id_counter++) + "_" + channel_base; LOG_DEBUG(log, "Producer's channel {} is ready", channel_id); if (use_txn) diff --git a/tests/integration/test_storage_rabbitmq/clickhouse_path/format_schemas/rabbitmq.proto b/tests/integration/test_storage_rabbitmq/clickhouse_path/format_schemas/rabbitmq.proto new file mode 100644 index 00000000000..96b24be4938 --- /dev/null +++ b/tests/integration/test_storage_rabbitmq/clickhouse_path/format_schemas/rabbitmq.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +message KeyValuePair { + uint64 key = 1; + string value = 2; +} \ No newline at end of file diff --git a/tests/integration/test_storage_rabbitmq/rabbitmq_pb2.py b/tests/integration/test_storage_rabbitmq/rabbitmq_pb2.py new file mode 100644 index 00000000000..fb0f1413eac --- /dev/null +++ b/tests/integration/test_storage_rabbitmq/rabbitmq_pb2.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: clickhouse_path/format_schemas/rabbitmq.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='clickhouse_path/format_schemas/rabbitmq.proto', + package='', + syntax='proto3', + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n-clickhouse_path/format_schemas/rabbitmq.proto\"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3' +) + + + + +_KEYVALUEPAIR = _descriptor.Descriptor( + name='KeyValuePair', + full_name='KeyValuePair', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='KeyValuePair.key', index=0, + number=1, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='value', full_name='KeyValuePair.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=49, + serialized_end=91, +) + +DESCRIPTOR.message_types_by_name['KeyValuePair'] = _KEYVALUEPAIR +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +KeyValuePair = _reflection.GeneratedProtocolMessageType('KeyValuePair', (_message.Message,), { + 'DESCRIPTOR' : _KEYVALUEPAIR, + '__module__' : 'clickhouse_path.format_schemas.rabbitmq_pb2' + # @@protoc_insertion_point(class_scope:KeyValuePair) + }) +_sym_db.RegisterMessage(KeyValuePair) + + +# @@protoc_insertion_point(module_scope) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index bb65319a3be..b8ccbf9ce56 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -16,13 +16,19 @@ from helpers.network import PartitionManager import json import subprocess +import avro.schema +from confluent.schemaregistry.client import CachedSchemaRegistryClient +from confluent.schemaregistry.serializers.MessageSerializer import MessageSerializer from google.protobuf.internal.encoder import _VarintBytes +import rabbitmq_pb2 + cluster = ClickHouseCluster(__file__) instance = cluster.add_instance('instance', config_dir='configs', main_configs=['configs/rabbitmq.xml','configs/log_conf.xml'], - with_rabbitmq=True) + with_rabbitmq=True, + clickhouse_path_dir='clickhouse_path') rabbitmq_id = '' @@ -316,6 +322,57 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): rabbitmq_check_result(result, True) +@pytest.mark.timeout(180) +def test_rabbitmq_protobuf(rabbitmq_cluster): + instance.query(''' + CREATE TABLE test.rabbitmq (key UInt64, value String) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'pb', + rabbitmq_format = 'Protobuf', + rabbitmq_schema = 'rabbitmq.proto:KeyValuePair'; + ''') + + credentials = pika.PlainCredentials('root', 'clickhouse') + parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + data = '' + for i in range(0, 20): + msg = rabbitmq_pb2.KeyValuePair() + msg.key = i + msg.value = str(i) + serialized_msg = msg.SerializeToString() + data = data + _VarintBytes(len(serialized_msg)) + serialized_msg + channel.basic_publish(exchange='pb', routing_key='', body=data) + data = '' + for i in range(20, 21): + msg = rabbitmq_pb2.KeyValuePair() + msg.key = i + msg.value = str(i) + serialized_msg = msg.SerializeToString() + data = data + _VarintBytes(len(serialized_msg)) + serialized_msg + channel.basic_publish(exchange='pb', routing_key='', body=data) + data = '' + for i in range(21, 50): + msg = rabbitmq_pb2.KeyValuePair() + msg.key = i + msg.value = str(i) + serialized_msg = msg.SerializeToString() + data = data + _VarintBytes(len(serialized_msg)) + serialized_msg + channel.basic_publish(exchange='pb', routing_key='', body=data) + + result = '' + while True: + result += instance.query('SELECT * FROM test.rabbitmq') + if rabbitmq_check_result(result): + break + + connection.close() + rabbitmq_check_result(result, True) + + @pytest.mark.timeout(180) def test_rabbitmq_materialized_view(rabbitmq_cluster): instance.query(''' @@ -451,6 +508,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): DROP TABLE test.view2; ''') + connection.close() rabbitmq_check_result(result1, True) rabbitmq_check_result(result2, True) @@ -1440,7 +1498,7 @@ def test_rabbitmq_virtual_columns(rabbitmq_cluster): connection.close() result = instance.query(''' - SELECT key, value, _exchange_name, SUBSTRING(_channel_id, 34, 3), _delivery_tag, _redelivered + SELECT key, value, _exchange_name, SUBSTRING(_channel_id, 1, 3), _delivery_tag, _redelivered FROM test.view ORDER BY key ''') @@ -1505,7 +1563,7 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster): connection.close() - result = instance.query("SELECT key, value, exchange_name, SUBSTRING(channel_id, 34, 3), delivery_tag, redelivered FROM test.view ORDER BY delivery_tag") + result = instance.query("SELECT key, value, exchange_name, SUBSTRING(channel_id, 1, 3), delivery_tag, redelivered FROM test.view ORDER BY delivery_tag") expected = '''\ 0 0 virtuals_mv 1_0 1 0 1 1 virtuals_mv 1_0 2 0 @@ -1769,7 +1827,7 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster): @pytest.mark.timeout(420) -def test_rabbitmq_consumer_restore_failed_connection_without_losses(rabbitmq_cluster): +def test_rabbitmq_consumer_restore_failed_connection_without_losses_1(rabbitmq_cluster): instance.query(''' CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64) ENGINE = RabbitMQ @@ -1901,71 +1959,72 @@ def test_rabbitmq_producer_restore_failed_connection_without_losses(rabbitmq_clu @pytest.mark.timeout(420) -def test_rabbitmq_virtual_columns_2(rabbitmq_cluster): +def test_rabbitmq_consumer_restore_failed_connection_without_losses_2(rabbitmq_cluster): instance.query(''' - DROP TABLE IF EXISTS test.destination; - CREATE TABLE test.destination(key UInt64, value UInt64, - exchange_name String, channel_id String, delivery_tag UInt64, redelivered UInt8) ENGINE = MergeTree() - ORDER BY key; + CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'consumer_reconnect', + rabbitmq_num_consumers = 10, + rabbitmq_num_queues = 2, + rabbitmq_format = 'JSONEachRow', + rabbitmq_row_delimiter = '\\n'; ''') - table_num = 3 - for table_id in range(table_num): - print("Setting up table {}".format(table_id)) - instance.query(''' - DROP TABLE IF EXISTS test.virtuals_{0}; - DROP TABLE IF EXISTS test.virtuals_{0}_mv; - CREATE TABLE test.virtuals_{0} (key UInt64, value UInt64) - ENGINE = RabbitMQ - SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', - rabbitmq_exchange_name = 'virtuals_2', - rabbitmq_num_queues = 2, - rabbitmq_num_consumers = 2, - rabbitmq_format = 'JSONEachRow', - rabbitmq_row_delimiter = '\\n'; - CREATE MATERIALIZED VIEW test.many_consumers_{0}_mv TO test.destination AS - SELECT *, _exchange_name as exchange_name, _channel_id as channel_id, _delivery_tag as delivery_tag, _redelivered as redelivered - FROM test.virtuals_{0}; - '''.format(table_id)) + i = 0 + messages_num = 150000 credentials = pika.PlainCredentials('root', 'clickhouse') parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) + connection = pika.BlockingConnection(parameters) channel = connection.channel() - - message_num = 10 - i = 0 messages = [] - for _ in range(message_num): + for _ in range(messages_num): messages.append(json.dumps({'key': i, 'value': i})) i += 1 + for i in range(messages_num): + channel.basic_publish(exchange='consumer_reconnect', routing_key='', body=messages[i], + properties=pika.BasicProperties(delivery_mode = 2, message_id=str(i))) + connection.close() - for i in range(message_num): - channel.basic_publish(exchange='virtuals_2', routing_key='', body=messages[i], - properties=pika.BasicProperties(delivery_mode=2, message_id=str(i))) + instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.consumer; + CREATE TABLE test.view (key UInt64, value UInt64) + ENGINE = MergeTree + ORDER BY key; + CREATE MATERIALIZED VIEW test.consumer TO test.view AS + SELECT * FROM test.consumer_reconnect; + ''') - #kill_rabbitmq(); - #time.sleep(2); - #revive_rabbitmq(); + while int(instance.query('SELECT count() FROM test.view')) == 0: + time.sleep(0.1) + + kill_rabbitmq(); + time.sleep(8); + revive_rabbitmq(); + + while int(instance.query('SELECT count() FROM test.view')) == 0: + time.sleep(0.1) + + kill_rabbitmq(); + time.sleep(2); + revive_rabbitmq(); while True: - result = instance.query('SELECT count(DISTINCT concat([channel_id], [toString(delivery_tag)])) FROM test.destination') - print instance.query(''' - SELECT DISTINCT concat([channel_id], [toString(delivery_tag)]) - FROM (SELECT channel_id AS id, delivery_tag AS tag FROM test.destination GROUP BY id ORDER BY tag)''') + result = instance.query('SELECT count(DISTINCT key) FROM test.view') time.sleep(1) - if int(result) == message_num * table_num: + if int(result) == messages_num: break - connection.close() - instance.query(''' DROP TABLE IF EXISTS test.consumer; DROP TABLE IF EXISTS test.view; - DROP TABLE IF EXISTS test.rabbitmq_virtuals_mv + DROP TABLE IF EXISTS test.consumer_reconnect; ''') - assert int(result) == message_num * table_num + assert int(result) == messages_num, 'ClickHouse lost some messages: {}'.format(result) if __name__ == '__main__':