Better exchanges, fix build, better comments, better tests

This commit is contained in:
kssenii 2020-06-14 16:26:37 +00:00
parent dcd7b7351c
commit 9e1b8b2872
8 changed files with 122 additions and 118 deletions

View File

@ -142,8 +142,6 @@ Block RabbitMQBlockInputStream::readImpl()
auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
LOG_DEBUG(log, "Total amount of rows is " + std::to_string(result_block.rows()));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
{
result_block.insert(column);

View File

@ -41,8 +41,9 @@ void RabbitMQHandler::startConsumerLoop(std::atomic<bool> & loop_started)
*/
if (mutex_before_event_loop.try_lock_for(std::chrono::milliseconds(Lock_timeout)))
{
loop_started = true;
stop_scheduled.store(false);
loop_started.store(true);
stop_scheduled = false;
event_base_loop(evbase, EVLOOP_NONBLOCK);
mutex_before_event_loop.unlock();
}
@ -67,12 +68,8 @@ void RabbitMQHandler::stop()
void RabbitMQHandler::stopWithTimeout()
{
if (mutex_before_loop_stop.try_lock())
{
stop_scheduled.store(true);
stop_scheduled = true;
event_base_loopexit(evbase, &tv);
mutex_before_loop_stop.unlock();
}
}
}

View File

@ -19,7 +19,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace Exchange
namespace ExchangeType
{
/// Note that default here means default by implementation and not by rabbitmq settings
static const String DEFAULT = "default";
@ -42,7 +42,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
const bool bind_by_id_,
const size_t num_queues_,
const String & exchange_type_,
const String & local_exchange_name_,
const String & local_exchange_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
@ -55,13 +55,15 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, bind_by_id(bind_by_id_)
, num_queues(num_queues_)
, exchange_type(exchange_type_)
, local_exchange_name(local_exchange_name_)
, local_exchange(local_exchange_)
, local_default_exchange(local_exchange + "_" + ExchangeType::DIRECT)
, local_hash_exchange(local_exchange + "_" + ExchangeType::HASH)
, stopped(stopped_)
{
messages.clear();
current = messages.begin();
exchange_type_set = exchange_type != Exchange::DEFAULT;
exchange_type_set = exchange_type != ExchangeType::DEFAULT;
/* One queue per consumer can handle up to 50000 messages. More queues per consumer can be added.
* By default there is one queue per consumer.
@ -87,53 +89,52 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
void ReadBufferFromRabbitMQConsumer::initExchange()
{
/* This direct-exchange is used for default implemenation and for INSERT query (so it is always declared). If exchange_type
* is not set, then there are only two exchanges - external, defined by the client, and local, unique for each table.
* is not set, then there are only two exchanges - external, defined by the client, and local, unique for each table (default).
* This strict division to external and local exchanges is needed to avoid too much complexity with defining exchange_name
* for INSERT query producer and, in general, it is much better to distinguish them into separate ones.
* for INSERT query producer and, in general, it is better to distinguish them into separate ones.
*/
String default_exchange = exchange_type_set ? exchange_name + "_" + Exchange::DEFAULT : exchange_name;
consumer_channel->declareExchange(default_exchange, AMQP::fanout).onError([&](const char * message)
consumer_channel->declareExchange(local_default_exchange, AMQP::direct).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare exchange {}. Reason: {}", default_exchange, message);
});
default_local_exchange = local_exchange_name;
default_local_exchange += exchange_type_set ? "_default_" + Exchange::DIRECT : "_" + Exchange::DIRECT;
consumer_channel->declareExchange(default_local_exchange, AMQP::direct).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare exchange {}. Reason: {}", default_local_exchange, message);
});
/// With fanout exchange the binding key is ignored - a parameter might be arbitrary. All distribution lies on local_exchange.
consumer_channel->bindExchange(default_exchange, default_local_exchange, routing_keys[0]).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind {} exchange to {} exchange. Reason: {}", default_exchange, default_local_exchange, message);
LOG_ERROR(log, "Failed to declare local direct-exchange. Reason: {}", message);
});
if (!exchange_type_set)
{
consumer_channel->declareExchange(exchange_name, AMQP::fanout).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare default fanout-exchange. Reason: {}", message);
});
/// With fanout exchange the binding key is ignored - a parameter might be arbitrary. All distribution lies on local_exchange.
consumer_channel->bindExchange(exchange_name, local_default_exchange, routing_keys[0]).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind local direct-exchange to fanout-exchange. Reason: {}", message);
});
return;
}
/// For special purposes to use the flexibility of routing provided by rabbitmq - choosing exchange types is supported.
AMQP::ExchangeType type;
if (exchange_type == Exchange::FANOUT) type = AMQP::ExchangeType::fanout;
else if (exchange_type == Exchange::DIRECT) type = AMQP::ExchangeType::direct;
else if (exchange_type == Exchange::TOPIC) type = AMQP::ExchangeType::topic;
else if (exchange_type == Exchange::HASH) type = AMQP::ExchangeType::consistent_hash;
else if (exchange_type == Exchange::HEADERS) type = AMQP::ExchangeType::headers;
if (exchange_type == ExchangeType::FANOUT) type = AMQP::ExchangeType::fanout;
else if (exchange_type == ExchangeType::DIRECT) type = AMQP::ExchangeType::direct;
else if (exchange_type == ExchangeType::TOPIC) type = AMQP::ExchangeType::topic;
else if (exchange_type == ExchangeType::HASH) type = AMQP::ExchangeType::consistent_hash;
else if (exchange_type == ExchangeType::HEADERS) type = AMQP::ExchangeType::headers;
else throw Exception("Invalid exchange type", ErrorCodes::BAD_ARGUMENTS);
/* Declare exchange of the specified type and bind it to hash-exchange, which will evenly distribute messages
* between all consumers. (This enables better scaling as without hash-exchange - the only option to avoid getting
* the same messages more than once - is having only one consumer with one queue, which is not good.)
/* Declare client's exchange of the specified type and bind it to hash-exchange (if it is not already hash-exchange), which
* will evenly distribute messages between all consumers. (This enables better scaling as without hash-exchange - the only
* option to avoid getting the same messages more than once - is having only one consumer with one queue, which is not good.)
*/
consumer_channel->declareExchange(exchange_name, type).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare client's {} exchange: {}", exchange_type, message);
LOG_ERROR(log, "Failed to declare client's {} exchange. Reason: {}", exchange_type, message);
});
/// No need for declaring hash-exchange if there is only one consumer with one queue or exchange type is already hash
@ -142,26 +143,32 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
hash_exchange = true;
if (exchange_type == Exchange::HASH)
if (exchange_type == ExchangeType::HASH)
return;
AMQP::Table exchange_arguments;
exchange_arguments["hash-property"] = "message_id";
/* By default hash exchange distributes messages based on a hash value of a routing key, which must be a string integer. But
* in current case we use hash exchange for binding to another exchange of some other type, which needs its own routing keys
* of other types: headers, patterns and string-keys. This means that hash property must be changed.
*/
AMQP::Table binding_arguments;
binding_arguments["hash-property"] = "message_id";
String local_hash_exchange_name = local_exchange_name + "_hash";
consumer_channel->declareExchange(local_hash_exchange_name, AMQP::consistent_hash, exchange_arguments)
/// Declare exchange for sharding.
consumer_channel->declareExchange(local_hash_exchange, AMQP::consistent_hash, binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare {} exchange: {}", exchange_type, message);
});
if (exchange_type == Exchange::HEADERS)
/// Then bind client's exchange to sharding exchange (by keys, specified by the client):
if (exchange_type == ExchangeType::HEADERS)
{
AMQP::Table binding_arguments;
std::vector<String> matching;
for (auto & header : routing_keys)
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
binding_arguments[matching[0]] = matching[1];
@ -169,21 +176,21 @@ void ReadBufferFromRabbitMQConsumer::initExchange()
}
/// Routing key can be arbitrary here.
consumer_channel->bindExchange(exchange_name, local_hash_exchange_name, routing_keys[0], binding_arguments)
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_keys[0], binding_arguments)
.onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind {} exchange to {} exchange: {}", local_exchange_name, exchange_name, message);
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
else
{
for (auto & routing_key : routing_keys)
for (const auto & routing_key : routing_keys)
{
consumer_channel->bindExchange(exchange_name, local_hash_exchange_name, routing_key).onError([&](const char * message)
consumer_channel->bindExchange(exchange_name, local_hash_exchange, routing_key).onError([&](const char * message)
{
local_exchange_declared = false;
LOG_ERROR(log, "Failed to bind {} exchange to {} exchange: {}", local_exchange_name, exchange_name, message);
LOG_ERROR(log, "Failed to bind local hash exchange to client's exchange. Reason: {}", message);
});
}
}
@ -227,7 +234,8 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
}
}
consumer_channel->bindQueue(default_local_exchange, queue_name_, binding_key)
/// Bind queue to exchange that is used for INSERT query and also for default implementation.
consumer_channel->bindQueue(local_default_exchange, queue_name_, binding_key)
.onSuccess([&]
{
default_bindings_created = true;
@ -238,13 +246,13 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
LOG_ERROR(log, "Failed to bind to key {}. Reason: {}", binding_key, message);
});
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed. Also note
* that if moved there, it must(!) be wrapped inside a channel->onReady callback or any other, otherwise consumer might fail
* to subscribe and no resubscription will help.
/* Subscription can probably be moved back to readPrefix(), but not sure whether it is better in regard to speed, because
* if moved there, it must(!) be wrapped inside a channel->onReady callback or any other (and the looping), otherwise
* consumer might fail to subscribe and no resubscription will help.
*/
subscribe(queues.back());
LOG_TRACE(log, "Queue " + queue_name_ + " is bound by key " + binding_key);
LOG_DEBUG(log, "Queue " + queue_name_ + " is declared");
if (exchange_type_set)
{
@ -253,10 +261,10 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
/* If exchange_type == hash, then bind directly to this client's exchange (because there is no need for a distributor
* exchange as it is already hash-exchange), otherwise hash-exchange is a local distributor exchange.
*/
String hash_exchange_name = exchange_type == Exchange::HASH ? exchange_name : local_exchange_name + "_hash";
String current_hash_exchange = exchange_type == ExchangeType::HASH ? exchange_name : local_hash_exchange;
/// If hash-exchange is used for messages distribution, then the binding key is ignored - can be arbitrary.
consumer_channel->bindQueue(hash_exchange_name, queue_name_, binding_key)
consumer_channel->bindQueue(current_hash_exchange, queue_name_, binding_key)
.onSuccess([&]
{
bindings_created = true;
@ -267,13 +275,13 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", binding_key, message);
});
}
else if (exchange_type == Exchange::HEADERS)
else if (exchange_type == ExchangeType::HEADERS)
{
AMQP::Table binding_arguments;
std::vector<String> matching;
/// It is not parsed for the second time - if it was parsed above, then it would go to the first if statement, not here.
for (auto & header : routing_keys)
for (const auto & header : routing_keys)
{
boost::split(matching, header, [](char c){ return c == '='; });
binding_arguments[matching[0]] = matching[1];
@ -288,15 +296,15 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", routing_keys[0], message);
LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message);
});
}
else
{
/// Means there is only one queue with one consumer - no even distribution needed - no hash-exchange.
for (auto & routing_key : routing_keys)
for (const auto & routing_key : routing_keys)
{
/// Binding directly to exchange, specified by the client
/// Binding directly to exchange, specified by the client.
consumer_channel->bindQueue(exchange_name, queue_name_, routing_key)
.onSuccess([&]
{
@ -305,7 +313,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
.onError([&](const char * message)
{
bindings_error = true;
LOG_ERROR(log, "Failed to create queue binding to key {}. Reason: {}", routing_key, message);
LOG_ERROR(log, "Failed to bind queue to key. Reason: {}", message);
});
}
}
@ -314,7 +322,7 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
.onError([&](const char * message)
{
default_bindings_error = true;
LOG_ERROR(log, "Failed to declare queue on the channel: {}", message);
LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
});
/* Run event loop (which updates local variables in a separate thread) until bindings are created or failed to be created.
@ -364,7 +372,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
* executing all callbacks on the connection (not only its own), then there should be some point to unblock.
* loop_started == 1 if current consumer is started the loop and not another.
*/
if (!loop_started.load() && !eventHandler.checkStopIsScheduled().load())
if (!loop_started.load() && !eventHandler.checkStopIsScheduled())
{
stopEventLoopWithTimeout();
}
@ -373,7 +381,7 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
.onError([&](const char * message)
{
consumer_error = true;
LOG_ERROR(log, "Consumer {} failed: {}", channel_id, message);
LOG_ERROR(log, "Consumer {} failed. Reason: {}", channel_id, message);
});
}
@ -385,7 +393,7 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
wait_subscribed = num_queues;
/// These variables are updated in a separate thread
/// These variables are updated in a separate thread.
while (count_subscribed != wait_subscribed && !consumer_error)
{
startEventLoop(loop_started);
@ -393,11 +401,11 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
LOG_TRACE(log, "Consumer {} is subscribed to {} queues", channel_id, count_subscribed);
/// Updated in callbacks which are run by the loop
/// Updated in callbacks which are run by the loop.
if (count_subscribed == num_queues)
return;
/// A case that should never normally happen
/// A case that should never normally happen.
for (auto & queue : queues)
{
subscribe(queue);

View File

@ -32,7 +32,7 @@ public:
const bool bind_by_id_,
const size_t num_queues_,
const String & exchange_type_,
const String & local_exchange_name_,
const String & local_exchange_,
const std::atomic<bool> & stopped_);
~ReadBufferFromRabbitMQConsumer() override;
@ -53,8 +53,11 @@ private:
const size_t channel_id;
const bool bind_by_id;
const size_t num_queues;
const String & exchange_type;
const String & local_exchange_name;
const String & local_exchange;
const String local_default_exchange;
const String local_hash_exchange;
Poco::Logger * log;
char row_delimiter;

View File

@ -107,6 +107,8 @@ StorageRabbitMQ::StorageRabbitMQ(
auto table_id = getStorageID();
String table_name = table_id.table_name;
/// Make sure that local exchange name is unique for each table and is not the same as client's exchange name
local_exchange_name = exchange_name + "_" + table_name;
}
@ -132,6 +134,7 @@ Pipes StorageRabbitMQ::read(
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
return pipes;
}
@ -225,12 +228,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
ProducerBufferPtr StorageRabbitMQ::createWriteBuffer()
{
/* If exchange type is set, then there are different exchanges for external publishing and for INSERT query
* as in this case they are of different types.
*/
String producer_exchange = exchange_type == "default" ? local_exchange_name : local_exchange_name + "_default";
return std::make_shared<WriteBufferToRabbitMQProducer>(parsed_address, login_password, routing_keys[0], producer_exchange,
return std::make_shared<WriteBufferToRabbitMQProducer>(parsed_address, login_password, routing_keys[0], local_exchange_name,
log, num_consumers * num_queues, bind_by_id, use_transactional_channel,
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
}

View File

@ -77,7 +77,7 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
WriteBufferToRabbitMQProducer::~WriteBufferToRabbitMQProducer()
{
finilize();
finilizeProducer();
connection.close();
assert(rows == 0 && chunks.empty());
}
@ -118,7 +118,9 @@ void WriteBufferToRabbitMQProducer::countRow()
++message_counter;
/// run event loop to actually publish, checking exchange is just a point to stop the event loop
/* Run event loop to actually publish, checking exchange is just a point to stop the event loop. Messages are not sent
* without looping and looping after every batch is much better than processing all the messages in one time.
*/
if ((message_counter %= Batch) == 0)
{
checkExchange();
@ -132,7 +134,7 @@ void WriteBufferToRabbitMQProducer::checkExchange()
std::atomic<bool> exchange_declared = false, exchange_error = false;
/* The AMQP::passive flag indicates that it should only be checked if there is a valid exchange with the given name
* and makes it visible from current producer_channel.
* and makes it declared on the current producer_channel.
*/
producer_channel->declareExchange(exchange_name, AMQP::direct, AMQP::passive)
.onSuccess([&]()
@ -142,7 +144,7 @@ void WriteBufferToRabbitMQProducer::checkExchange()
.onError([&](const char * message)
{
exchange_error = true;
LOG_ERROR(log, "Exchange was not declared: {}", message);
LOG_ERROR(log, "Exchange for INSERT query was not declared. Reason: {}", message);
});
/// These variables are updated in a separate thread and starting the loop blocks current thread
@ -153,7 +155,7 @@ void WriteBufferToRabbitMQProducer::checkExchange()
}
void WriteBufferToRabbitMQProducer::finilize()
void WriteBufferToRabbitMQProducer::finilizeProducer()
{
checkExchange();

View File

@ -40,7 +40,7 @@ public:
private:
void nextImpl() override;
void checkExchange();
void finilize();
void finilizeProducer();
std::pair<String, String> & login_password;
const String routing_key;
@ -56,9 +56,6 @@ private:
size_t next_queue = 0;
UInt64 message_counter = 0;
String channel_id;
Messages messages;
Poco::Logger * log;
const std::optional<char> delim;

View File

@ -497,7 +497,7 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
assert int(result) == rabbitmq_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(320)
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
NUM_CHANNELS = 5
@ -560,7 +560,7 @@ def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(320)
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
NUM_QUEUES = 4
@ -623,7 +623,7 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(320)
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster):
NUM_CONSUMERS = 10
@ -688,7 +688,7 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(320)
@pytest.mark.timeout(420)
def test_rabbitmq_read_only_combo(rabbitmq_cluster):
NUM_MV = 5;
@ -768,7 +768,7 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster):
assert int(result) == messages_num * threads_num * NUM_MV, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_insert(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@ -1054,7 +1054,10 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster):
key = "direct_" + str(key_num)
key_num += 1
for message in messages:
channel.basic_publish(exchange='direct_exchange_testing', routing_key=key, body=message)
mes_id = str(randrange(10))
channel.basic_publish(
exchange='direct_exchange_testing', routing_key=key,
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
@ -1066,8 +1069,8 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster):
for consumer_id in range(num_tables):
instance.query('''
DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
DROP TABLE IF EXISTS test.direct_exchange_{0};
DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
'''.format(consumer_id))
instance.query('''
@ -1122,7 +1125,10 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
key_num = 0
for message in messages:
channel.basic_publish(exchange='fanout_exchange_testing', routing_key='', body=message)
mes_id = str(randrange(10))
channel.basic_publish(
exchange='fanout_exchange_testing', routing_key='',
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
@ -1215,7 +1221,10 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
key = "random.logs"
for message in messages:
channel.basic_publish(exchange='topic_exchange_testing', routing_key=key, body=message)
mes_id = str(randrange(10))
channel.basic_publish(
exchange='topic_exchange_testing', routing_key=key,
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
@ -1225,18 +1234,12 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
if int(result) == messages_num * num_tables + messages_num * num_tables:
break
for consumer_id in range(num_tables):
for consumer_id in range(num_tables * 2):
instance.query('''
DROP TABLE IF EXISTS test.topic_exchange_{0};
DROP TABLE IF EXISTS test.topic_exchange_{0}_mv;
'''.format(consumer_id))
for consumer_id in range(num_tables):
instance.query('''
DROP TABLE IF EXISTS test.topic_exchange_{0};
DROP TABLE IF EXISTS test.topic_exchange_{0}_mv;
'''.format(num_tables + consumer_id))
instance.query('''
DROP TABLE IF EXISTS test.destination;
''')
@ -1244,7 +1247,7 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
assert int(result) == messages_num * num_tables + messages_num * num_tables, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(320)
@pytest.mark.timeout(420)
def test_rabbitmq_hash_exchange(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.destination;
@ -1288,8 +1291,8 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster):
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = str(randrange(10))
for message in messages:
key = str(randrange(10))
channel.basic_publish(exchange='hash_exchange_testing', routing_key=key, body=message)
connection.close()
@ -1389,7 +1392,9 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
for key in keys:
for message in messages:
channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key, body=message)
mes_id = str(randrange(10))
channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key,
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
@ -1488,8 +1493,9 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
key_num = 0
for message in messages:
mes_id = str(randrange(10))
channel.basic_publish(exchange='headers_exchange_testing', routing_key='',
properties=pika.BasicProperties(headers=fields), body=message)
properties=pika.BasicProperties(headers=fields, message_id=mes_id), body=message)
connection.close()
@ -1499,16 +1505,11 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
if int(result) == messages_num * num_tables_to_receive:
break
for consumer_id in range(num_tables_to_receive):
for consumer_id in range(num_tables_to_receive + num_tables_to_ignore):
instance.query('''
DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
DROP TABLE IF EXISTS test.direct_exchange_{0};
DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
'''.format(consumer_id))
for consumer_id in range(num_tables_to_ignore):
instance.query('''
DROP TABLE IF EXISTS test.direct_exchange_{0}_mv;
DROP TABLE IF EXISTS test.direct_exchange_{0};
'''.format(consumer_id + num_tables_to_receive))
instance.query('''
DROP TABLE IF EXISTS test.destination;