diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 2d995d97f18..6257a60d678 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -142,8 +142,6 @@ Block RabbitMQBlockInputStream::readImpl() auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns)); auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns)); - LOG_DEBUG(log, "Total amount of rows is " + std::to_string(result_block.rows())); - for (const auto & column : virtual_block.getColumnsWithTypeAndName()) { result_block.insert(column); diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 8667427ee63..71c23bb9bc4 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -41,8 +41,9 @@ void RabbitMQHandler::startConsumerLoop(std::atomic & loop_started) */ if (mutex_before_event_loop.try_lock_for(std::chrono::milliseconds(Lock_timeout))) { - loop_started = true; - stop_scheduled.store(false); + loop_started.store(true); + stop_scheduled = false; + event_base_loop(evbase, EVLOOP_NONBLOCK); mutex_before_event_loop.unlock(); } @@ -67,12 +68,8 @@ void RabbitMQHandler::stop() void RabbitMQHandler::stopWithTimeout() { - if (mutex_before_loop_stop.try_lock()) - { - stop_scheduled.store(true); - event_base_loopexit(evbase, &tv); - mutex_before_loop_stop.unlock(); - } + stop_scheduled = true; + event_base_loopexit(evbase, &tv); } } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 31ca4f280e3..ef4398753c2 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -19,7 +19,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -namespace Exchange +namespace ExchangeType { /// Note that default here means default by implementation and not by rabbitmq settings static const String DEFAULT = "default"; @@ -42,7 +42,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( const bool bind_by_id_, const size_t num_queues_, const String & exchange_type_, - const String & local_exchange_name_, + const String & local_exchange_, const std::atomic & stopped_) : ReadBuffer(nullptr, 0) , consumer_channel(std::move(consumer_channel_)) @@ -55,13 +55,15 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( , bind_by_id(bind_by_id_) , num_queues(num_queues_) , exchange_type(exchange_type_) - , local_exchange_name(local_exchange_name_) + , local_exchange(local_exchange_) + , local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT) + , local_hash_exchange(local_exchange + "_" + ExchangeType::HASH) , stopped(stopped_) { messages.clear(); current = messages.begin(); - exchange_type_set = exchange_type != Exchange::DEFAULT; + exchange_type_set = exchange_type != ExchangeType::DEFAULT; /* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added. * By default there is one queue per consumer. @@ -87,53 +89,52 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer() void ReadBufferFromRabbitMQConsumer::initExchange() { /* This direct-exchange is used for default implemenation and for INSERT query (so it is always declared). If exchange_type - * is not set, then there are only two exchanges - external, defined by the client, and local, unique for each table. + * is not set, then there are only two exchanges - external, defined by the client, and local, unique for each table (default). * This strict division to external and local exchanges is needed to avoid too much complexity with defining exchange_name - * for INSERT query producer and, in general, it is much better to distinguish them into separate ones. + * for INSERT query producer and, in general, it is better to distinguish them into separate ones. */ - String default_exchange = exchange_type_set ? exchange_name + "_" + Exchange::DEFAULT : exchange_name; - consumer_channel->declareExchange(default_exchange, AMQP::fanout).onError([&](const char * message) + consumer_channel->declareExchange(local_default_exchange, AMQP::direct).onError([&](const char * message) { local_exchange_declared = false; - LOG_ERROR(log, "Failed to declare exchange {}. Reason: {}", default_exchange, message); - }); - - default_local_exchange = local_exchange_name; - default_local_exchange += exchange_type_set ? "_default_" + Exchange::DIRECT : "_" + Exchange::DIRECT; - consumer_channel->declareExchange(default_local_exchange, AMQP::direct).onError([&](const char * message) - { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to declare exchange {}. Reason: {}", default_local_exchange, message); - }); - - /// With fanout exchange the binding key is ignored - a parameter might be arbitrary. All distribution lies on local_exchange. - consumer_channel->bindExchange(default_exchange, default_local_exchange, routing_keys[0]).onError([&](const char * message) - { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to bind {} exchange to {} exchange. Reason: {}", default_exchange, default_local_exchange, message); + LOG_ERROR(log, "Failed to declare local direct-exchange. Reason: {}", message); }); if (!exchange_type_set) + { + consumer_channel->declareExchange(exchange_name, AMQP::fanout).onError([&](const char * message) + { + local_exchange_declared = false; + LOG_ERROR(log, "Failed to declare default fanout-exchange. Reason: {}", message); + }); + + /// With fanout exchange the binding key is ignored - a parameter might be arbitrary. All distribution lies on local_exchange. + consumer_channel->bindExchange(exchange_name, local_default_exchange, routing_keys[0]).onError([&](const char * message) + { + local_exchange_declared = false; + LOG_ERROR(log, "Failed to bind local direct-exchange to fanout-exchange. Reason: {}", message); + }); + return; + } /// For special purposes to use the flexibility of routing provided by rabbitmq - choosing exchange types is supported. AMQP::ExchangeType type; - if (exchange_type == Exchange::FANOUT) type = AMQP::ExchangeType::fanout; - else if (exchange_type == Exchange::DIRECT) type = AMQP::ExchangeType::direct; - else if (exchange_type == Exchange::TOPIC) type = AMQP::ExchangeType::topic; - else if (exchange_type == Exchange::HASH) type = AMQP::ExchangeType::consistent_hash; - else if (exchange_type == Exchange::HEADERS) type = AMQP::ExchangeType::headers; + if (exchange_type == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout; + else if (exchange_type == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct; + else if (exchange_type == ExchangeType::TOPIC) type = AMQP::ExchangeType::topic; + else if (exchange_type == ExchangeType::HASH) type = AMQP::ExchangeType::consistent_hash; + else if (exchange_type == ExchangeType::HEADERS) type = AMQP::ExchangeType::headers; else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS); - /* Declare exchange of the specified type and bind it to hash-exchange, which will evenly distribute messages - * between all consumers. (This enables better scaling as without hash-exchange - the only option to avoid getting - * the same messages more than once - is having only one consumer with one queue, which is not good.) + /* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which + * will evenly distribute messages between all consumers. (This enables better scaling as without hash-exchange - the only + * option to avoid getting the same messages more than once - is having only one consumer with one queue, which is not good.) */ consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message) { local_exchange_declared = false; - LOG_ERROR(log, "Failed to declare client's {} exchange: {}", exchange_type, message); + LOG_ERROR(log, "Failed to declare client's {} exchange. Reason: {}", exchange_type, message); }); /// No need for declaring hash-exchange if there is only one consumer with one queue or exchange type is already hash @@ -142,26 +143,32 @@ void ReadBufferFromRabbitMQConsumer::initExchange() hash_exchange = true; - if (exchange_type == Exchange::HASH) + if (exchange_type == ExchangeType::HASH) return; - AMQP::Table exchange_arguments; - exchange_arguments["hash-property"] = "message_id"; + /* By default hash exchange distributes messages based on a hash value of a routing key, which must be a string integer. But + * in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys + * of other types: headers, patterns and string-keys. This means that hash property must be changed. + */ + AMQP::Table binding_arguments; + binding_arguments["hash-property"] = "message_id"; - String local_hash_exchange_name = local_exchange_name + "_hash"; - consumer_channel->declareExchange(local_hash_exchange_name, AMQP::consistent_hash, exchange_arguments) + /// Declare exchange for sharding. + consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments) .onError([&](const char * message) { local_exchange_declared = false; LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message); }); - if (exchange_type == Exchange::HEADERS) + /// Then bind client's exchange to sharding exchange (by keys, specified by the client): + + if (exchange_type == ExchangeType::HEADERS) { AMQP::Table binding_arguments; std::vector matching; - for (auto & header : routing_keys) + for (const auto & header : routing_keys) { boost::split(matching, header, [](char c){ return c == '='; }); binding_arguments[matching[0]] = matching[1]; @@ -169,21 +176,21 @@ void ReadBufferFromRabbitMQConsumer::initExchange() } /// Routing key can be arbitrary here. - consumer_channel->bindExchange(exchange_name, local_hash_exchange_name, routing_keys[0], binding_arguments) + consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0], binding_arguments) .onError([&](const char * message) { local_exchange_declared = false; - LOG_ERROR(log, "Failed to bind {} exchange to {} exchange: {}", local_exchange_name, exchange_name, message); + LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message); }); } else { - for (auto & routing_key : routing_keys) + for (const auto & routing_key : routing_keys) { - consumer_channel->bindExchange(exchange_name, local_hash_exchange_name, routing_key).onError([&](const char * message) + consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_key).onError([&](const char * message) { local_exchange_declared = false; - LOG_ERROR(log, "Failed to bind {} exchange to {} exchange: {}", local_exchange_name, exchange_name, message); + LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message); }); } } @@ -227,7 +234,8 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) } } - consumer_channel->bindQueue(default_local_exchange, queue_name_, binding_key) + /// Bind queue to exchange that is used for INSERT query and also for default implementation. + consumer_channel->bindQueue(local_default_exchange, queue_name_, binding_key) .onSuccess([&] { default_bindings_created = true; @@ -238,13 +246,13 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) LOG_ERROR(log, "Failed to bind to key {}. Reason: {}", binding_key, message); }); - /* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed. Also note - * that if moved there, it must(!) be wrapped inside a channel->onReady callback or any other, otherwise consumer might fail - * to subscribe and no resubscription will help. + /* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because + * if moved there, it must(!) be wrapped inside a channel->onReady callback or any other (and the looping), otherwise + * consumer might fail to subscribe and no resubscription will help. */ subscribe(queues.back()); - LOG_TRACE(log, "Queue " + queue_name_ + " is bound by key " + binding_key); + LOG_DEBUG(log, "Queue " + queue_name_ + " is declared"); if (exchange_type_set) { @@ -253,10 +261,10 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) /* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor * exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange. */ - String hash_exchange_name = exchange_type == Exchange::HASH ? exchange_name : local_exchange_name + "_hash"; + String current_hash_exchange = exchange_type == ExchangeType::HASH ? exchange_name : local_hash_exchange; /// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary. - consumer_channel->bindQueue(hash_exchange_name, queue_name_, binding_key) + consumer_channel->bindQueue(current_hash_exchange, queue_name_, binding_key) .onSuccess([&] { bindings_created = true; @@ -267,13 +275,13 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", binding_key, message); }); } - else if (exchange_type == Exchange::HEADERS) + else if (exchange_type == ExchangeType::HEADERS) { AMQP::Table binding_arguments; std::vector matching; /// It is not parsed for the second time - if it was parsed above, then it would go to the first if statement, not here. - for (auto & header : routing_keys) + for (const auto & header : routing_keys) { boost::split(matching, header, [](char c){ return c == '='; }); binding_arguments[matching[0]] = matching[1]; @@ -288,15 +296,15 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) .onError([&](const char * message) { bindings_error = true; - LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", routing_keys[0], message); + LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message); }); } else { /// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange. - for (auto & routing_key : routing_keys) + for (const auto & routing_key : routing_keys) { - /// Binding directly to exchange, specified by the client + /// Binding directly to exchange, specified by the client. consumer_channel->bindQueue(exchange_name, queue_name_, routing_key) .onSuccess([&] { @@ -305,7 +313,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) .onError([&](const char * message) { bindings_error = true; - LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", routing_key, message); + LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message); }); } } @@ -314,7 +322,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) .onError([&](const char * message) { default_bindings_error = true; - LOG_ERROR(log, "Failed to declare queue on the channel: {}", message); + LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message); }); /* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created. @@ -364,7 +372,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) * executing all callbacks on the connection (not only its own), then there should be some point to unblock. * loop_started == 1 if current consumer is started the loop and not another. */ - if (!loop_started.load() && !eventHandler.checkStopIsScheduled().load()) + if (!loop_started.load() && !eventHandler.checkStopIsScheduled()) { stopEventLoopWithTimeout(); } @@ -373,7 +381,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name) .onError([&](const char * message) { consumer_error = true; - LOG_ERROR(log, "Consumer {} failed: {}", channel_id, message); + LOG_ERROR(log, "Consumer {} failed. Reason: {}", channel_id, message); }); } @@ -385,7 +393,7 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription() wait_subscribed = num_queues; - /// These variables are updated in a separate thread + /// These variables are updated in a separate thread. while (count_subscribed != wait_subscribed && !consumer_error) { startEventLoop(loop_started); @@ -393,11 +401,11 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription() LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed); - /// Updated in callbacks which are run by the loop + /// Updated in callbacks which are run by the loop. if (count_subscribed == num_queues) return; - /// A case that should never normally happen + /// A case that should never normally happen. for (auto & queue : queues) { subscribe(queue); diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 6a2c847357d..d4bf35c00b8 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -32,7 +32,7 @@ public: const bool bind_by_id_, const size_t num_queues_, const String & exchange_type_, - const String & local_exchange_name_, + const String & local_exchange_, const std::atomic & stopped_); ~ReadBufferFromRabbitMQConsumer() override; @@ -53,8 +53,11 @@ private: const size_t channel_id; const bool bind_by_id; const size_t num_queues; + const String & exchange_type; - const String & local_exchange_name; + const String & local_exchange; + const String local_default_exchange; + const String local_hash_exchange; Poco::Logger * log; char row_delimiter; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 852edd24726..3de8d193302 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -107,6 +107,8 @@ StorageRabbitMQ::StorageRabbitMQ( auto table_id = getStorageID(); String table_name = table_id.table_name; + + /// Make sure that local exchange name is unique for each table and is not the same as client's exchange name local_exchange_name = exchange_name + "_" + table_name; } @@ -132,6 +134,7 @@ Pipes StorageRabbitMQ::read( } LOG_DEBUG(log, "Starting reading {} streams", pipes.size()); + return pipes; } @@ -225,12 +228,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer() ProducerBufferPtr StorageRabbitMQ::createWriteBuffer() { - /* If exchange type is set, then there are different exchanges for external publishing and for INSERT query - * as in this case they are of different types. - */ - String producer_exchange = exchange_type == "default" ? local_exchange_name : local_exchange_name + "_default"; - - return std::make_shared(parsed_address, login_password, routing_keys[0], producer_exchange, + return std::make_shared(parsed_address, login_password, routing_keys[0], local_exchange_name, log, num_consumers * num_queues, bind_by_id, use_transactional_channel, row_delimiter ? std::optional{row_delimiter} : std::nullopt, 1, 1024); } diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 09179b95a15..6d74e2c8298 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -77,7 +77,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer( WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer() { - finilize(); + finilizeProducer(); connection.close(); assert(rows == 0 && chunks.empty()); } @@ -118,7 +118,9 @@ void WriteBufferToRabbitMQProducer::countRow() ++message_counter; - /// run event loop to actually publish, checking exchange is just a point to stop the event loop + /* Run event loop to actually publish, checking exchange is just a point to stop the event loop. Messages are not sent + * without looping and looping after every batch is much better than processing all the messages in one time. + */ if ((message_counter %= Batch) == 0) { checkExchange(); @@ -132,7 +134,7 @@ void WriteBufferToRabbitMQProducer::checkExchange() std::atomic exchange_declared = false, exchange_error = false; /* The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name - * and makes it visible from current producer_channel. + * and makes it declared on the current producer_channel. */ producer_channel->declareExchange(exchange_name, AMQP::direct, AMQP::passive) .onSuccess([&]() @@ -142,7 +144,7 @@ void WriteBufferToRabbitMQProducer::checkExchange() .onError([&](const char * message) { exchange_error = true; - LOG_ERROR(log, "Exchange was not declared: {}", message); + LOG_ERROR(log, "Exchange for INSERT query was not declared. Reason: {}", message); }); /// These variables are updated in a separate thread and starting the loop blocks current thread @@ -153,7 +155,7 @@ void WriteBufferToRabbitMQProducer::checkExchange() } -void WriteBufferToRabbitMQProducer::finilize() +void WriteBufferToRabbitMQProducer::finilizeProducer() { checkExchange(); diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h index 9fd36257561..7d2bb6e598f 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h @@ -40,7 +40,7 @@ public: private: void nextImpl() override; void checkExchange(); - void finilize(); + void finilizeProducer(); std::pair & login_password; const String routing_key; @@ -56,9 +56,6 @@ private: size_t next_queue = 0; UInt64 message_counter = 0; - String channel_id; - - Messages messages; Poco::Logger * log; const std::optional delim; diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index f58e898a45f..3c4c0b3215b 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -497,7 +497,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster): assert int(result) == rabbitmq_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result) -@pytest.mark.timeout(320) +@pytest.mark.timeout(420) def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster): NUM_CHANNELS = 5 @@ -560,7 +560,7 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster): assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) -@pytest.mark.timeout(320) +@pytest.mark.timeout(420) def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): NUM_QUEUES = 4 @@ -623,7 +623,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) -@pytest.mark.timeout(320) +@pytest.mark.timeout(420) def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster): NUM_CONSUMERS = 10 @@ -688,7 +688,7 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster) assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) -@pytest.mark.timeout(320) +@pytest.mark.timeout(420) def test_rabbitmq_read_only_combo(rabbitmq_cluster): NUM_MV = 5; @@ -768,7 +768,7 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster): assert int(result) == messages_num * threads_num * NUM_MV, 'ClickHouse lost some messages: {}'.format(result) -@pytest.mark.timeout(180) +@pytest.mark.timeout(240) def test_rabbitmq_insert(rabbitmq_cluster): instance.query(''' CREATE TABLE test.rabbitmq (key UInt64, value UInt64) @@ -1054,7 +1054,10 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster): key = "direct_" + str(key_num) key_num += 1 for message in messages: - channel.basic_publish(exchange='direct_exchange_testing', routing_key=key, body=message) + mes_id = str(randrange(10)) + channel.basic_publish( + exchange='direct_exchange_testing', routing_key=key, + properties=pika.BasicProperties(message_id=mes_id), body=message) connection.close() @@ -1066,8 +1069,8 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster): for consumer_id in range(num_tables): instance.query(''' - DROP TABLE IF EXISTS test.direct_exchange_{0}_mv; DROP TABLE IF EXISTS test.direct_exchange_{0}; + DROP TABLE IF EXISTS test.direct_exchange_{0}_mv; '''.format(consumer_id)) instance.query(''' @@ -1122,7 +1125,10 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster): key_num = 0 for message in messages: - channel.basic_publish(exchange='fanout_exchange_testing', routing_key='', body=message) + mes_id = str(randrange(10)) + channel.basic_publish( + exchange='fanout_exchange_testing', routing_key='', + properties=pika.BasicProperties(message_id=mes_id), body=message) connection.close() @@ -1215,7 +1221,10 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): key = "random.logs" for message in messages: - channel.basic_publish(exchange='topic_exchange_testing', routing_key=key, body=message) + mes_id = str(randrange(10)) + channel.basic_publish( + exchange='topic_exchange_testing', routing_key=key, + properties=pika.BasicProperties(message_id=mes_id), body=message) connection.close() @@ -1225,18 +1234,12 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): if int(result) == messages_num * num_tables + messages_num * num_tables: break - for consumer_id in range(num_tables): + for consumer_id in range(num_tables * 2): instance.query(''' DROP TABLE IF EXISTS test.topic_exchange_{0}; DROP TABLE IF EXISTS test.topic_exchange_{0}_mv; '''.format(consumer_id)) - for consumer_id in range(num_tables): - instance.query(''' - DROP TABLE IF EXISTS test.topic_exchange_{0}; - DROP TABLE IF EXISTS test.topic_exchange_{0}_mv; - '''.format(num_tables + consumer_id)) - instance.query(''' DROP TABLE IF EXISTS test.destination; ''') @@ -1244,7 +1247,7 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): assert int(result) == messages_num * num_tables + messages_num * num_tables, 'ClickHouse lost some messages: {}'.format(result) -@pytest.mark.timeout(320) +@pytest.mark.timeout(420) def test_rabbitmq_hash_exchange(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.destination; @@ -1288,8 +1291,8 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): for _ in range(messages_num): messages.append(json.dumps({'key': i[0], 'value': i[0]})) i[0] += 1 - key = str(randrange(10)) for message in messages: + key = str(randrange(10)) channel.basic_publish(exchange='hash_exchange_testing', routing_key=key, body=message) connection.close() @@ -1389,7 +1392,9 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster): for key in keys: for message in messages: - channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key, body=message) + mes_id = str(randrange(10)) + channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key, + properties=pika.BasicProperties(message_id=mes_id), body=message) connection.close() @@ -1488,8 +1493,9 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): key_num = 0 for message in messages: + mes_id = str(randrange(10)) channel.basic_publish(exchange='headers_exchange_testing', routing_key='', - properties=pika.BasicProperties(headers=fields), body=message) + properties=pika.BasicProperties(headers=fields, message_id=mes_id), body=message) connection.close() @@ -1499,16 +1505,11 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): if int(result) == messages_num * num_tables_to_receive: break - for consumer_id in range(num_tables_to_receive): + for consumer_id in range(num_tables_to_receive + num_tables_to_ignore): instance.query(''' - DROP TABLE IF EXISTS test.direct_exchange_{0}_mv; DROP TABLE IF EXISTS test.direct_exchange_{0}; + DROP TABLE IF EXISTS test.direct_exchange_{0}_mv; '''.format(consumer_id)) - for consumer_id in range(num_tables_to_ignore): - instance.query(''' - DROP TABLE IF EXISTS test.direct_exchange_{0}_mv; - DROP TABLE IF EXISTS test.direct_exchange_{0}; - '''.format(consumer_id + num_tables_to_receive)) instance.query(''' DROP TABLE IF EXISTS test.destination;