Add dead-letter exchange, ack (commit) messages only after insert

kssenii 2020-07-24 12:33:07 +00:00
parent f9a4bf9e61
commit 2b57857afc
10 changed files with 296 additions and 163 deletions

src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp

@@ -137,6 +137,8 @@ Block RabbitMQBlockInputStream::readImpl()
virtual_columns[3]->insert(redelivered);
}
last_inserted_delivery_tag = delivery_tag;
total_rows = total_rows + new_rows;
buffer->allowNext();
@@ -158,4 +160,13 @@ Block RabbitMQBlockInputStream::readImpl()
return result_block;
}
void RabbitMQBlockInputStream::readSuffixImpl()
{
if (!buffer)
return;
buffer->ackMessages(last_inserted_delivery_tag);
}
}
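The new readSuffixImpl() path implements an at-least-once pattern: the stream remembers the delivery tag of the last inserted row and acknowledges everything up to that tag only after the block has been written. A minimal pika sketch of the same idea follows; the queue name, batch size, and store_batch() helper are illustrative, not part of this commit.

import pika

# Sketch of the ack-after-insert pattern, assuming a local RabbitMQ broker.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

batch = []

def store_batch(rows):
    pass  # stand-in for the durable insert done by the materialized view

def on_message(ch, method, properties, body):
    batch.append(body)
    if len(batch) >= 100:  # a "block" is complete
        store_batch(batch)
        # multiple=True acknowledges every unacked delivery up to this tag,
        # mirroring consumer_channel->ack(prev_tag, AMQP::multiple) above.
        ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)
        batch.clear()

channel.basic_consume(queue='some_queue', on_message_callback=on_message,
                      auto_ack=False)
channel.start_consuming()

If the process dies before basic_ack, the broker redelivers the unacked messages, which is why the redelivered flag is exposed as a virtual column.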

src/Storages/RabbitMQ/RabbitMQBlockInputStream.h

@@ -26,6 +26,7 @@ public:
void readPrefixImpl() override;
Block readImpl() override;
void readSuffixImpl() override;
private:
StorageRabbitMQ & storage;
@@ -38,6 +39,7 @@ private:
const Block virtual_header;
ConsumerBufferPtr buffer;
UInt64 last_inserted_delivery_tag;
};
}

src/Storages/RabbitMQ/RabbitMQHandler.cpp

@@ -31,11 +31,9 @@ void RabbitMQHandler::onError(AMQP::TcpConnection * connection, const char * mes
void RabbitMQHandler::startLoop()
{
std::lock_guard lock(startup_mutex);
loop_started.store(true);
/// stop_loop variable is updated in a separate thread
while (!stop_loop.load())
uv_run(loop, UV_RUN_NOWAIT);
loop_started.store(false);
}
void RabbitMQHandler::iterateLoop()

src/Storages/RabbitMQ/RabbitMQHandler.h

@@ -21,13 +21,12 @@ public:
void stop() { stop_loop.store(true); }
void startLoop();
void iterateLoop();
bool checkLoop() const { return loop_started.load(); }
private:
uv_loop_t * loop;
Poco::Logger * log;
std::atomic<bool> stop_loop = false, loop_started = false;
std::atomic<bool> stop_loop = false;
std::mutex startup_mutex;
};

src/Storages/RabbitMQ/RabbitMQSettings.h

@@ -20,6 +20,7 @@ namespace DB
M(SettingUInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \
M(SettingBool, rabbitmq_transactional_channel, false, "Use transactional channel for publishing.", 0) \
M(SettingString, rabbitmq_queue_base, "", "Base for queue names, allowing non-empty queues to be reopened after a failure.", 0) \
M(SettingString, rabbitmq_deadletter_exchange, "", "Exchange name to be used as a dead-letter exchange for rejected or expired messages.", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_RABBITMQ_SETTINGS)
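With the setting registered, it can be supplied like any other engine setting; a hypothetical table definition in the style of the integration tests (table and exchange names are illustrative):

instance.query('''
    CREATE TABLE test.rabbitmq_dl (key UInt64, value UInt64)
    ENGINE = RabbitMQ
    SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
             rabbitmq_exchange_name = 'some_exchange',
             rabbitmq_deadletter_exchange = 'some_dl_exchange',
             rabbitmq_format = 'JSONEachRow',
             rabbitmq_row_delimiter = '\\n';
''')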

src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp

@@ -30,6 +30,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
bool hash_exchange_,
size_t num_queues_,
const String & local_exchange_,
const String & deadletter_exchange_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
@@ -46,6 +47,7 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
, row_delimiter(row_delimiter_)
, stopped(stopped_)
, local_exchange(local_exchange_)
, deadletter_exchange(deadletter_exchange_)
, received(QUEUE_SIZE * num_queues)
{
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
@@ -55,6 +57,12 @@ ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
{
if (ack.load() && consumer_channel)
{
consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all messages up to the last tag, starting from the last acked one.
LOG_TRACE(log, "Acknowledged messages with deliveryTags up to {}", prev_tag);
}
consumer_channel->close();
received.clear();
BufferBase::set(nullptr, 0, 0);
@@ -162,14 +170,20 @@ void ReadBufferFromRabbitMQConsumer::initQueueBindings(const size_t queue_id)
LOG_ERROR(log, "Failed to declare queue on the channel. Reason: {}", message);
});
AMQP::Table queue_settings;
if (!deadletter_exchange.empty())
{
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
}
if (!queue_base.empty())
{
const String queue_name = !hash_exchange ? queue_base : queue_base + "_" + std::to_string(channel_id) + "_" + std::to_string(queue_id);
setup_channel->declareQueue(queue_name, AMQP::durable).onSuccess(success_callback).onError(error_callback);
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
}
else
{
setup_channel->declareQueue(AMQP::durable).onSuccess(success_callback).onError(error_callback);
setup_channel->declareQueue(AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
}
while (!bindings_created && !bindings_error)
@@ -184,15 +198,20 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
if (subscribed_queue[queue_name])
return;
consumer_channel->consume(queue_name, AMQP::noack)
consumer_channel->consume(queue_name)
.onSuccess([&](const std::string & consumer)
{
subscribed_queue[queue_name] = true;
consumer_error = false;
++count_subscribed;
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
consumer_error = false;
consumer_tag = consumer;
LOG_TRACE(log, "Consumer {} is subscribed to queue {}", channel_id, queue_name);
consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Consumer {} error: {}", consumer_tag, message);
});
})
.onReceived([&](const AMQP::Message & message, uint64_t deliveryTag, bool redelivered)
{
@@ -201,11 +220,16 @@ void ReadBufferFromRabbitMQConsumer::subscribe(const String & queue_name)
{
String message_received = std::string(message.body(), message.body() + message_size);
if (row_delimiter != '\0')
{
message_received += row_delimiter;
}
received.push({deliveryTag, message_received, redelivered});
std::lock_guard lock(wait_ack);
if (ack.exchange(false) && prev_tag < max_tag && consumer_channel)
{
consumer_channel->ack(prev_tag, AMQP::multiple); /// Will ack all messages up to the last tag, starting from the last acked one.
LOG_TRACE(log, "Consumer {} acknowledged messages with deliveryTags up to {}", consumer_tag, prev_tag);
}
}
})
.onError([&](const char * message)
@@ -243,6 +267,17 @@ void ReadBufferFromRabbitMQConsumer::checkSubscription()
}
void ReadBufferFromRabbitMQConsumer::ackMessages(UInt64 last_inserted_delivery_tag)
{
if (last_inserted_delivery_tag > prev_tag)
{
std::lock_guard lock(wait_ack);
prev_tag = last_inserted_delivery_tag;
ack.store(true);
}
}
void ReadBufferFromRabbitMQConsumer::iterateEventLoop()
{
event_handler->iterateLoop();
@@ -259,6 +294,7 @@ bool ReadBufferFromRabbitMQConsumer::nextImpl()
auto * new_position = const_cast<char *>(current.message.data());
BufferBase::set(new_position, current.message.size(), 0);
allowed = false;
max_tag = current.delivery_tag;
return true;
}
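The queue_settings table above maps onto RabbitMQ's standard x-dead-letter-exchange queue argument: messages that are rejected or expire in the queue are re-published to the named exchange instead of being dropped. A hedged pika equivalent (exchange and queue names are illustrative):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Exchange that will receive dead-lettered (rejected or expired) messages.
channel.exchange_declare(exchange='some_dl_exchange', exchange_type='fanout')

# Equivalent of queue_settings["x-dead-letter-exchange"] in the C++ above.
channel.queue_declare(queue='some_queue', durable=True,
                      arguments={'x-dead-letter-exchange': 'some_dl_exchange'})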

src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h

@@ -36,6 +36,7 @@ public:
bool hash_exchange_,
size_t num_queues_,
const String & local_exchange_,
const String & deadletter_exchange_,
const std::atomic<bool> & stopped_);
~ReadBufferFromRabbitMQConsumer() override;
@@ -49,6 +50,7 @@ public:
void allowNext() { allowed = true; } // Allow reading the next message.
void checkSubscription();
void ackMessages(UInt64 last_inserted_delivery_tag);
auto getConsumerTag() const { return consumer_tag; }
auto getDeliveryTag() const { return current.delivery_tag; }
@@ -72,15 +74,19 @@ private:
bool allowed = true;
const std::atomic<bool> & stopped;
const String local_exchange;
const String local_exchange, deadletter_exchange;
std::atomic<bool> consumer_error = false;
std::atomic<size_t> count_subscribed = 0, wait_subscribed;
String consumer_tag;
ConcurrentBoundedQueue<MessageData> received;
UInt64 prev_tag = 0;
MessageData current;
std::vector<String> queues;
std::unordered_map<String, bool> subscribed_queue;
std::atomic<bool> ack = false;
std::mutex wait_ack;
UInt64 max_tag = 0;
bool nextImpl() override;

src/Storages/RabbitMQ/StorageRabbitMQ.cpp

@@ -73,7 +73,8 @@ StorageRabbitMQ::StorageRabbitMQ(
size_t num_consumers_,
size_t num_queues_,
const bool use_transactional_channel_,
const String & queue_base_)
const String & queue_base_,
const String & deadletter_exchange_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, rabbitmq_context(Context(global_context))
@@ -85,6 +86,7 @@ StorageRabbitMQ::StorageRabbitMQ(
, num_queues(num_queues_)
, use_transactional_channel(use_transactional_channel_)
, queue_base(queue_base_)
, deadletter_exchange(deadletter_exchange_)
, log(&Poco::Logger::get("StorageRabbitMQ (" + table_id_.table_name + ")"))
, parsed_address(parseAddress(global_context.getMacros()->expand(host_port_), 5672))
, login_password(std::make_pair(
@@ -224,6 +226,7 @@ void StorageRabbitMQ::initExchange()
void StorageRabbitMQ::bindExchange()
{
std::atomic<bool> binding_created = false;
size_t bound_keys = 0;
/// Bridge exchange connects client's exchange with consumers' queues.
if (exchange_type == AMQP::ExchangeType::headers)
@@ -257,6 +260,8 @@ void StorageRabbitMQ::bindExchange()
setup_channel->bindExchange(exchange_name, bridge_exchange, routing_key)
.onSuccess([&]()
{
++bound_keys;
if (bound_keys == routing_keys.size())
binding_created = true;
})
.onError([&](const char * message)
@@ -434,7 +439,7 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
consumer_channel, setup_channel, event_handler, consumer_exchange, exchange_type, routing_keys,
next_channel_id, queue_base, log, row_delimiter, hash_exchange, num_queues,
local_exchange, stream_cancelled);
local_exchange, deadletter_exchange, stream_cancelled);
}
@@ -739,10 +744,22 @@ void registerStorageRabbitMQ(StorageFactory & factory)
}
}
String deadletter_exchange = rabbitmq_settings.rabbitmq_deadletter_exchange.value;
if (args_count >= 11)
{
engine_args[10] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[10], args.local_context);
const auto * ast = engine_args[10]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
{
deadletter_exchange = safeGet<String>(ast->value);
}
}
return StorageRabbitMQ::create(
args.table_id, args.context, args.columns,
host_port, routing_keys, exchange, format, row_delimiter, exchange_type, num_consumers,
num_queues, use_transactional_channel, queue_base);
num_queues, use_transactional_channel, queue_base, deadletter_exchange);
};
factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
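bindExchange() now counts successful per-key bindings and reports completion only once every routing key is bound to the bridge exchange. The exchange-to-exchange binding it performs looks like this in pika (a sketch; exchange names and keys are illustrative):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='client_exchange', exchange_type='direct')
channel.exchange_declare(exchange='bridge_exchange', exchange_type='fanout')

# Exchange-to-exchange binding, as in setup_channel->bindExchange(...):
# messages arriving at 'client_exchange' with a matching routing key are
# forwarded to 'bridge_exchange', which feeds the consumers' queues.
for routing_key in ['key1', 'key2']:
    channel.exchange_bind(destination='bridge_exchange',
                          source='client_exchange',
                          routing_key=routing_key)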

src/Storages/RabbitMQ/StorageRabbitMQ.h

@@ -72,7 +72,8 @@ protected:
size_t num_consumers_,
size_t num_queues_,
const bool use_transactional_channel_,
const String & queue_base_);
const String & queue_base_,
const String & deadletter_exchange);
private:
Context global_context;
@@ -90,6 +91,7 @@ private:
size_t num_queues;
const bool use_transactional_channel;
const String queue_base;
const String deadletter_exchange;
Poco::Logger * log;
std::pair<String, UInt16> parsed_address;

tests/integration/test_storage_rabbitmq/test.py

@@ -496,135 +496,9 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
assert int(result) == rabbitmq_messages*batch_messages, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_channels_publish(rabbitmq_cluster):
NUM_CHANNELS = 5
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'test_channels_sharding',
rabbitmq_num_consumers = 5,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
''')
time.sleep(1)
i = [0]
messages_num = 10000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = str(randrange(1, NUM_CHANNELS))
for message in messages:
channel.basic_publish(exchange='test_channels_sharding', routing_key=key, body=message)
connection.close()
threads = []
threads_num = 20
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
while True:
result = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result) == messages_num * threads_num:
break
for thread in threads:
thread.join()
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
NUM_QUEUES = 4
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'test_queues_sharding',
rabbitmq_num_queues = 4,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
''')
time.sleep(1)
i = [0]
messages_num = 10000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = str(randrange(1, NUM_QUEUES))
for message in messages:
channel.basic_publish(exchange='test_queues_sharding', routing_key=key, body=message)
connection.close()
threads = []
threads_num = 20
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
while True:
result = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result) == messages_num * threads_num:
break
for thread in threads:
thread.join()
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster):
NUM_CONSUMERS = 10
NUM_QUEUES = 2
@@ -639,12 +513,12 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
CREATE TABLE test.view (key UInt64, value UInt64, consumer_tag String)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime=5, cleanup_delay_period=2, cleanup_delay_period_random_add=3;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
SELECT *, _consumer_tag AS consumer_tag FROM test.rabbitmq;
''')
time.sleep(1)
@@ -662,9 +536,12 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = str(randrange(1, NUM_QUEUES * NUM_CONSUMERS))
current = 0
for message in messages:
channel.basic_publish(exchange='test_sharding', routing_key=key, body=message)
current += 1
mes_id = str(current)
channel.basic_publish(exchange='test_sharding', routing_key='',
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
threads = []
@@ -676,16 +553,20 @@ def test_rabbitmq_sharding_between_channels_and_queues_publish(rabbitmq_cluster)
time.sleep(random.uniform(0, 1))
thread.start()
result1 = ''
while True:
result = instance.query('SELECT count() FROM test.view')
result1 = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result) == messages_num * threads_num:
if int(result1) == messages_num * threads_num:
break
result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
for thread in threads:
thread.join()
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
assert int(result2) == 10
@pytest.mark.timeout(420)
@@ -734,8 +615,12 @@ def test_rabbitmq_read_only_combo(rabbitmq_cluster):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key = str(randrange(1, NUM_CONSUMERS))
current = 0
for message in messages:
channel.basic_publish(exchange='combo', routing_key=key, body=message)
current += 1
mes_id = str(current)
channel.basic_publish(exchange='combo', routing_key=key,
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
threads = []
@@ -1140,11 +1025,11 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
key_num = 0
current = 0
for message in messages:
mes_id = str(randrange(10))
channel.basic_publish(
exchange='fanout_exchange_testing', routing_key='',
current += 1
mes_id = str(current)
channel.basic_publish(exchange='fanout_exchange_testing', routing_key='',
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
@@ -1236,10 +1121,11 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster):
channel.basic_publish(exchange='topic_exchange_testing', routing_key=key, body=message)
key = "random.logs"
current = 0
for message in messages:
mes_id = str(randrange(10))
channel.basic_publish(
exchange='topic_exchange_testing', routing_key=key,
current += 1
mes_id = str(current)
channel.basic_publish(exchange='topic_exchange_testing', routing_key=key,
properties=pika.BasicProperties(message_id=mes_id), body=message)
connection.close()
@@ -1411,8 +1297,10 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster):
keys = ['key1', 'key2', 'key3', 'key4', 'key5']
for key in keys:
current = 0
for message in messages:
mes_id = str(randrange(10))
current += 1
mes_id = str(current)
channel.basic_publish(exchange='multiple_bindings_testing', routing_key=key,
properties=pika.BasicProperties(message_id=mes_id), body=message)
@@ -1510,9 +1398,10 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster):
fields['type']='report'
fields['year']='2020'
key_num = 0
current = 0
for message in messages:
mes_id = str(randrange(10))
current += 1
mes_id = str(current)
channel.basic_publish(exchange='headers_exchange_testing', routing_key='',
properties=pika.BasicProperties(headers=fields, message_id=mes_id), body=message)
@@ -1674,7 +1563,91 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster):
@pytest.mark.timeout(420)
def test_rabbitmq_queue_resume(rabbitmq_cluster):
def test_rabbitmq_queue_resume_1(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'queue_resume',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'queue_resume',
rabbitmq_num_consumers = '2',
rabbitmq_num_queues = '2',
rabbitmq_queue_base = 'queue_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
i = [0]
messages_num = 5000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
for message in messages:
channel.basic_publish(exchange='queue_resume', routing_key='queue_resume', body=message,
properties=pika.BasicProperties(delivery_mode = 2))
connection.close()
threads = []
threads_num = 10
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
for thread in threads:
thread.join()
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_queue_resume;
''')
instance.query('''
CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'queue_resume',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'queue_resume',
rabbitmq_num_consumers = '2',
rabbitmq_num_queues = '2',
rabbitmq_queue_base = 'queue_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq_queue_resume;
''')
while True:
result1 = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result1) == messages_num * threads_num:
break
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_queue_resume;
DROP TABLE IF EXISTS test.consumer;
DROP TABLE IF EXISTS test.view;
''')
assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
@pytest.mark.timeout(420)
def test_rabbitmq_queue_resume_2(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_queue_resume (key UInt64, value UInt64)
ENGINE = RabbitMQ
@@ -1699,7 +1672,6 @@ def test_rabbitmq_queue_resume(rabbitmq_cluster):
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
@@ -1762,6 +1734,95 @@ def test_rabbitmq_queue_resume(rabbitmq_cluster):
assert int(result2) == 2
@pytest.mark.timeout(420)
def test_rabbitmq_consumer_acknowledgements(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq_consumer_acks (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_acks',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'consumer_acks',
rabbitmq_queue_base = 'consumer_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
i = [0]
messages_num = 5000
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
def produce():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
messages = []
for _ in range(messages_num):
messages.append(json.dumps({'key': i[0], 'value': i[0]}))
i[0] += 1
for message in messages:
channel.basic_publish(exchange='consumer_acks', routing_key='consumer_acks', body=message,
properties=pika.BasicProperties(delivery_mode = 2))
connection.close()
threads = []
threads_num = 20
for _ in range(threads_num):
threads.append(threading.Thread(target=produce))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
for thread in threads:
thread.join()
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64, consumer_tag String)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT *, _consumer_tag AS consumer_tag FROM test.rabbitmq_consumer_acks;
''')
while int(instance.query('SELECT count() FROM test.view')) == 0:
time.sleep(1)
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_consumer_acks;
''')
collected = int(instance.query('SELECT count() FROM test.view'))
instance.query('''
CREATE TABLE test.rabbitmq_consumer_acks (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_queue_base = 'consumer_resume',
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
while True:
result1 = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result1) == messages_num * threads_num:
break
result2 = instance.query("SELECT count(DISTINCT consumer_tag) FROM test.view")
instance.query('''
DROP TABLE IF EXISTS test.rabbitmq_consumer_acks;
DROP TABLE IF EXISTS test.consumer;
DROP TABLE IF EXISTS test.view;
''')
assert int(result1) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result1)
if collected < int(result1):
assert int(result2) == 2
if __name__ == '__main__':
cluster.start()