Merge pull request #63465 from ClickHouse/backport/24.3/60312

Backport #60312 to 24.3: Make rabbitmq nack broken messages
This commit is contained in:
robot-clickhouse-ci-1 2024-05-07 15:27:28 +02:00 committed by GitHub
commit 913b06b351
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 237 additions and 52 deletions

View File

@ -73,6 +73,7 @@ Optional parameters:
- `rabbitmq_queue_consume` - Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings. Default: `false`. - `rabbitmq_queue_consume` - Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings. Default: `false`.
- `rabbitmq_username` - RabbitMQ username. - `rabbitmq_username` - RabbitMQ username.
- `rabbitmq_password` - RabbitMQ password. - `rabbitmq_password` - RabbitMQ password.
- `reject_unhandled_messages` - Reject messages (send RabbitMQ negative acknowledgement) in case of errors. This setting is automatically enabled if there is a `x-dead-letter-exchange` defined in `rabbitmq_queue_settings_list`.
- `rabbitmq_commit_on_select` - Commit messages when select query is made. Default: `false`. - `rabbitmq_commit_on_select` - Commit messages when select query is made. Default: `false`.
- `rabbitmq_max_rows_per_message` — The maximum number of rows written in one RabbitMQ message for row-based formats. Default : `1`. - `rabbitmq_max_rows_per_message` — The maximum number of rows written in one RabbitMQ message for row-based formats. Default : `1`.
- `rabbitmq_empty_queue_backoff_start` — A start backoff point to reschedule read if the rabbitmq queue is empty. - `rabbitmq_empty_queue_backoff_start` — A start backoff point to reschedule read if the rabbitmq queue is empty.

View File

@ -94,17 +94,31 @@ void RabbitMQConsumer::subscribe()
bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info) bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
{ {
if (state != State::OK) if (state != State::OK)
{
LOG_TEST(log, "State is {}, will not ack messages", magic_enum::enum_name(state.load(std::memory_order_relaxed)));
return false; return false;
}
/// Nothing to ack.
if (!commit_info.delivery_tag)
return false;
/// Do not send ack to server if message's channel is not the same as /// Do not send ack to server if message's channel is not the same as
/// current running channel because delivery tags are scoped per channel, /// current running channel because delivery tags are scoped per channel,
/// so if channel fails, all previous delivery tags become invalid. /// so if channel fails, all previous delivery tags become invalid.
if (commit_info.channel_id != channel_id) if (commit_info.channel_id != channel_id)
{
LOG_TEST(log, "Channel ID changed {} -> {}, will not ack messages", commit_info.channel_id, channel_id);
return false; return false;
}
for (const auto & delivery_tag : commit_info.failed_delivery_tags)
{
if (consumer_channel->reject(delivery_tag))
LOG_TRACE(
log, "Consumer rejected message with deliveryTag {} on channel {}",
delivery_tag, channel_id);
else
LOG_WARNING(
log, "Failed to reject message with deliveryTag {} on channel {}",
delivery_tag, channel_id);
}
/// Duplicate ack? /// Duplicate ack?
if (commit_info.delivery_tag > last_commited_delivery_tag if (commit_info.delivery_tag > last_commited_delivery_tag
@ -119,11 +133,14 @@ bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
return true; return true;
} }
LOG_ERROR( if (commit_info.delivery_tag)
log, {
"Did not commit messages for {}:{}, (current commit point {}:{})", LOG_ERROR(
commit_info.channel_id, commit_info.delivery_tag, log,
channel_id, last_commited_delivery_tag); "Did not commit messages for {}:{}, (current commit point {}:{})",
commit_info.channel_id, commit_info.delivery_tag,
channel_id, last_commited_delivery_tag);
}
return false; return false;
} }
@ -131,11 +148,18 @@ bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
bool RabbitMQConsumer::nackMessages(const CommitInfo & commit_info) bool RabbitMQConsumer::nackMessages(const CommitInfo & commit_info)
{ {
if (state != State::OK) if (state != State::OK)
{
LOG_TEST(log, "State is {}, will not nack messages", magic_enum::enum_name(state.load(std::memory_order_relaxed)));
return false; return false;
}
/// Nothing to nack. /// Nothing to nack.
if (!commit_info.delivery_tag || commit_info.delivery_tag <= last_commited_delivery_tag) if (!commit_info.delivery_tag || commit_info.delivery_tag <= last_commited_delivery_tag)
{
LOG_TEST(log, "Delivery tag is {}, last committed delivery tag: {}, Will not nack messages",
commit_info.delivery_tag, last_commited_delivery_tag);
return false; return false;
}
if (consumer_channel->reject(commit_info.delivery_tag, AMQP::multiple)) if (consumer_channel->reject(commit_info.delivery_tag, AMQP::multiple))
{ {
@ -187,8 +211,14 @@ void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
consumer_channel->onError([&](const char * message) consumer_channel->onError([&](const char * message)
{ {
LOG_ERROR(log, "Channel {} in in error state: {}", channel_id, message); LOG_ERROR(
state = State::ERROR; log, "Channel {} received an error: {} (usable: {}, connected: {})",
channel_id, message, consumer_channel->usable(), consumer_channel->connected());
if (!consumer_channel->usable() || !consumer_channel->connected())
{
state = State::ERROR;
}
}); });
} }

View File

@ -39,6 +39,7 @@ public:
{ {
UInt64 delivery_tag = 0; UInt64 delivery_tag = 0;
String channel_id; String channel_id;
std::vector<UInt64> failed_delivery_tags;
}; };
struct MessageData struct MessageData
@ -110,7 +111,7 @@ private:
ConcurrentBoundedQueue<MessageData> received; ConcurrentBoundedQueue<MessageData> received;
MessageData current; MessageData current;
UInt64 last_commited_delivery_tag; UInt64 last_commited_delivery_tag = 0;
std::condition_variable cv; std::condition_variable cv;
std::mutex mutex; std::mutex mutex;

View File

@ -32,6 +32,7 @@ namespace DB
M(Bool, rabbitmq_queue_consume, false, "Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings", 0) \ M(Bool, rabbitmq_queue_consume, false, "Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings", 0) \
M(String, rabbitmq_username, "", "RabbitMQ username", 0) \ M(String, rabbitmq_username, "", "RabbitMQ username", 0) \
M(String, rabbitmq_password, "", "RabbitMQ password", 0) \ M(String, rabbitmq_password, "", "RabbitMQ password", 0) \
M(Bool, reject_unhandled_messages, false, "Allow messages to be rejected in case they cannot be processed. This also automatically implies if there is a x-deadletter-exchange queue setting added", 0) \
M(Bool, rabbitmq_commit_on_select, false, "Commit messages when select query is made", 0) \ M(Bool, rabbitmq_commit_on_select, false, "Commit messages when select query is made", 0) \
M(UInt64, rabbitmq_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \ M(UInt64, rabbitmq_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \
M(StreamingHandleErrorMode, rabbitmq_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for RabbitMQ engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ M(StreamingHandleErrorMode, rabbitmq_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for RabbitMQ engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \

View File

@ -46,7 +46,9 @@ RabbitMQSource::RabbitMQSource(
size_t max_block_size_, size_t max_block_size_,
UInt64 max_execution_time_, UInt64 max_execution_time_,
StreamingHandleErrorMode handle_error_mode_, StreamingHandleErrorMode handle_error_mode_,
bool ack_in_suffix_) bool nack_broken_messages_,
bool ack_in_suffix_,
LoggerPtr log_)
: RabbitMQSource( : RabbitMQSource(
storage_, storage_,
storage_snapshot_, storage_snapshot_,
@ -56,7 +58,9 @@ RabbitMQSource::RabbitMQSource(
max_block_size_, max_block_size_,
max_execution_time_, max_execution_time_,
handle_error_mode_, handle_error_mode_,
ack_in_suffix_) nack_broken_messages_,
ack_in_suffix_,
log_)
{ {
} }
@ -69,7 +73,9 @@ RabbitMQSource::RabbitMQSource(
size_t max_block_size_, size_t max_block_size_,
UInt64 max_execution_time_, UInt64 max_execution_time_,
StreamingHandleErrorMode handle_error_mode_, StreamingHandleErrorMode handle_error_mode_,
bool ack_in_suffix_) bool nack_broken_messages_,
bool ack_in_suffix_,
LoggerPtr log_)
: ISource(getSampleBlock(headers.first, headers.second)) : ISource(getSampleBlock(headers.first, headers.second))
, storage(storage_) , storage(storage_)
, storage_snapshot(storage_snapshot_) , storage_snapshot(storage_snapshot_)
@ -78,9 +84,10 @@ RabbitMQSource::RabbitMQSource(
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, handle_error_mode(handle_error_mode_) , handle_error_mode(handle_error_mode_)
, ack_in_suffix(ack_in_suffix_) , ack_in_suffix(ack_in_suffix_)
, nack_broken_messages(nack_broken_messages_)
, non_virtual_header(std::move(headers.first)) , non_virtual_header(std::move(headers.first))
, virtual_header(std::move(headers.second)) , virtual_header(std::move(headers.second))
, log(getLogger("RabbitMQSource")) , log(log_)
, max_execution_time_ms(max_execution_time_) , max_execution_time_ms(max_execution_time_)
{ {
storage.incrementReader(); storage.incrementReader();
@ -119,7 +126,10 @@ Chunk RabbitMQSource::generate()
{ {
auto chunk = generateImpl(); auto chunk = generateImpl();
if (!chunk && ack_in_suffix) if (!chunk && ack_in_suffix)
{
LOG_TEST(log, "Will send ack on select");
sendAck(); sendAck();
}
return chunk; return chunk;
} }
@ -178,7 +188,7 @@ Chunk RabbitMQSource::generateImpl()
StreamingFormatExecutor executor(non_virtual_header, input_format, on_error); StreamingFormatExecutor executor(non_virtual_header, input_format, on_error);
RabbitMQConsumer::CommitInfo current_commit_info; /// Channel id will not change during read.
while (true) while (true)
{ {
exception_message.reset(); exception_message.reset();
@ -186,8 +196,11 @@ Chunk RabbitMQSource::generateImpl()
if (consumer->hasPendingMessages()) if (consumer->hasPendingMessages())
{ {
/// A buffer containing a single RabbitMQ message.
if (auto buf = consumer->consume()) if (auto buf = consumer->consume())
{
new_rows = executor.execute(*buf); new_rows = executor.execute(*buf);
}
} }
if (new_rows) if (new_rows)
@ -195,6 +208,24 @@ Chunk RabbitMQSource::generateImpl()
const auto exchange_name = storage.getExchange(); const auto exchange_name = storage.getExchange();
const auto & message = consumer->currentMessage(); const auto & message = consumer->currentMessage();
LOG_TEST(log, "Pulled {} rows, message delivery tag: {}, "
"previous delivery tag: {}, redelivered: {}, failed delivery tags by this moment: {}, exception message: {}",
new_rows, message.delivery_tag, commit_info.delivery_tag, message.redelivered,
commit_info.failed_delivery_tags.size(),
exception_message.has_value() ? exception_message.value() : "None");
commit_info.channel_id = message.channel_id;
if (exception_message.has_value() && nack_broken_messages)
{
commit_info.failed_delivery_tags.push_back(message.delivery_tag);
}
else
{
chassert(!commit_info.delivery_tag || message.redelivered || commit_info.delivery_tag < message.delivery_tag);
commit_info.delivery_tag = std::max(commit_info.delivery_tag, message.delivery_tag);
}
for (size_t i = 0; i < new_rows; ++i) for (size_t i = 0; i < new_rows; ++i)
{ {
virtual_columns[0]->insert(exchange_name); virtual_columns[0]->insert(exchange_name);
@ -219,7 +250,6 @@ Chunk RabbitMQSource::generateImpl()
} }
total_rows += new_rows; total_rows += new_rows;
current_commit_info = {message.delivery_tag, message.channel_id};
} }
else if (total_rows == 0) else if (total_rows == 0)
{ {
@ -261,7 +291,6 @@ Chunk RabbitMQSource::generateImpl()
for (auto & column : virtual_columns) for (auto & column : virtual_columns)
result_columns.push_back(std::move(column)); result_columns.push_back(std::move(column));
commit_info = current_commit_info;
return Chunk(std::move(result_columns), total_rows); return Chunk(std::move(result_columns), total_rows);
} }

View File

@ -20,12 +20,15 @@ public:
size_t max_block_size_, size_t max_block_size_,
UInt64 max_execution_time_, UInt64 max_execution_time_,
StreamingHandleErrorMode handle_error_mode_, StreamingHandleErrorMode handle_error_mode_,
bool ack_in_suffix = false); bool nack_broken_messages_,
bool ack_in_suffix,
LoggerPtr log_);
~RabbitMQSource() override; ~RabbitMQSource() override;
String getName() const override { return storage.getName(); } String getName() const override { return storage.getName(); }
void updateChannel(RabbitMQConnection & connection) { consumer->updateChannel(connection); } void updateChannel(RabbitMQConnection & connection) { consumer->updateChannel(connection); }
String getChannelID() const { return consumer->getChannelID(); }
Chunk generate() override; Chunk generate() override;
@ -39,10 +42,11 @@ private:
StorageRabbitMQ & storage; StorageRabbitMQ & storage;
StorageSnapshotPtr storage_snapshot; StorageSnapshotPtr storage_snapshot;
ContextPtr context; ContextPtr context;
Names column_names; const Names column_names;
const size_t max_block_size; const size_t max_block_size;
StreamingHandleErrorMode handle_error_mode; const StreamingHandleErrorMode handle_error_mode;
bool ack_in_suffix; const bool ack_in_suffix;
const bool nack_broken_messages;
bool is_finished = false; bool is_finished = false;
const Block non_virtual_header; const Block non_virtual_header;
@ -65,7 +69,9 @@ private:
size_t max_block_size_, size_t max_block_size_,
UInt64 max_execution_time_, UInt64 max_execution_time_,
StreamingHandleErrorMode handle_error_mode_, StreamingHandleErrorMode handle_error_mode_,
bool ack_in_suffix); bool nack_broken_messages_,
bool ack_in_suffix,
LoggerPtr log_);
Chunk generateImpl(); Chunk generateImpl();
}; };

View File

@ -64,6 +64,7 @@ namespace ExchangeType
static const String HEADERS = "headers"; static const String HEADERS = "headers";
} }
static const auto deadletter_exchange_setting = "x-dead-letter-exchange";
StorageRabbitMQ::StorageRabbitMQ( StorageRabbitMQ::StorageRabbitMQ(
const StorageID & table_id_, const StorageID & table_id_,
@ -84,15 +85,20 @@ StorageRabbitMQ::StorageRabbitMQ(
, queue_base(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base)) , queue_base(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_base))
, queue_settings_list(parseSettings(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_settings_list))) , queue_settings_list(parseSettings(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_queue_settings_list)))
, max_rows_per_message(rabbitmq_settings->rabbitmq_max_rows_per_message) , max_rows_per_message(rabbitmq_settings->rabbitmq_max_rows_per_message)
, log(getLogger("StorageRabbitMQ (" + table_id_.table_name + ")"))
, persistent(rabbitmq_settings->rabbitmq_persistent.value) , persistent(rabbitmq_settings->rabbitmq_persistent.value)
, use_user_setup(rabbitmq_settings->rabbitmq_queue_consume.value) , use_user_setup(rabbitmq_settings->rabbitmq_queue_consume.value)
, hash_exchange(num_consumers > 1 || num_queues > 1) , hash_exchange(num_consumers > 1 || num_queues > 1)
, log(getLogger("StorageRabbitMQ (" + table_id_.table_name + ")"))
, semaphore(0, static_cast<int>(num_consumers)) , semaphore(0, static_cast<int>(num_consumers))
, unique_strbase(getRandomName()) , unique_strbase(getRandomName())
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize()))) , queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
, milliseconds_to_wait(rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms) , milliseconds_to_wait(rabbitmq_settings->rabbitmq_empty_queue_backoff_start_ms)
{ {
reject_unhandled_messages = rabbitmq_settings->reject_unhandled_messages
|| queue_settings_list.end() !=
std::find_if(queue_settings_list.begin(), queue_settings_list.end(),
[](const String & name) { return name.starts_with(deadletter_exchange_setting); });
const auto & config = getContext()->getConfigRef(); const auto & config = getContext()->getConfigRef();
std::pair<String, UInt16> parsed_address; std::pair<String, UInt16> parsed_address;
@ -739,8 +745,9 @@ void StorageRabbitMQ::read(
for (size_t i = 0; i < num_created_consumers; ++i) for (size_t i = 0; i < num_created_consumers; ++i)
{ {
auto rabbit_source = std::make_shared<RabbitMQSource>( auto rabbit_source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, modified_context, column_names, 1, *this, storage_snapshot, modified_context, column_names, /* max_block_size */1,
max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, rabbitmq_settings->rabbitmq_commit_on_select); max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, reject_unhandled_messages,
/* ack_in_suffix */rabbitmq_settings->rabbitmq_commit_on_select, log);
auto converting_dag = ActionsDAG::makeConvertingActions( auto converting_dag = ActionsDAG::makeConvertingActions(
rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(), rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(),
@ -975,7 +982,7 @@ void StorageRabbitMQ::streamingToViewsFunc()
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); LOG_ERROR(log, "Error while streaming to views: {}", getCurrentExceptionMessage(true));
} }
mv_attached.store(false); mv_attached.store(false);
@ -1076,7 +1083,8 @@ bool StorageRabbitMQ::tryStreamToViews()
{ {
auto source = std::make_shared<RabbitMQSource>( auto source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, rabbitmq_context, Names{}, block_size, *this, storage_snapshot, rabbitmq_context, Names{}, block_size,
max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode); max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode,
reject_unhandled_messages, /* ack_in_suffix */false, log);
sources.emplace_back(source); sources.emplace_back(source);
pipes.emplace_back(source); pipes.emplace_back(source);
@ -1129,7 +1137,10 @@ bool StorageRabbitMQ::tryStreamToViews()
if (!connection->isConnected()) if (!connection->isConnected())
{ {
if (shutdown_called) if (shutdown_called)
{
LOG_DEBUG(log, "Shutdown called, quitting");
return false; return false;
}
if (connection->reconnect()) if (connection->reconnect())
{ {
@ -1145,6 +1156,8 @@ bool StorageRabbitMQ::tryStreamToViews()
} }
else else
{ {
LOG_TEST(log, "Will {} messages for {} channels", write_failed ? "nack" : "ack", sources.size());
/// Commit /// Commit
for (auto & source : sources) for (auto & source : sources)
{ {
@ -1152,36 +1165,41 @@ bool StorageRabbitMQ::tryStreamToViews()
++queue_empty; ++queue_empty;
if (source->needChannelUpdate()) if (source->needChannelUpdate())
source->updateChannel(*connection);
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
* no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
* waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
* consumers. So in this case duplicates are inevitable.
* 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
* common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
* Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
* the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
* will ever happen.
*/
if (write_failed ? source->sendNack() : source->sendAck())
{ {
/// Iterate loop to activate error callbacks if they happened LOG_TEST(log, "Channel {} is in error state, will update", source->getChannelID());
connection->getHandler().iterateLoop(); source->updateChannel(*connection);
if (!connection->isConnected())
break;
} }
else
{
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
* no way to send specific ack from a different channel. Actually once the server realises that it has messages in a queue
* waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other
* consumers. So in this case duplicates are inevitable.
* 2) size of the sent frame (libraries's internal request interface) exceeds max frame - internal library error. This is more
* common for message frames, but not likely to happen to ack frame I suppose. So I do not believe it is likely to happen.
* Also in this case if channel didn't get closed - it is ok if failed to send ack, because the next attempt to send ack on
* the same channel will also commit all previously not-committed messages. Anyway I do not think that for ack frame this
* will ever happen.
*/
if (write_failed ? source->sendNack() : source->sendAck())
{
/// Iterate loop to activate error callbacks if they happened
connection->getHandler().iterateLoop();
if (!connection->isConnected())
break;
}
connection->getHandler().iterateLoop(); connection->getHandler().iterateLoop();
}
} }
} }
if (write_failed) if (write_failed)
{ {
LOG_TRACE(log, "Write failed, reschedule"); LOG_TRACE(log, "Write failed, reschedule");
return false; return true;
} }
if (!hasDependencies(getStorageID())) if (!hasDependencies(getStorageID()))
@ -1200,10 +1218,11 @@ bool StorageRabbitMQ::tryStreamToViews()
} }
else else
{ {
LOG_TEST(log, "Will start background loop to let messages be pushed to channel");
startLoop(); startLoop();
} }
/// Do not reschedule, do not stop event loop. /// Reschedule.
return true; return true;
} }

View File

@ -91,6 +91,9 @@ private:
String queue_base; String queue_base;
Names queue_settings_list; Names queue_settings_list;
size_t max_rows_per_message; size_t max_rows_per_message;
bool reject_unhandled_messages = false;
LoggerPtr log;
/// For insert query. Mark messages as durable. /// For insert query. Mark messages as durable.
const bool persistent; const bool persistent;
@ -101,7 +104,6 @@ private:
bool use_user_setup; bool use_user_setup;
bool hash_exchange; bool hash_exchange;
LoggerPtr log;
RabbitMQConnectionPtr connection; /// Connection for all consumers RabbitMQConnectionPtr connection; /// Connection for all consumers
RabbitMQConfiguration configuration; RabbitMQConfiguration configuration;

View File

@ -3680,3 +3680,99 @@ def test_rabbitmq_nack_failed_insert(rabbitmq_cluster):
""" """
) )
connection.close() connection.close()
def test_rabbitmq_reject_broken_messages(rabbitmq_cluster):
credentials = pika.PlainCredentials("root", "clickhouse")
parameters = pika.ConnectionParameters(
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
deadletter_exchange = "deadletter_exchange_handle_error_mode_stream"
deadletter_queue = "deadletter_queue_handle_error_mode_stream"
channel.exchange_declare(exchange=deadletter_exchange)
result = channel.queue_declare(queue=deadletter_queue)
channel.queue_bind(
exchange=deadletter_exchange, routing_key="", queue=deadletter_queue
)
instance.query(
f"""
DROP TABLE IF EXISTS test.rabbitmq;
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.data;
DROP TABLE IF EXISTS test.errors;
DROP TABLE IF EXISTS test.errors_view;
CREATE TABLE test.rabbit (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{rabbitmq_cluster.rabbitmq_host}:5672',
rabbitmq_exchange_name = 'select',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n',
rabbitmq_handle_error_mode = 'stream',
rabbitmq_queue_settings_list='x-dead-letter-exchange={deadletter_exchange}';
CREATE TABLE test.errors (error Nullable(String), broken_message Nullable(String))
ENGINE = MergeTree()
ORDER BY tuple();
CREATE MATERIALIZED VIEW test.errors_view TO test.errors AS
SELECT _error as error, _raw_message as broken_message FROM test.rabbit where not isNull(_error);
CREATE TABLE test.data (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.view TO test.data AS
SELECT key, value FROM test.rabbit;
"""
)
messages = []
num_rows = 50
for i in range(num_rows):
if i % 2 == 0:
messages.append(json.dumps({"key": i, "value": i}))
else:
messages.append("Broken message " + str(i))
for message in messages:
channel.basic_publish(exchange="select", routing_key="", body=message)
time.sleep(1)
attempt = 0
rows = 0
while attempt < 500:
rows = int(instance.query("SELECT count() FROM test.data"))
if rows == num_rows:
break
attempt += 1
time.sleep(1)
assert rows == num_rows
dead_letters = []
def on_dead_letter(channel, method, properties, body):
dead_letters.append(body)
if len(dead_letters) == num_rows / 2:
channel.stop_consuming()
channel.basic_consume(deadletter_queue, on_dead_letter)
channel.start_consuming()
assert len(dead_letters) == num_rows / 2
i = 1
for letter in dead_letters:
assert f"Broken message {i}" in str(letter)
i += 2
connection.close()