diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp
index 6e8e153392c..630581b13dc 100644
--- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp
+++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp
@@ -137,6 +137,8 @@ Block RabbitMQBlockInputStream::readImpl()
             virtual_columns[3]->insert(redelivered);
         }
 
+        last_inserted_delivery_tag = delivery_tag;
+
         total_rows = total_rows + new_rows;
         buffer->allowNext();
 
@@ -158,4 +160,13 @@ Block RabbitMQBlockInputStream::readImpl()
     return result_block;
 }
 
+
+void RabbitMQBlockInputStream::readSuffixImpl()
+{
+    if (!buffer)
+        return;
+
+    buffer->ackMessages(last_inserted_delivery_tag);
+}
+
 }
diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h
index f4ab76f72cf..09cda6ff94f 100644
--- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h
+++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h
@@ -26,6 +26,7 @@ public:
 
     void readPrefixImpl() override;
     Block readImpl() override;
+    void readSuffixImpl() override;
 
 private:
     StorageRabbitMQ & storage;
@@ -38,6 +39,7 @@ private:
     const Block virtual_header;
 
     ConsumerBufferPtr buffer;
+    UInt64 last_inserted_delivery_tag = 0;
 };
 
 }
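
Note on the flow above: readImpl() records the delivery tag of the last message that actually made it into a block, and readSuffixImpl() hands that watermark to the consumer buffer, so acknowledgements are sent only after the block has been read to completion. A minimal pika sketch of the same watermark idea (not ClickHouse code; the queue name and batch size are hypothetical):

```python
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='demo_queue', durable=True)

batch = []
last_inserted_delivery_tag = 0  # watermark: highest tag that reached the sink

def flush(ch):
    global batch
    if not batch:
        return
    # ... commit `batch` to the destination here ...
    # Ack only after the commit succeeds: multiple=True acknowledges every
    # delivery up to and including the watermark, like AMQP::multiple.
    ch.basic_ack(delivery_tag=last_inserted_delivery_tag, multiple=True)
    batch = []

def on_message(ch, method, properties, body):
    global last_inserted_delivery_tag
    batch.append(json.loads(body))
    last_inserted_delivery_tag = method.delivery_tag
    if len(batch) >= 100:
        flush(ch)

channel.basic_consume(queue='demo_queue', on_message_callback=on_message)
channel.start_consuming()
```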
diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.cpp b/src/Storages/RabbitMQ/RabbitMQHandler.cpp
index f01b1e60eab..5d17ff23b64 100644
--- a/src/Storages/RabbitMQ/RabbitMQHandler.cpp
+++ b/src/Storages/RabbitMQ/RabbitMQHandler.cpp
@@ -31,11 +31,9 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
 void RabbitMQHandler::startLoop()
 {
     std::lock_guard lock(startup_mutex);
-    loop_started.store(true);
     /// stop_loop variable is updated in a separate thread
     while (!stop_loop.load())
         uv_run(loop, UV_RUN_NOWAIT);
-    loop_started.store(false);
 }
 
 void RabbitMQHandler::iterateLoop()
diff --git a/src/Storages/RabbitMQ/RabbitMQHandler.h b/src/Storages/RabbitMQ/RabbitMQHandler.h
index b1b84e1d07a..5893ace1d2f 100644
--- a/src/Storages/RabbitMQ/RabbitMQHandler.h
+++ b/src/Storages/RabbitMQ/RabbitMQHandler.h
@@ -21,13 +21,12 @@ public:
 
     void stop() { stop_loop.store(true); }
     void startLoop();
    void iterateLoop();
-    bool checkLoop() const { return loop_started.load(); }
 
 private:
     uv_loop_t * loop;
     Poco::Logger * log;
 
-    std::atomic<bool> stop_loop = false, loop_started = false;
+    std::atomic<bool> stop_loop = false;
     std::mutex startup_mutex;
 };
+ LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag); + } } }) .onError([&](const char * message) @@ -243,6 +267,17 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription() } +void ReadBufferFromRabbitMQConsumer::ackMessages(UInt64 last_inserted_delivery_tag) +{ + if (last_inserted_delivery_tag > prev_tag) + { + std::lock_guard lock(wait_ack); + prev_tag = last_inserted_delivery_tag; + ack.store(true); + } +} + + void ReadBufferFromRabbitMQConsumer::iterateEventLoop() { event_handler->iterateLoop(); @@ -259,6 +294,7 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl() auto * new_position = const_cast(current.message.data()); BufferBase::set(new_position, current.message.size(), 0); allowed = false; + max_tag = current.delivery_tag; return true; } diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index f4978e54229..8033f537e8c 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -36,6 +36,7 @@ public: bool hash_exchange_, size_t num_queues_, const String & local_exchange_, + const String & deadletter_exchange_, const std::atomic & stopped_); ~ReadBufferFromRabbitMQConsumer() override; @@ -49,6 +50,7 @@ public: void allowNext() { allowed = true; } // Allow to read next message. void checkSubscription(); + void ackMessages(UInt64 last_inserted_delivery_tag); auto getConsumerTag() const { return consumer_tag; } auto getDeliveryTag() const { return current.delivery_tag; } @@ -72,15 +74,19 @@ private: bool allowed = true; const std::atomic & stopped; - const String local_exchange; + const String local_exchange, deadletter_exchange; std::atomic consumer_error = false; std::atomic count_subscribed = 0, wait_subscribed; String consumer_tag; ConcurrentBoundedQueue received; + UInt64 prev_tag = 0; MessageData current; std::vector queues; std::unordered_map subscribed_queue; + std::atomic ack = false; + std::mutex wait_ack; + UInt64 max_tag = 0; bool nextImpl() override; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index db4f1c7b338..52a07026c24 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -73,7 +73,8 @@ StorageRabbitMQ::StorageRabbitMQ( size_t num_consumers_, size_t num_queues_, const bool use_transactional_channel_, - const String & queue_base_) + const String & queue_base_, + const String & deadletter_exchange_) : IStorage(table_id_) , global_context(context_.getGlobalContext()) , rabbitmq_context(Context(global_context)) @@ -85,6 +86,7 @@ StorageRabbitMQ::StorageRabbitMQ( , num_queues(num_queues_) , use_transactional_channel(use_transactional_channel_) , queue_base(queue_base_) + , deadletter_exchange(deadletter_exchange_) , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")")) , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672)) , login_password(std::make_pair( @@ -224,6 +226,7 @@ void StorageRabbitMQ::initExchange() void StorageRabbitMQ::bindExchange() { std::atomic binding_created = false; + size_t bound_keys = 0; /// Bridge exchange connects client's exchange with consumers' queues. 
diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp
index 5abdb4fe7c2..705aae7ec61 100644
--- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp
+++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp
@@ -30,6 +30,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
         bool hash_exchange_,
         size_t num_queues_,
         const String & local_exchange_,
+        const String & deadletter_exchange_,
         const std::atomic<bool> & stopped_)
         : ReadBuffer(nullptr, 0)
         , consumer_channel(std::move(consumer_channel_))
@@ -46,6 +47,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
         , row_delimiter(row_delimiter_)
         , stopped(stopped_)
         , local_exchange(local_exchange_)
+        , deadletter_exchange(deadletter_exchange_)
         , received(QUEUE_SIZE * num_queues)
 {
     for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
@@ -55,6 +57,12 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
 
 ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
 {
+    if (ack.load() && consumer_channel)
+    {
+        consumer_channel->ack(prev_tag, AMQP::multiple); /// Acks all messages up to prev_tag, starting from the last acked one.
+        LOG_TRACE(log, "Acknowledged messages with deliveryTags up to {}", prev_tag);
+    }
+
     consumer_channel->close();
     received.clear();
     BufferBase::set(nullptr, 0, 0);
@@ -162,14 +170,20 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
         LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
     });
 
+    AMQP::Table queue_settings;
+    if (!deadletter_exchange.empty())
+    {
+        queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
+    }
+
     if (!queue_base.empty())
     {
         const String queue_name = !hash_exchange ? queue_base : queue_base + "_" + std::to_string(channel_id) + "_" + std::to_string(queue_id);
-        setup_channel->declareQueue(queue_name, AMQP::durable).onSuccess(success_callback).onError(error_callback);
+        setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
     }
     else
     {
-        setup_channel->declareQueue(AMQP::durable).onSuccess(success_callback).onError(error_callback);
+        setup_channel->declareQueue(AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
     }
 
     while (!bindings_created && !bindings_error)
@@ -184,15 +198,20 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
     if (subscribed_queue[queue_name])
         return;
 
-    consumer_channel->consume(queue_name, AMQP::noack)
+    consumer_channel->consume(queue_name)
    .onSuccess([&](const std::string & consumer)
     {
         subscribed_queue[queue_name] = true;
-        consumer_error = false;
         ++count_subscribed;
+        LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
+
+        consumer_error = false;
         consumer_tag = consumer;
 
-        LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
+        consumer_channel->onError([&](const char * message)
+        {
+            LOG_ERROR(log, "Consumer {} error: {}", consumer_tag, message);
+        });
     })
     .onReceived([&](const AMQP::Message & message, uint64_t deliveryTag, bool redelivered)
     {
@@ -201,11 +220,16 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
         {
             String message_received = std::string(message.body(), message.body() + message_size);
             if (row_delimiter != '\0')
-            {
                 message_received += row_delimiter;
-            }
 
             received.push({deliveryTag, message_received, redelivered});
+
+            std::lock_guard lock(wait_ack);
+            if (ack.exchange(false) && prev_tag < max_tag && consumer_channel)
+            {
+                consumer_channel->ack(prev_tag, AMQP::multiple); /// Acks all messages up to prev_tag, starting from the last acked one.
+                LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag);
+            }
         }
     })
     .onError([&](const char * message)
@@ -243,6 +267,17 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
 }
 
 
+void ReadBufferFromRabbitMQConsumer::ackMessages(UInt64 last_inserted_delivery_tag)
+{
+    if (last_inserted_delivery_tag > prev_tag)
+    {
+        std::lock_guard lock(wait_ack);
+        prev_tag = last_inserted_delivery_tag;
+        ack.store(true);
+    }
+}
+
+
 void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
 {
     event_handler->iterateLoop();
@@ -259,6 +294,7 @@
         auto * new_position = const_cast<char *>(current.message.data());
         BufferBase::set(new_position, current.message.size(), 0);
         allowed = false;
+        max_tag = current.delivery_tag;
 
         return true;
     }
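
With AMQP::noack dropped, deliveries stay unacknowledged until ackMessages() confirms them, so messages that were consumed but never inserted are redelivered (with redelivered=True) after a failure rather than silently lost; combined with the queue argument above, explicitly rejected messages go to the dead-letter exchange. A pika sketch of that broker-side behaviour (names hypothetical, reusing the declarations from the previous example):

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

def on_message(ch, method, properties, body):
    if method.redelivered:
        # Second failure: reject without requeue, so the broker routes the
        # message to the queue's x-dead-letter-exchange.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='consumer_queue', on_message_callback=on_message)
channel.start_consuming()
```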
diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h
index f4978e54229..8033f537e8c 100644
--- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h
+++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h
@@ -36,6 +36,7 @@ public:
         bool hash_exchange_,
         size_t num_queues_,
         const String & local_exchange_,
+        const String & deadletter_exchange_,
         const std::atomic<bool> & stopped_);
 
     ~ReadBufferFromRabbitMQConsumer() override;
@@ -49,6 +50,7 @@ public:
     void allowNext() { allowed = true; } // Allow to read next message.
     void checkSubscription();
+    void ackMessages(UInt64 last_inserted_delivery_tag);
 
     auto getConsumerTag() const { return consumer_tag; }
     auto getDeliveryTag() const { return current.delivery_tag; }
 
@@ -72,15 +74,19 @@ private:
     bool allowed = true;
     const std::atomic<bool> & stopped;
 
-    const String local_exchange;
+    const String local_exchange, deadletter_exchange;
     std::atomic<bool> consumer_error = false;
     std::atomic<size_t> count_subscribed = 0, wait_subscribed;
 
     String consumer_tag;
     ConcurrentBoundedQueue<MessageData> received;
+    UInt64 prev_tag = 0;
     MessageData current;
     std::vector<String> queues;
     std::unordered_map<String, bool> subscribed_queue;
+    std::atomic<bool> ack = false;
+    std::mutex wait_ack;
+    UInt64 max_tag = 0;
 
     bool nextImpl() override;
diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
index db4f1c7b338..52a07026c24 100644
--- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
+++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
@@ -73,7 +73,8 @@ StorageRabbitMQ::StorageRabbitMQ(
         size_t num_consumers_,
         size_t num_queues_,
         const bool use_transactional_channel_,
-        const String & queue_base_)
+        const String & queue_base_,
+        const String & deadletter_exchange_)
         : IStorage(table_id_)
         , global_context(context_.getGlobalContext())
         , rabbitmq_context(Context(global_context))
@@ -85,6 +86,7 @@ StorageRabbitMQ::StorageRabbitMQ(
         , num_queues(num_queues_)
         , use_transactional_channel(use_transactional_channel_)
         , queue_base(queue_base_)
+        , deadletter_exchange(deadletter_exchange_)
         , log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
         , parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
         , login_password(std::make_pair(
@@ -224,6 +226,7 @@ void StorageRabbitMQ::initExchange()
 void StorageRabbitMQ::bindExchange()
 {
     std::atomic<bool> binding_created = false;
+    size_t bound_keys = 0;
 
     /// Bridge exchange connects client's exchange with consumers' queues.
     if (exchange_type == AMQP::ExchangeType::headers)
@@ -257,7 +260,9 @@ void StorageRabbitMQ::bindExchange()
             setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
             .onSuccess([&]()
             {
-                binding_created = true;
+                ++bound_keys;
+                if (bound_keys == routing_keys.size())
+                    binding_created = true;
             })
             .onError([&](const char * message)
             {
@@ -434,7 +439,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
     return std::make_shared<ReadBufferFromRabbitMQConsumer>(
         consumer_channel, setup_channel, event_handler, consumer_exchange, exchange_type, routing_keys,
         next_channel_id, queue_base, log, row_delimiter, hash_exchange, num_queues,
-        local_exchange, stream_cancelled);
+        local_exchange, deadletter_exchange, stream_cancelled);
 }
 
 
@@ -739,10 +744,22 @@ void registerStorageRabbitMQ(StorageFactory & factory)
             }
         }
 
+        String deadletter_exchange = rabbitmq_settings.rabbitmq_deadletter_exchange.value;
+        if (args_count >= 11)
+        {
+            engine_args[10] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[10], args.local_context);
+
+            const auto * ast = engine_args[10]->as<ASTLiteral>();
+            if (ast && ast->value.getType() == Field::Types::String)
+            {
+                deadletter_exchange = safeGet<String>(ast->value);
+            }
+        }
+
         return StorageRabbitMQ::create(
             args.table_id, args.context, args.columns,
             host_port, routing_keys, exchange, format, row_delimiter, exchange_type, num_consumers,
-            num_queues, use_transactional_channel, queue_base);
+            num_queues, use_transactional_channel, queue_base, deadletter_exchange);
     };
 
     factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
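
For reference, how the new parameter would be used from SQL, written in the style of the integration tests below (table and exchange names are made up; per the args_count >= 11 branch above, the value can apparently also be given as the 11th positional engine argument instead of a setting):

```python
instance.query('''
    CREATE TABLE test.rabbitmq_with_dlx (key UInt64, value UInt64)
        ENGINE = RabbitMQ
        SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
                 rabbitmq_exchange_name = 'main_exchange',
                 rabbitmq_deadletter_exchange = 'dead_letters',
                 rabbitmq_format = 'JSONEachRow',
                 rabbitmq_row_delimiter = '\\n';
''')
```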
diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h
index d43f2ba27f1..7e2d6c6b35e 100644
--- a/src/Storages/RabbitMQ/StorageRabbitMQ.h
+++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h
@@ -72,7 +72,8 @@ protected:
             size_t num_consumers_,
             size_t num_queues_,
             const bool use_transactional_channel_,
-            const String & queue_base_);
+            const String & queue_base_,
+            const String & deadletter_exchange);
 
 private:
     Context global_context;
@@ -90,6 +91,7 @@ private:
     size_t num_queues;
     const bool use_transactional_channel;
     const String queue_base;
+    const String deadletter_exchange;
     Poco::Logger * log;
     std::pair<String, UInt16> parsed_address;
diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py
index 655dee7a816..3a2b6cd6be3 100644
--- a/tests/integration/test_storage_rabbitmq/test.py
+++ b/tests/integration/test_storage_rabbitmq/test.py
@@ -496,135 +496,9 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
     assert int(result) == rabbitmq_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
 
 
-@pytest.mark.timeout(420)
-def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
-
-    NUM_CHANNELS = 5
-
-    instance.query('''
-        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
-            ENGINE = RabbitMQ
-            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
-                     rabbitmq_exchange_name = 'test_channels_sharding',
-                     rabbitmq_num_consumers = 5,
-                     rabbitmq_format = 'JSONEachRow',
-                     rabbitmq_row_delimiter = '\\n';
-        DROP TABLE IF EXISTS test.view;
-        DROP TABLE IF EXISTS test.consumer;
-        CREATE TABLE test.view (key UInt64, value UInt64)
-            ENGINE = MergeTree
-            ORDER BY key;
-        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
-            SELECT * FROM test.rabbitmq;
-    ''')
-
-    time.sleep(1)
-
-    i = [0]
-    messages_num = 10000
-
-    credentials = pika.PlainCredentials('root', 'clickhouse')
-    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
-
-    def produce():
-        connection = pika.BlockingConnection(parameters)
-        channel = connection.channel()
-
-        messages = []
-        for _ in range(messages_num):
-            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
-            i[0] += 1
-        key = str(randrange(1, NUM_CHANNELS))
-        for message in messages:
-            channel.basic_publish(exchange='test_channels_sharding', routing_key=key, body=message)
-        connection.close()
-
-    threads = []
-    threads_num = 20
-
-    for _ in range(threads_num):
-        threads.append(threading.Thread(target=produce))
-    for thread in threads:
-        time.sleep(random.uniform(0, 1))
-        thread.start()
-
-    while True:
-        result = instance.query('SELECT count() FROM test.view')
-        time.sleep(1)
-        if int(result) == messages_num * threads_num:
-            break
-
-    for thread in threads:
-        thread.join()
-
-    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
-
-
 @pytest.mark.timeout(420)
 def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
 
-    NUM_QUEUES = 4
-
-    instance.query('''
-        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
-            ENGINE = RabbitMQ
-            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
-                     rabbitmq_exchange_name = 'test_queues_sharding',
-                     rabbitmq_num_queues = 4,
-                     rabbitmq_format = 'JSONEachRow',
-                     rabbitmq_row_delimiter = '\\n';
-        DROP TABLE IF EXISTS test.view;
-        DROP TABLE IF EXISTS test.consumer;
-        CREATE TABLE test.view (key UInt64, value UInt64)
-            ENGINE = MergeTree
-            ORDER BY key;
-        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
-            SELECT * FROM test.rabbitmq;
-    ''')
-
-    time.sleep(1)
-
-    i = [0]
-    messages_num = 10000
-
-    credentials = pika.PlainCredentials('root', 'clickhouse')
-    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
-    def produce():
-        connection = pika.BlockingConnection(parameters)
-        channel = connection.channel()
-
-        messages = []
-        for _ in range(messages_num):
-            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
-            i[0] += 1
-        key = str(randrange(1, NUM_QUEUES))
-        for message in messages:
-            channel.basic_publish(exchange='test_queues_sharding', routing_key=key, body=message)
-        connection.close()
-
-    threads = []
-    threads_num = 20
-
-    for _ in range(threads_num):
-        threads.append(threading.Thread(target=produce))
-    for thread in threads:
-        time.sleep(random.uniform(0, 1))
-        thread.start()
-
-    while True:
-        result = instance.query('SELECT count() FROM test.view')
-        time.sleep(1)
-        if int(result) == messages_num * threads_num:
-            break
-
-    for thread in threads:
-        thread.join()
-
-    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
-
-
-@pytest.mark.timeout(420)
-def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster):
-
     NUM_CONSUMERS = 10
     NUM_QUEUES = 2
 
@@ -639,12 +513,12 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
                      rabbitmq_row_delimiter = '\\n';
         DROP TABLE IF EXISTS test.view;
         DROP TABLE IF EXISTS test.consumer;
-        CREATE TABLE test.view (key UInt64, value UInt64)
+        CREATE TABLE test.view (key UInt64, value UInt64, consumer_tag String)
             ENGINE = MergeTree
             ORDER BY key
             SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
         CREATE MATERIALIZED VIEW test.consumer TO test.view AS
-            SELECT * FROM test.rabbitmq;
+            SELECT *, _consumer_tag AS consumer_tag FROM test.rabbitmq;
     ''')
 
     time.sleep(1)
@@ -662,9 +536,12 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
         for _ in range(messages_num):
             messages.append(json.dumps({'key': i[0], 'value': i[0]}))
             i[0] += 1
-        key = str(randrange(1, NUM_QUEUES * NUM_CONSUMERS))
+        current = 0
         for message in messages:
-            channel.basic_publish(exchange='test_sharding', routing_key=key, body=message)
+            current += 1
+            mes_id = str(current)
+            channel.basic_publish(exchange='test_sharding', routing_key='',
+                properties=pika.BasicProperties(message_id=mes_id), body=message)
         connection.close()
 
     threads = []
@@ -676,16 +553,20 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
         time.sleep(random.uniform(0, 1))
         thread.start()
 
+    result1 = ''
     while True:
-        result = instance.query('SELECT count() FROM test.view')
+        result1 = instance.query('SELECT count() FROM test.view')
         time.sleep(1)
-        if int(result) == messages_num * threads_num:
+        if int(result1) == messages_num * threads_num:
             break
 
+    result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
+
     for thread in threads:
         thread.join()
 
-    assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
+    assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
+    assert int(result2) == 10
 
 
 @pytest.mark.timeout(420)
@@ -734,8 +615,12 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster):
             messages.append(json.dumps({'key': i[0], 'value': i[0]}))
             i[0] += 1
         key = str(randrange(1, NUM_CONSUMERS))
+        current = 0
         for message in messages:
-            channel.basic_publish(exchange='combo', routing_key=key, body=message)
+            current += 1
+            mes_id = str(current)
+            channel.basic_publish(exchange='combo', routing_key=key,
+                properties=pika.BasicProperties(message_id=mes_id), body=message)
         connection.close()
 
     threads = []
@@ -1140,11 +1025,11 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
         messages.append(json.dumps({'key': i[0], 'value': i[0]}))
         i[0] += 1
 
-    key_num = 0
+    current = 0
     for message in messages:
-        mes_id = str(randrange(10))
-        channel.basic_publish(
-            exchange='fanout_exchange_testing', routing_key='',
+        current += 1
+        mes_id = str(current)
+        channel.basic_publish(exchange='fanout_exchange_testing', routing_key='',
             properties=pika.BasicProperties(message_id=mes_id), body=message)
 
     connection.close()
@@ -1236,10 +1121,11 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
        channel.basic_publish(exchange='topic_exchange_testing', routing_key=key, body=message)
 
     key = "random.logs"
+    current = 0
     for message in messages:
-        mes_id = str(randrange(10))
-        channel.basic_publish(
-            exchange='topic_exchange_testing', routing_key=key,
+        current += 1
+        mes_id = str(current)
+        channel.basic_publish(exchange='topic_exchange_testing', routing_key=key,
             properties=pika.BasicProperties(message_id=mes_id), body=message)
 
     connection.close()
@@ -1411,8 +1297,10 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
         keys = ['key1', 'key2', 'key3', 'key4', 'key5']
         for key in keys:
+            current = 0
             for message in messages:
-                mes_id = str(randrange(10))
+                current += 1
+                mes_id = str(current)
                 channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key,
                     properties=pika.BasicProperties(message_id=mes_id), body=message)
 
         connection.close()
@@ -1510,9 +1398,10 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
     fields['type']='report'
     fields['year']='2020'
 
-    key_num = 0
+    current = 0
     for message in messages:
-        mes_id = str(randrange(10))
+        current += 1
+        mes_id = str(current)
         channel.basic_publish(exchange='headers_exchange_testing', routing_key='',
             properties=pika.BasicProperties(headers=fields, message_id=mes_id),
             body=message)
@@ -1674,7 +1563,91 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
 
 
 @pytest.mark.timeout(420)
-def test_rabbitmq_queue_resume(rabbitmq_cluster):
+def test_rabbitmq_queue_resume_1(rabbitmq_cluster):
+    instance.query('''
+        CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
+            ENGINE = RabbitMQ
+            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
+                     rabbitmq_exchange_name = 'queue_resume',
+                     rabbitmq_exchange_type = 'direct',
+                     rabbitmq_routing_key_list = 'queue_resume',
+                     rabbitmq_num_consumers = '2',
+                     rabbitmq_num_queues = '2',
+                     rabbitmq_queue_base = 'queue_resume',
+                     rabbitmq_format = 'JSONEachRow',
+                     rabbitmq_row_delimiter = '\\n';
+    ''')
+
+    i = [0]
+    messages_num = 5000
+
+    credentials = pika.PlainCredentials('root', 'clickhouse')
+    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
+    def produce():
+        connection = pika.BlockingConnection(parameters)
+        channel = connection.channel()
+        messages = []
+        for _ in range(messages_num):
+            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
+            i[0] += 1
+        for message in messages:
+            channel.basic_publish(exchange='queue_resume', routing_key='queue_resume', body=message,
+                properties=pika.BasicProperties(delivery_mode = 2))
+        connection.close()
+
+    threads = []
+    threads_num = 10
+    for _ in range(threads_num):
+        threads.append(threading.Thread(target=produce))
+    for thread in threads:
+        time.sleep(random.uniform(0, 1))
+        thread.start()
+
+    for thread in threads:
+        thread.join()
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.rabbitmq_queue_resume;
+    ''')
+
+    instance.query('''
+        CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
+            ENGINE = RabbitMQ
+            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
+                     rabbitmq_exchange_name = 'queue_resume',
+                     rabbitmq_exchange_type = 'direct',
+                     rabbitmq_routing_key_list = 'queue_resume',
+                     rabbitmq_num_consumers = '2',
+                     rabbitmq_num_queues = '2',
+                     rabbitmq_queue_base = 'queue_resume',
+                     rabbitmq_format = 'JSONEachRow',
+                     rabbitmq_row_delimiter = '\\n';
+        DROP TABLE IF EXISTS test.view;
+        DROP TABLE IF EXISTS test.consumer;
+        CREATE TABLE test.view (key UInt64, value UInt64)
+            ENGINE = MergeTree
+            ORDER BY key;
+        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
+            SELECT * FROM test.rabbitmq_queue_resume;
+    ''')
+
+    while True:
+        result1 = instance.query('SELECT count() FROM test.view')
+        time.sleep(1)
+        if int(result1) == messages_num * threads_num:
+            break
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.rabbitmq_queue_resume;
+        DROP TABLE IF EXISTS test.consumer;
+        DROP TABLE IF EXISTS test.view;
+    ''')
+
+    assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
+
+
+@pytest.mark.timeout(420)
+def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
     instance.query('''
         CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
             ENGINE = RabbitMQ
@@ -1699,7 +1672,6 @@ def test_rabbitmq_queue_resume(rabbitmq_cluster):
 
     credentials = pika.PlainCredentials('root', 'clickhouse')
     parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
-
     def produce():
         connection = pika.BlockingConnection(parameters)
         channel = connection.channel()
@@ -1762,6 +1734,95 @@ def test_rabbitmq_queue_resume(rabbitmq_cluster):
     assert int(result2) == 2
 
 
+@pytest.mark.timeout(420)
+def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster):
+    instance.query('''
+        CREATE TABLE test.rabbitmq_consumer_acks (key UInt64, value UInt64)
+            ENGINE = RabbitMQ
+            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
+                     rabbitmq_exchange_name = 'consumer_acks',
+                     rabbitmq_exchange_type = 'direct',
+                     rabbitmq_routing_key_list = 'consumer_acks',
+                     rabbitmq_queue_base = 'consumer_resume',
+                     rabbitmq_format = 'JSONEachRow',
+                     rabbitmq_row_delimiter = '\\n';
+    ''')
+
+    i = [0]
+    messages_num = 5000
+
+    credentials = pika.PlainCredentials('root', 'clickhouse')
+    parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
+    def produce():
+        connection = pika.BlockingConnection(parameters)
+        channel = connection.channel()
+        messages = []
+        for _ in range(messages_num):
+            messages.append(json.dumps({'key': i[0], 'value': i[0]}))
+            i[0] += 1
+        for message in messages:
+            channel.basic_publish(exchange='consumer_acks', routing_key='consumer_acks', body=message,
+                properties=pika.BasicProperties(delivery_mode = 2))
+        connection.close()
+
+    threads = []
+    threads_num = 20
+    for _ in range(threads_num):
+        threads.append(threading.Thread(target=produce))
+    for thread in threads:
+        time.sleep(random.uniform(0, 1))
+        thread.start()
+
+    for thread in threads:
+        thread.join()
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.view;
+        DROP TABLE IF EXISTS test.consumer;
+        CREATE TABLE test.view (key UInt64, value UInt64, consumer_tag String)
+            ENGINE = MergeTree
+            ORDER BY key;
+        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
+            SELECT *, _consumer_tag AS consumer_tag FROM test.rabbitmq_consumer_acks;
+    ''')
+
+    while int(instance.query('SELECT count() FROM test.view')) == 0:
+        time.sleep(1)
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.rabbitmq_consumer_acks;
+    ''')
+
+    collected = int(instance.query('SELECT count() FROM test.view'))
+
+    instance.query('''
+        CREATE TABLE test.rabbitmq_consumer_acks (key UInt64, value UInt64)
+            ENGINE = RabbitMQ
+            SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
+                     rabbitmq_queue_base = 'consumer_resume',
+                     rabbitmq_format = 'JSONEachRow',
+                     rabbitmq_row_delimiter = '\\n';
+    ''')
+
+    while True:
+        result1 = instance.query('SELECT count() FROM test.view')
+        time.sleep(1)
+        if int(result1) == messages_num * threads_num:
+            break
+
+    result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.rabbitmq_consumer_acks;
+        DROP TABLE IF EXISTS test.consumer;
+        DROP TABLE IF EXISTS test.view;
+    ''')
+
+    assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
+    if collected < int(result1):
+        assert int(result2) == 2
+
 
 if __name__ == '__main__':
     cluster.start()
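
One more way to observe the ack behaviour from the test side, assuming a known queue name (the engine derives the real names from rabbitmq_queue_base, so the name below is hypothetical): a passive declare reports how many ready messages remain in the queue without modifying it, and that number should shrink only as ClickHouse acknowledges inserted batches.

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# passive=True fails if the queue does not exist and never (re)creates it.
declared = channel.queue_declare(queue='consumer_resume', passive=True)
print('ready (not yet delivered) messages:', declared.method.message_count)
```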