Make rabbitmq reject broken messages

This commit is contained in:
kssenii 2024-02-22 16:56:33 +01:00
parent fb7924cdb1
commit 4d57aaebb1
4 changed files with 125 additions and 3 deletions

View File

@ -106,6 +106,18 @@ bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
if (commit_info.channel_id != channel_id)
return false;
for (const auto & delivery_tag : commit_info.failed_delivery_tags)
{
if (consumer_channel->reject(delivery_tag))
LOG_TRACE(
log, "Consumer rejected message with deliveryTag {} on channel {}",
delivery_tag, channel_id);
else
LOG_WARNING(
log, "Failed to reject message with deliveryTag {} on channel {}",
delivery_tag, channel_id);
}
/// Duplicate ack?
if (commit_info.delivery_tag > last_commited_delivery_tag
&& consumer_channel->ack(commit_info.delivery_tag, AMQP::multiple))

View File

@ -39,6 +39,7 @@ public:
{
UInt64 delivery_tag = 0;
String channel_id;
std::vector<UInt64> failed_delivery_tags;
};
struct MessageData

View File

@ -168,7 +168,9 @@ Chunk RabbitMQSource::generateImpl()
StreamingFormatExecutor executor(non_virtual_header, input_format, on_error);
RabbitMQConsumer::CommitInfo current_commit_info;
/// Channel id will not change during read.
commit_info.channel_id = consumer->getChannelID();
while (true)
{
exception_message.reset();
@ -176,8 +178,12 @@ Chunk RabbitMQSource::generateImpl()
if (consumer->hasPendingMessages())
{
/// A buffer containing a single RabbitMQ message.
if (auto buf = consumer->consume())
{
new_rows = executor.execute(*buf);
chassert(new_rows == 1);
}
}
if (new_rows)
@ -185,6 +191,16 @@ Chunk RabbitMQSource::generateImpl()
const auto exchange_name = storage.getExchange();
const auto & message = consumer->currentMessage();
if (exception_message.has_value())
{
commit_info.failed_delivery_tags.push_back(message.delivery_tag);
}
else
{
chassert(commit_info.delivery_tag < message.delivery_tag);
commit_info.delivery_tag = message.delivery_tag;
}
for (size_t i = 0; i < new_rows; ++i)
{
virtual_columns[0]->insert(exchange_name);
@ -209,7 +225,6 @@ Chunk RabbitMQSource::generateImpl()
}
total_rows += new_rows;
current_commit_info = {message.delivery_tag, message.channel_id};
}
else if (total_rows == 0)
{
@ -251,7 +266,6 @@ Chunk RabbitMQSource::generateImpl()
for (auto & column : virtual_columns)
result_columns.push_back(std::move(column));
commit_info = current_commit_info;
return Chunk(std::move(result_columns), total_rows);
}

View File

@ -3648,3 +3648,98 @@ def test_rabbitmq_nack_failed_insert(rabbitmq_cluster):
"""
)
connection.close()
def test_rabbitmq_reject_broken_messages(rabbitmq_cluster):
    """Check that unparsable messages end up in the dead-letter queue.

    The RabbitMQ table is created with ``rabbitmq_handle_error_mode = 'stream'``
    and an ``x-dead-letter-exchange`` queue setting. Half of the published
    messages are valid JSON rows and half are plain "Broken message N" strings.
    The test expects every message to produce a row in ``test.data`` and every
    broken message to arrive, in publish order, in the dead-letter queue.
    """
    credentials = pika.PlainCredentials("root", "clickhouse")
    parameters = pika.ConnectionParameters(
        rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    # Dead-letter exchange/queue that should receive the rejected messages.
    deadletter_exchange = "deadletter_exchange_handle_error_mode_stream"
    deadletter_queue = "deadletter_queue_handle_error_mode_stream"
    channel.exchange_declare(exchange=deadletter_exchange)
    result = channel.queue_declare(queue=deadletter_queue)
    channel.queue_bind(
        exchange=deadletter_exchange, routing_key="", queue=deadletter_queue
    )

    # The table created below is `test.rabbit`, so it must be dropped as well;
    # otherwise a leftover table makes CREATE TABLE fail on a re-run.
    instance.query(
        f"""
        DROP TABLE IF EXISTS test.rabbitmq;
        DROP TABLE IF EXISTS test.rabbit;
        DROP TABLE IF EXISTS test.view;
        DROP TABLE IF EXISTS test.data;
        DROP TABLE IF EXISTS test.errors;
        DROP TABLE IF EXISTS test.errors_view;

        CREATE TABLE test.rabbit (key UInt64, value UInt64)
            ENGINE = RabbitMQ
            SETTINGS rabbitmq_host_port = '{rabbitmq_cluster.rabbitmq_host}:5672',
                     rabbitmq_exchange_name = 'select',
                     rabbitmq_commit_on_select = 1,
                     rabbitmq_format = 'JSONEachRow',
                     rabbitmq_row_delimiter = '\\n',
                     rabbitmq_handle_error_mode = 'stream',
                     rabbitmq_queue_settings_list='x-dead-letter-exchange={deadletter_exchange}';

        CREATE TABLE test.errors (error Nullable(String), broken_message Nullable(String))
            ENGINE = MergeTree()
            ORDER BY tuple();

        CREATE MATERIALIZED VIEW test.errors_view TO test.errors AS
            SELECT _error as error, _raw_message as broken_message FROM test.rabbit where not isNull(_error);

        CREATE TABLE test.data (key UInt64, value UInt64)
            ENGINE = MergeTree()
            ORDER BY key;

        CREATE MATERIALIZED VIEW test.view TO test.data AS
            SELECT key, value FROM test.rabbit;
        """
    )

    # Even indices are valid JSON rows, odd indices are unparsable payloads.
    messages = []
    num_rows = 50
    num_broken = num_rows // 2
    for i in range(num_rows):
        if i % 2 == 0:
            messages.append(json.dumps({"key": i, "value": i}))
        else:
            messages.append("Broken message " + str(i))

    for message in messages:
        channel.basic_publish(exchange="select", routing_key="", body=message)

    time.sleep(1)

    # Poll until every published message has produced a row in test.data.
    # A short pause between attempts keeps the wait bounded by time rather
    # than by how fast 500 queries happen to execute.
    attempt = 0
    rows = 0
    while attempt < 500:
        rows = int(instance.query("SELECT count() FROM test.data"))
        if rows == num_rows:
            break
        attempt += 1
        time.sleep(0.1)

    assert rows == num_rows

    dead_letters = []

    def on_dead_letter(ch, method, properties, body):
        # Stop consuming as soon as all expected broken messages have arrived.
        dead_letters.append(body)
        if len(dead_letters) == num_broken:
            ch.stop_consuming()

    channel.basic_consume(deadletter_queue, on_dead_letter)
    channel.start_consuming()

    assert len(dead_letters) == num_broken

    # Broken messages were published at odd indices 1, 3, 5, ...
    for position, letter in enumerate(dead_letters):
        expected_index = 2 * position + 1
        assert f"Broken message {expected_index}" in str(letter)

    connection.close()