diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 7b1cdd11317..6e8e153392c 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -124,7 +124,7 @@ Block RabbitMQBlockInputStream::readImpl() auto new_rows = read_rabbitmq_message(); - auto exchange_name = buffer->getExchange(); + auto exchange_name = storage.getExchange(); auto consumer_tag = buffer->getConsumerTag(); auto delivery_tag = buffer->getDeliveryTag(); auto redelivered = buffer->getRedelivered(); diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp index 2559b31c44a..87a17d3e1ed 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp @@ -33,6 +33,8 @@ Block RabbitMQBlockOutputStream::getHeader() const void RabbitMQBlockOutputStream::writePrefix() { + if (storage.checkBridge()) + storage.unbindExchange(); buffer = storage.createWriteBuffer(); if (!buffer) throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER); diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp index 5d17ff23b64..f01b1e60eab 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp +++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp @@ -31,9 +31,11 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes void RabbitMQHandler::startLoop() { std::lock_guard lock(startup_mutex); + loop_started.store(true); /// stop_loop variable is updated in a separate thread while (!stop_loop.load()) uv_run(loop, UV_RUN_NOWAIT); + loop_started.store(false); } void RabbitMQHandler::iterateLoop() diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.h b/src/Storages/RabbitMQ/RabbitMQHandler.h index 5893ace1d2f..b1b84e1d07a 100644 --- a/src/Storages/RabbitMQ/RabbitMQHandler.h +++ 
b/src/Storages/RabbitMQ/RabbitMQHandler.h @@ -21,12 +21,13 @@ public: void stop() { stop_loop.store(true); } void startLoop(); void iterateLoop(); + bool checkLoop() const { return loop_started.load(); } private: uv_loop_t * loop; Poco::Logger * log; - std::atomic stop_loop = false; + std::atomic stop_loop = false, loop_started = false; std::mutex startup_mutex; }; diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 0aff21f8a8e..8c272e04691 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -14,15 +14,11 @@ namespace DB { -namespace ExchangeType -{ - static const String HASH_SUF = "_hash"; -} - static const auto QUEUE_SIZE = 50000; /// Equals capacity of a single rabbitmq queue ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, + ChannelPtr setup_channel_, HandlerPtr event_handler_, const String & exchange_name_, const AMQP::ExchangeType & exchange_type_, @@ -36,6 +32,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( const std::atomic & stopped_) : ReadBuffer(nullptr, 0) , consumer_channel(std::move(consumer_channel_)) + , setup_channel(setup_channel_) , event_handler(event_handler_) , exchange_name(exchange_name_) , exchange_type(exchange_type_) @@ -43,21 +40,14 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( , channel_id(channel_id_) , hash_exchange(hash_exchange_) , num_queues(num_queues_) - , local_exchange(local_exchange_) - , local_hash_exchange(local_exchange + ExchangeType::HASH_SUF) , log(log_) , row_delimiter(row_delimiter_) , stopped(stopped_) + , local_exchange(local_exchange_) , received(QUEUE_SIZE * num_queues) { - /* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added. - * By default there is one queue per consumer. 
- */ for (size_t queue_id = 0; queue_id < num_queues; ++queue_id) - { - /// Queue bingings must be declared before any publishing => it must be done here and not in readPrefix() initQueueBindings(queue_id); - } } @@ -70,125 +60,34 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer() } -void ReadBufferFromRabbitMQConsumer::initExchange() -{ - /* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which - * will evenly distribute messages between all consumers. - */ - consumer_channel->declareExchange(exchange_name, exchange_type).onError([&](const char * message) - { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to declare client's {} exchange. Reason: {}", exchange_type, message); - }); - - /// No need for declaring hash-exchange if there is only one consumer with one queue or exchange type is already hash - if (!hash_exchange || exchange_type == AMQP::ExchangeType::consistent_hash) - return; - - { - /* By default hash exchange distributes messages based on a hash value of a routing key, which must be a string integer. But - * in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys - * of other types: headers, patterns and string-keys. This means that hash property must be changed. - */ - AMQP::Table binding_arguments; - binding_arguments["hash-property"] = "message_id"; - - /// Declare exchange for sharding. 
- consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments) - .onError([&](const char * message) - { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message); - }); - } - - /// Then bind client's exchange to sharding exchange (by keys, specified by the client): - - if (exchange_type == AMQP::ExchangeType::headers) - { - AMQP::Table binding_arguments; - std::vector matching; - - for (const auto & header : routing_keys) - { - boost::split(matching, header, [](char c){ return c == '='; }); - binding_arguments[matching[0]] = matching[1]; - matching.clear(); - } - - /// Routing key can be arbitrary here. - consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0], binding_arguments) - .onError([&](const char * message) - { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message); - }); - } - else if (exchange_type == AMQP::ExchangeType::fanout) - { - consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0]).onError([&](const char * message) - { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message); - }); - } - else - { - for (const auto & routing_key : routing_keys) - { - consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_key).onError([&](const char * message) - { - local_exchange_declared = false; - LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message); - }); - } - } -} - - void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) { - /// These variables might be updated later from a separate thread in onError callbacks. 
- if (!local_exchange_declared || (hash_exchange && !local_hash_exchange_declared)) - { - initExchange(); - local_exchange_declared = true; - local_hash_exchange_declared = true; - } - bool bindings_created = false, bindings_error = false; - consumer_channel->declareQueue(AMQP::exclusive) + setup_channel->declareQueue(AMQP::exclusive) .onSuccess([&](const std::string & queue_name_, int /* msgcount */, int /* consumercount */) { queues.emplace_back(queue_name_); LOG_DEBUG(log, "Queue " + queue_name_ + " is declared"); subscribed_queue[queue_name_] = false; - /* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because - * if moved there, it must(!) be wrapped inside a channel->onSuccess callback or any other, otherwise - * consumer might fail to subscribe and no resubscription will help. - */ subscribe(queues.back()); if (hash_exchange) { String binding_key; if (queues.size() == 1) - { binding_key = std::to_string(channel_id); - } else - { binding_key = std::to_string(channel_id + queue_id); - } + /* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor * exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange. */ - String current_hash_exchange = exchange_type == AMQP::ExchangeType::consistent_hash ? exchange_name : local_hash_exchange; + String current_hash_exchange = exchange_type == AMQP::ExchangeType::consistent_hash ? exchange_name : local_exchange; /// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary. 
- consumer_channel->bindQueue(current_hash_exchange, queue_name_, binding_key) + setup_channel->bindQueue(current_hash_exchange, queue_name_, binding_key) .onSuccess([&] { bindings_created = true; @@ -201,7 +100,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) } else if (exchange_type == AMQP::ExchangeType::fanout) { - consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0]) + setup_channel->bindQueue(exchange_name, queue_name_, routing_keys[0]) .onSuccess([&] { bindings_created = true; @@ -225,7 +124,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) matching.clear(); } - consumer_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments) + setup_channel->bindQueue(exchange_name, queue_name_, routing_keys[0], binding_arguments) .onSuccess([&] { bindings_created = true; @@ -242,7 +141,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) for (const auto & routing_key : routing_keys) { /// Binding directly to exchange, specified by the client. - consumer_channel->bindQueue(exchange_name, queue_name_, routing_key) + setup_channel->bindQueue(exchange_name, queue_name_, routing_key) .onSuccess([&] { bindings_created = true; @@ -261,10 +160,6 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id) LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message); }); - /* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created. - * It is important at this moment to make sure that queue bindings are created before any publishing can happen because - * otherwise messages will be routed nowhere. 
- */ while (!bindings_created && !bindings_error) { iterateEventLoop(); diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 85644562d0c..6896dd7f4b0 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -24,6 +24,7 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer public: ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, + ChannelPtr setup_channel_, HandlerPtr event_handler_, const String & exchange_name_, const AMQP::ExchangeType & exchange_type_, @@ -48,13 +49,13 @@ public: void allowNext() { allowed = true; } // Allow to read next message. void checkSubscription(); - auto getExchange() const { return exchange_name; } auto getConsumerTag() const { return consumer_tag; } auto getDeliveryTag() const { return current.delivery_tag; } auto getRedelivered() const { return current.redelivered; } private: ChannelPtr consumer_channel; + ChannelPtr setup_channel; HandlerPtr event_handler; const String exchange_name; @@ -64,18 +65,12 @@ private: const bool hash_exchange; const size_t num_queues; - const String local_exchange; - const String local_default_exchange; - const String local_hash_exchange; - Poco::Logger * log; char row_delimiter; bool allowed = true; const std::atomic & stopped; - String default_local_exchange; - bool local_exchange_declared = false, local_hash_exchange_declared = false; - + const String local_exchange; std::atomic consumer_error = false; std::atomic count_subscribed = 0, wait_subscribed; @@ -87,7 +82,7 @@ private: bool nextImpl() override; - void initExchange(); + void connectAlternateExchange(); void initQueueBindings(const size_t queue_id); void subscribe(const String & queue_name); void iterateEventLoop(); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 66af7dc3f56..d56a46c4f55 100644 --- 
a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -118,8 +118,7 @@ StorageRabbitMQ::StorageRabbitMQ( hash_exchange = num_consumers > 1 || num_queues > 1; - exchange_type_set = exchange_type_ != ExchangeType::DEFAULT; - if (exchange_type_set) + if (exchange_type_ != ExchangeType::DEFAULT) { if (exchange_type_ == ExchangeType::FANOUT) exchange_type = AMQP::ExchangeType::fanout; else if (exchange_type_ == ExchangeType::DIRECT) exchange_type = AMQP::ExchangeType::direct; @@ -133,11 +132,23 @@ StorageRabbitMQ::StorageRabbitMQ( exchange_type = AMQP::ExchangeType::fanout; } + if (exchange_type == AMQP::ExchangeType::headers) + { + std::vector matching; + for (const auto & header : routing_keys) + { + boost::split(matching, header, [](char c){ return c == '='; }); + bind_headers[matching[0]] = matching[1]; + matching.clear(); + } + } + auto table_id = getStorageID(); String table_name = table_id.table_name; /// Make sure that local exchange name is unique for each table and is not the same as client's exchange name - local_exchange_name = exchange_name + "_" + table_name; + local_exchange = exchange_name + "_" + table_name; + bridge_exchange = local_exchange + "_bridge"; /// One looping task for all consumers as they share the same connection == the same handler == the same event loop looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); }); @@ -163,6 +174,133 @@ void StorageRabbitMQ::loopingFunc() } +void StorageRabbitMQ::initExchange() +{ + /* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which + * will evenly distribute messages between all consumers. + */ + setup_channel->declareExchange(exchange_name, exchange_type, AMQP::durable) + .onError([&](const char * message) + { + throw Exception("Unable to declare exchange. Make sure specified exchange is not already declared. 
Error: " + + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + + /// Bridge exchange is needed to easily disconnect consumer queues. + setup_channel->declareExchange(bridge_exchange, AMQP::fanout, AMQP::durable + AMQP::autodelete) + .onError([&](const char * message) + { + throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + + if (!hash_exchange) + { + consumer_exchange = bridge_exchange; + return; + } + + /// Declare exchange for sharding. + AMQP::Table binding_arguments; + binding_arguments["hash-property"] = "message_id"; + + setup_channel->declareExchange(local_exchange, AMQP::consistent_hash, AMQP::durable + AMQP::autodelete, binding_arguments) + .onError([&](const char * message) + { + throw Exception("Unable to declare exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + + setup_channel->bindExchange(bridge_exchange, local_exchange, routing_keys[0]) + .onError([&](const char * message) + { + throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + + consumer_exchange = local_exchange; +} + + +void StorageRabbitMQ::bindExchange() +{ + std::atomic binding_created = false; + + /// Bridge exchange connects client's exchange with consumers' queues. + if (exchange_type == AMQP::ExchangeType::headers) + { + setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0], bind_headers) + .onSuccess([&]() + { + binding_created = true; + }) + .onError([&](const char * message) + { + throw Exception("Unable to bind exchange. 
Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + } + else if (exchange_type == AMQP::ExchangeType::fanout || exchange_type == AMQP::ExchangeType::consistent_hash) + { + setup_channel->bindExchange(exchange_name, bridge_exchange, routing_keys[0]) + .onSuccess([&]() + { + binding_created = true; + }) + .onError([&](const char * message) + { + throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + } + else + { + for (const auto & routing_key : routing_keys) + { + setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key) + .onSuccess([&]() + { + binding_created = true; + }) + .onError([&](const char * message) + { + throw Exception("Unable to bind exchange. Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + } + } + + while (!binding_created) + { + event_handler->iterateLoop(); + } +} + + +void StorageRabbitMQ::unbindExchange() +{ + if (bridge.try_lock()) + { + if (exchange_removed.load()) + return; + + setup_channel->removeExchange(bridge_exchange) + .onSuccess([&]() + { + exchange_removed.store(true); + }) + .onError([&](const char * message) + { + throw Exception("Unable to remove exchange. 
Reason: " + std::string(message), ErrorCodes::CANNOT_CONNECT_RABBITMQ); + }); + + while (!exchange_removed) + { + event_handler->iterateLoop(); + } + + event_handler->stop(); + looping_task->deactivate(); + + bridge.unlock(); + } +} + + Pipes StorageRabbitMQ::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -207,6 +345,10 @@ BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadat void StorageRabbitMQ::startup() { + setup_channel = std::make_shared(connection.get()); + initExchange(); + bindExchange(); + for (size_t i = 0; i < num_consumers; ++i) { try @@ -288,9 +430,9 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer() ChannelPtr consumer_channel = std::make_shared(connection.get()); return std::make_shared( - consumer_channel, event_handler, exchange_name, exchange_type, routing_keys, + consumer_channel, setup_channel, event_handler, consumer_exchange, exchange_type, routing_keys, next_channel_id, log, row_delimiter, hash_exchange, num_queues, - local_exchange_name, stream_cancelled); + local_exchange, stream_cancelled); } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 4457c5ff8c9..07b24e8ca1d 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -54,6 +54,9 @@ public: const String & getFormatName() const { return format_name; } NamesAndTypesList getVirtuals() const override; + const String getExchange() const { return exchange_name; } + bool checkBridge() const { return !exchange_removed.load(); } + void unbindExchange(); protected: StorageRabbitMQ( @@ -77,7 +80,6 @@ private: Names routing_keys; const String exchange_name; AMQP::ExchangeType exchange_type; - String local_exchange_name; const String format_name; char row_delimiter; @@ -99,10 +101,13 @@ private: std::mutex mutex; std::vector buffers; /// available buffers for RabbitMQ consumers - bool exchange_type_set = false; + String local_exchange, 
bridge_exchange, consumer_exchange; + std::mutex bridge; + AMQP::Table bind_headers; size_t next_channel_id = 1; /// Must >= 1 because it is used as a binding key, which has to be > 0 bool update_channel_id = false; - std::atomic loop_started = false; + std::atomic loop_started = false, exchange_removed = false; + ChannelPtr setup_channel; BackgroundSchedulePool::TaskHolder streaming_task; BackgroundSchedulePool::TaskHolder heartbeat_task; @@ -115,6 +120,8 @@ private: void threadFunc(); void heartbeatFunc(); void loopingFunc(); + void initExchange(); + void bindExchange(); void pingConnection() { connection->heartbeat(); } bool streamToViews(); diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 11b13714448..27e4a7b8a03 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -171,15 +171,14 @@ void WriteBufferToRabbitMQProducer::initExchange() { std::atomic exchange_declared = false, exchange_error = false; - producer_channel->declareExchange(exchange_name, exchange_type) + producer_channel->declareExchange(exchange_name, exchange_type, AMQP::durable + AMQP::passive) .onSuccess([&]() { exchange_declared = true; }) - .onError([&](const char * message) + .onError([&](const char * /* message */) { exchange_error = true; - LOG_ERROR(log, "Exchange error: {}", message); }); /// These variables are updated in a separate thread. 
diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index eaaa8613b5f..104ffa4e5cb 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -121,7 +121,7 @@ def test_rabbitmq_select_from_new_syntax_table(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_routing_key_list = 'new', - rabbitmq_exchange_name = 'clickhouse-exchange', + rabbitmq_exchange_name = 'new', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; ''') @@ -130,20 +130,19 @@ def test_rabbitmq_select_from_new_syntax_table(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for i in range(25): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='new', body=message) + channel.basic_publish(exchange='new', routing_key='new', body=message) messages = [] for i in range(25, 50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='new', body=message) + channel.basic_publish(exchange='new', routing_key='new', body=message) connection.close() @@ -160,21 +159,20 @@ def test_rabbitmq_select_from_new_syntax_table(rabbitmq_cluster): def test_rabbitmq_select_from_old_syntax_table(rabbitmq_cluster): instance.query(''' CREATE TABLE test.rabbitmq (key UInt64, value UInt64) - ENGINE = RabbitMQ('rabbitmq1:5672', 'old', 'clickhouse-exchange', 'JSONEachRow', '\\n'); + ENGINE = RabbitMQ('rabbitmq1:5672', 'old', 'old', 'JSONEachRow', '\\n'); ''') credentials = pika.PlainCredentials('root', 'clickhouse') parameters = 
pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='old', body=message) + channel.basic_publish(exchange='old', routing_key='old', body=message) connection.close() @@ -208,7 +206,7 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_routing_key_list = 'json', - rabbitmq_exchange_name = 'clickhouse-exchange', + rabbitmq_exchange_name = 'delim1', rabbitmq_format = 'JSONEachRow' ''') @@ -216,7 +214,6 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = '' for i in range(25): @@ -224,14 +221,14 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster): all_messages = [messages] for message in all_messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='json', body=message) + channel.basic_publish(exchange='delim1', routing_key='json', body=message) messages = '' for i in range(25, 50): messages += json.dumps({'key': i, 'value': i}) + '\n' all_messages = [messages] for message in all_messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='json', body=message) + channel.basic_publish(exchange='delim1', routing_key='json', body=message) result = '' while True: @@ -250,7 +247,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_routing_key_list = 'csv', - 
rabbitmq_exchange_name = 'clickhouse-exchange', + rabbitmq_exchange_name = 'delim2', rabbitmq_format = 'CSV', rabbitmq_row_delimiter = '\\n'; ''') @@ -259,14 +256,13 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for i in range(50): messages.append('{i}, {i}'.format(i=i)) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='csv', body=message) + channel.basic_publish(exchange='delim2', routing_key='csv', body=message) result = '' while True: @@ -286,7 +282,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_routing_key_list = 'tsv', - rabbitmq_exchange_name = 'clickhouse-exchange', + rabbitmq_exchange_name = 'delim3', rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; ''') @@ -295,14 +291,13 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for i in range(50): messages.append('{i}\t{i}'.format(i=i)) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='tsv', body=message) + channel.basic_publish(exchange='delim3', routing_key='tsv', body=message) result = '' while True: @@ -322,6 +317,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'mv', rabbitmq_routing_key_list = 'mv', rabbitmq_format = 'JSONEachRow', 
rabbitmq_row_delimiter = '\\n'; @@ -341,7 +337,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster): for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='mv', body=message) + channel.basic_publish(exchange='mv', routing_key='mv', body=message) while True: result = instance.query('SELECT * FROM test.view') @@ -365,6 +361,7 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'mvsq', rabbitmq_routing_key_list = 'mvsq', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; @@ -384,7 +381,7 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster): for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='mvsq', body=message) + channel.basic_publish(exchange='mvsq', routing_key='mvsq', body=message) while True: result = instance.query('SELECT * FROM test.view') @@ -410,6 +407,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'mmv', rabbitmq_routing_key_list = 'mmv', rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; @@ -434,7 +432,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='mmv', body=message) + channel.basic_publish(exchange='mmv', routing_key='mmv', body=message) while True: result1 = instance.query('SELECT * FROM test.view1') @@ -471,6 +469,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster): 
CREATE TABLE test.rabbitmq (key UInt64, value String) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'big', rabbitmq_routing_key_list = 'big', rabbitmq_format = 'JSONEachRow'; CREATE TABLE test.view (key UInt64, value String) @@ -481,7 +480,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster): ''') for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key='big', body=message) + channel.basic_publish(exchange='big', routing_key='big', body=message) while True: result = instance.query('SELECT count() FROM test.view') @@ -506,6 +505,7 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'test_channels_sharding', rabbitmq_num_consumers = 5, rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; @@ -528,7 +528,6 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster): def produce(): connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for _ in range(messages_num): @@ -536,7 +535,7 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster): i[0] += 1 key = str(randrange(1, NUM_CHANNELS)) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message) + channel.basic_publish(exchange='test_channels_sharding', routing_key=key, body=message) connection.close() threads = [] @@ -569,6 +568,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'test_queues_sharding', rabbitmq_num_queues = 4, rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; @@ 
-591,7 +591,6 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): def produce(): connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for _ in range(messages_num): @@ -599,7 +598,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): i[0] += 1 key = str(randrange(1, NUM_QUEUES)) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message) + channel.basic_publish(exchange='test_queues_sharding', routing_key=key, body=message) connection.close() threads = [] @@ -633,6 +632,7 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster) CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'test_sharding', rabbitmq_num_queues = 2, rabbitmq_num_consumers = 10, rabbitmq_format = 'JSONEachRow', @@ -657,7 +657,6 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster) def produce(): connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for _ in range(messages_num): @@ -665,7 +664,7 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster) i[0] += 1 key = str(randrange(1, NUM_QUEUES * NUM_CONSUMERS)) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message) + channel.basic_publish(exchange='test_sharding', routing_key=key, body=message) connection.close() threads = [] @@ -699,6 +698,7 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster): CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'combo', rabbitmq_num_consumers = 4, 
rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; @@ -728,7 +728,6 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster): def produce(): connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='clickhouse-exchange', exchange_type='fanout') messages = [] for _ in range(messages_num): @@ -736,7 +735,7 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster): i[0] += 1 key = str(randrange(1, NUM_CONSUMERS)) for message in messages: - channel.basic_publish(exchange='clickhouse-exchange', routing_key=key, body=message) + channel.basic_publish(exchange='combo', routing_key=key, body=message) connection.close() threads = [] @@ -787,7 +786,6 @@ def test_rabbitmq_insert(rabbitmq_cluster): consumer_connection = pika.BlockingConnection(parameters) consumer = consumer_connection.channel() - consumer.exchange_declare(exchange='insert', exchange_type='direct') result = consumer.queue_declare(queue='') queue_name = result.method.queue consumer.queue_bind(exchange='insert', queue=queue_name, routing_key='insert1') @@ -840,7 +838,6 @@ def test_rabbitmq_insert_headers_exchange(rabbitmq_cluster): consumer_connection = pika.BlockingConnection(parameters) consumer = consumer_connection.channel() - consumer.exchange_declare(exchange='insert_headers', exchange_type='headers') result = consumer.queue_declare(queue='') queue_name = result.method.queue consumer.queue_bind(exchange='insert_headers', queue=queue_name, routing_key="", @@ -890,12 +887,20 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): rabbitmq_routing_key_list = 'insert2', rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; + CREATE TABLE test.rabbitmq_consume (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'many_inserts', + rabbitmq_exchange_type = 'direct', + rabbitmq_routing_key_list = 'insert2', + rabbitmq_format = 'TSV', + rabbitmq_row_delimiter = '\\n'; CREATE 
TABLE test.view_many (key UInt64, value UInt64) ENGINE = MergeTree ORDER BY key SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3; CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS - SELECT * FROM test.rabbitmq_many; + SELECT * FROM test.rabbitmq_consume; ''') messages_num = 1000 @@ -933,6 +938,7 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): DROP TABLE IF EXISTS test.rabbitmq_many; DROP TABLE IF EXISTS test.consumer_many; DROP TABLE IF EXISTS test.view_many; + DROP TABLE IF EXISTS test.rabbitmq_consume; ''') for thread in threads: @@ -946,13 +952,21 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.view_overload; DROP TABLE IF EXISTS test.consumer_overload; + CREATE TABLE test.rabbitmq_consume (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_exchange_name = 'over', + rabbitmq_exchange_type = 'direct', + rabbitmq_routing_key_list = 'over', + rabbitmq_num_consumers = 6, + rabbitmq_format = 'TSV', + rabbitmq_row_delimiter = '\\n'; CREATE TABLE test.rabbitmq_overload (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'over', rabbitmq_exchange_type = 'direct', rabbitmq_routing_key_list = 'over', - rabbitmq_num_consumers = 10, rabbitmq_format = 'TSV', rabbitmq_row_delimiter = '\\n'; CREATE TABLE test.view_overload (key UInt64, value UInt64) @@ -960,7 +974,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): ORDER BY key SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3; CREATE MATERIALIZED VIEW test.consumer_overload TO test.view_overload AS - SELECT * FROM test.rabbitmq_overload; + SELECT * FROM test.rabbitmq_consume; ''') messages_num = 100000 @@ -999,6 +1013,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): DROP TABLE IF EXISTS test.rabbitmq_overload; DROP TABLE IF EXISTS
test.consumer_overload; DROP TABLE IF EXISTS test.view_overload; + DROP TABLE IF EXISTS test.rabbitmq_consume; ''') for thread in threads: @@ -1044,7 +1059,6 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='direct_exchange_testing', exchange_type='direct') messages = [] for _ in range(messages_num): @@ -1118,7 +1132,6 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='fanout_exchange_testing', exchange_type='fanout') messages = [] for _ in range(messages_num): @@ -1207,7 +1220,6 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='topic_exchange_testing', exchange_type='topic') messages = [] for _ in range(messages_num): @@ -1253,8 +1265,7 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): def test_rabbitmq_hash_exchange(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.destination; - CREATE TABLE test.destination(key UInt64, value UInt64, - _consumed_by LowCardinality(String)) + CREATE TABLE test.destination(key UInt64, value UInt64, consumer_tag String) ENGINE = MergeTree() ORDER BY key; ''') @@ -1275,7 +1286,7 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; CREATE MATERIALIZED VIEW test.{0}_mv TO test.destination AS - SELECT key, value, '{0}' as _consumed_by FROM test.{0}; + SELECT key, value, _consumer_tag AS consumer_tag FROM test.{0}; '''.format(table_name)) i = [0] @@ -1288,14 +1299,16 @@ def
test_rabbitmq_hash_exchange(rabbitmq_cluster): # init connection here because otherwise python rabbitmq client might fail connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='hash_exchange_testing', exchange_type='x-consistent-hash') messages = [] for _ in range(messages_num): messages.append(json.dumps({'key': i[0], 'value': i[0]})) i[0] += 1 + current = 0 for message in messages: - key = str(randrange(10)) - channel.basic_publish(exchange='hash_exchange_testing', routing_key=key, body=message) + current += 1 + mes_id = str(current) + channel.basic_publish(exchange='hash_exchange_testing', routing_key=mes_id, + properties=pika.BasicProperties(message_id=mes_id), body=message) connection.close() threads = [] @@ -1307,11 +1320,13 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): time.sleep(random.uniform(0, 1)) thread.start() + result1 = '' while True: - result = instance.query('SELECT count() FROM test.destination') + result1 = instance.query('SELECT count() FROM test.destination') time.sleep(1) - if int(result) == messages_num * threads_num: + if int(result1) == messages_num * threads_num: break + result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.destination") for consumer_id in range(num_tables): table_name = 'rabbitmq_consumer{}'.format(consumer_id) @@ -1327,7 +1342,9 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): for thread in threads: thread.join() - assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result) + assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1) + assert int(result2) >= 30 + @pytest.mark.timeout(420) @@ -1383,7 +1400,6 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster): # init connection here because otherwise python rabbitmq client might fail connection = pika.BlockingConnection(parameters) channel = connection.channel() -
channel.exchange_declare(exchange='multiple_bindings_testing', exchange_type='direct') messages = [] for _ in range(messages_num): @@ -1481,7 +1497,6 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='headers_exchange_testing', exchange_type='headers') messages = [] for _ in range(messages_num): @@ -1524,20 +1539,19 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): def test_rabbitmq_virtual_columns(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.view; - CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + CREATE TABLE test.rabbitmq_virtuals (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'virtuals', rabbitmq_format = 'JSONEachRow'; CREATE MATERIALIZED VIEW test.view Engine=Log AS - SELECT value, key, _exchange_name, _consumer_tag, _delivery_tag, _redelivered FROM test.rabbitmq; + SELECT value, key, _exchange_name, _consumer_tag, _delivery_tag, _redelivered FROM test.rabbitmq_virtuals; ''') credentials = pika.PlainCredentials('root', 'clickhouse') parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='virtuals', exchange_type='fanout') message_num = 10 i = [0] @@ -1581,6 +1595,9 @@ def test_rabbitmq_virtual_columns(rabbitmq_cluster): 8 8 virtuals amq.ctag 9 0 9 9 virtuals amq.ctag 10 0 ''' + instance.query(''' + DROP TABLE IF EXISTS test.rabbitmq_virtuals + ''') assert TSV(result) == TSV(expected) @@ -1589,7 +1606,7 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster): instance.query(''' DROP TABLE IF EXISTS test.view; DROP TABLE IF EXISTS test.consumer; - CREATE TABLE test.rabbitmq (key UInt64, value UInt64) +
CREATE TABLE test.rabbitmq_virtuals_mv (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'virtuals_mv', @@ -1599,14 +1616,13 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster): ORDER BY key; CREATE MATERIALIZED VIEW test.consumer TO test.view AS SELECT *, _exchange_name as exchange_name, _consumer_tag as consumer_tag, _delivery_tag as delivery_tag, _redelivered as redelivered - FROM test.rabbitmq; + FROM test.rabbitmq_virtuals_mv; ''') credentials = pika.PlainCredentials('root', 'clickhouse') parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.exchange_declare(exchange='virtuals_mv', exchange_type='fanout') message_num = 10 i = [0] @@ -1647,8 +1663,9 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster): ''' instance.query(''' - DROP TABLE test.consumer; - DROP TABLE test.view; + DROP TABLE IF EXISTS test.consumer; + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.rabbitmq_virtuals_mv ''') assert TSV(result) == TSV(expected)