This commit is contained in:
kssenii 2024-02-08 17:40:41 +01:00
parent 57ddf196cf
commit e55b60e05c
6 changed files with 77 additions and 16 deletions

View File

@@ -128,6 +128,32 @@ bool RabbitMQConsumer::ackMessages(const CommitInfo & commit_info)
return false;
}
bool RabbitMQConsumer::nackMessages(const CommitInfo & commit_info)
{
if (state != State::OK)
return false;
/// Nothing to nack.
if (!commit_info.delivery_tag || commit_info.delivery_tag <= last_commited_delivery_tag)
return false;
if (consumer_channel->reject(commit_info.delivery_tag, AMQP::multiple))
{
LOG_TRACE(
log, "Consumer rejected messages with deliveryTags from {} to {} on channel {}",
last_commited_delivery_tag, commit_info.delivery_tag, channel_id);
return true;
}
LOG_ERROR(
log,
"Failed to reject messages for {}:{}, (current commit point {}:{})",
commit_info.channel_id, commit_info.delivery_tag,
channel_id, last_commited_delivery_tag);
return false;
}
void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
{
@@ -161,7 +187,7 @@ void RabbitMQConsumer::updateChannel(RabbitMQConnection & connection)
consumer_channel->onError([&](const char * message)
{
LOG_ERROR(log, "Channel {} in an error state: {}", channel_id, message);
LOG_ERROR(log, "Channel {} in in error state: {}", channel_id, message);
state = State::ERROR;
});
}
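Context for the hunks above (not part of the diff): with the AMQP::multiple flag, a single reject() negatively acknowledges every still-unacknowledged delivery up to and including the given tag, which is why nothing at or below last_commited_delivery_tag needs to be touched again. Below is a minimal compilable sketch of that guard; FakeChannel and nackUpTo are stand-ins for illustration, not the real AMQP-CPP or ClickHouse code.

```cpp
// Sketch only: mirrors the guard added in nackMessages() above, with a fake channel
// standing in for AMQP::Channel.
#include <cstdint>
#include <iostream>

struct FakeChannel
{
    // Stand-in for channel->reject(delivery_tag, AMQP::multiple): pretend the frame was sent.
    bool reject(uint64_t delivery_tag, int /*flags*/)
    {
        std::cout << "nack everything up to tag " << delivery_tag << '\n';
        return true;
    }
};

// Returns false when there is nothing new to nack, true once a reject frame was sent.
bool nackUpTo(FakeChannel & channel, uint64_t delivery_tag, uint64_t last_committed_tag)
{
    if (!delivery_tag || delivery_tag <= last_committed_tag)
        return false;                        // nothing beyond the last commit point
    return channel.reject(delivery_tag, 1);  // 1 stands in for the AMQP::multiple flag
}

int main()
{
    FakeChannel channel;
    nackUpTo(channel, /*delivery_tag=*/10, /*last_committed_tag=*/7);  // prints the nack line
    nackUpTo(channel, /*delivery_tag=*/7,  /*last_committed_tag=*/7);  // no-op: already committed
}
```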

View File

@@ -50,7 +50,9 @@ public:
UInt64 delivery_tag = 0;
String channel_id;
};
const MessageData & currentMessage() { return current; }
const String & getChannelID() const { return channel_id; }
/// Return read buffer containing next available message
/// or nullptr if there are no messages to process.
@@ -63,6 +65,7 @@ public:
bool isConsumerStopped() const { return stopped.load(); }
bool ackMessages(const CommitInfo & commit_info);
bool nackMessages(const CommitInfo & commit_info);
bool hasPendingMessages() { return !received.empty(); }
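A hedged illustration of how the pieces declared in this header fit together: a reader keeps overwriting a CommitInfo with the delivery_tag/channel_id of the message it just took (exposed via currentMessage()), then hands the final value to ackMessages() or nackMessages() once the batch is done. MiniConsumer below is a stand-in type, not the real RabbitMQConsumer.

```cpp
// Illustrative only: MiniConsumer stands in for the API declared in this header
// (hasPendingMessages(), currentMessage(), ackMessages()/nackMessages()).
#include <cstdint>
#include <deque>
#include <string>

struct MessageData
{
    std::string message;
    uint64_t delivery_tag = 0;
    std::string channel_id;  // the header uses ClickHouse's String alias
};

struct CommitInfo
{
    uint64_t delivery_tag = 0;
    std::string channel_id;
};

struct MiniConsumer
{
    std::deque<MessageData> received;

    bool hasPendingMessages() const { return !received.empty(); }

    // The real consumer hands the payload out through consume()/currentMessage();
    // here we simply pop the queue.
    MessageData next()
    {
        MessageData message = received.front();
        received.pop_front();
        return message;
    }
};

int main()
{
    MiniConsumer consumer;
    consumer.received = {{"row 1", 1, "ch-1"}, {"row 2", 2, "ch-1"}};

    CommitInfo commit_info;
    while (consumer.hasPendingMessages())
    {
        auto message = consumer.next();
        // Keep only the last tag: thanks to AMQP::multiple, one ack/nack covers the whole range.
        commit_info = {message.delivery_tag, message.channel_id};
    }
    // A real caller would now invoke ackMessages(commit_info) on success
    // or nackMessages(commit_info) on failure.
}
```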

View File

@@ -120,10 +120,20 @@ Chunk RabbitMQSource::generateImpl()
{
auto timeout = std::chrono::milliseconds(context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
consumer = storage.popConsumer(timeout);
if (consumer->needChannelUpdate())
{
LOG_TRACE(log, "Channel {} is in error state, will update", consumer->getChannelID());
consumer->updateChannel(storage.getConnection());
}
}
if (is_finished || !consumer || consumer->isConsumerStopped())
{
LOG_TRACE(log, "RabbitMQSource is stopped (is_finished: {}, consumer_stopped: {})",
is_finished, consumer ? toString(consumer->isConsumerStopped()) : "No consumer");
return {};
}
/// Currently this is a single-use source: this ensures data is flushed
/// strictly by timeout or by block size.
@@ -254,13 +264,12 @@ Chunk RabbitMQSource::generateImpl()
bool RabbitMQSource::sendAck()
{
if (!consumer)
return false;
return consumer && consumer->ackMessages(commit_info);
}
if (!consumer->ackMessages(commit_info))
return false;
return true;
bool RabbitMQSource::sendNack()
{
return consumer && consumer->nackMessages(commit_info);
}
}

View File

@@ -33,6 +33,7 @@ public:
bool needChannelUpdate();
void updateChannel();
bool sendAck();
bool sendNack();
private:
StorageRabbitMQ & storage;

View File

@@ -1061,7 +1061,8 @@ bool StorageRabbitMQ::tryStreamToViews()
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false);
*this, storage_snapshot, rabbitmq_context, column_names, block_size,
max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false);
sources.emplace_back(source);
pipes.emplace_back(source);
@@ -1069,13 +1070,25 @@ bool StorageRabbitMQ::tryStreamToViews()
block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));
std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
if (!connection->getHandler().loopRunning())
startLoop();
bool write_failed = false;
try
{
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
write_failed = true;
}
LOG_TRACE(log, "Processed {} rows", rows);
/* Note: sending ack() with the loop running in another thread will lead to a lot of data races inside the library, but only in case
* an error occurs or the connection is lost while the ack is being sent
@@ -1083,13 +1096,6 @@ bool StorageRabbitMQ::tryStreamToViews()
deactivateTask(looping_task, false, true);
size_t queue_empty = 0;
if (!hasDependencies(getStorageID()))
{
/// Do not commit to rabbitmq if the dependency was removed.
LOG_TRACE(log, "No dependencies, reschedule");
return false;
}
if (!connection->isConnected())
{
if (shutdown_called)
@@ -1130,7 +1136,7 @@ bool StorageRabbitMQ::tryStreamToViews()
* the same channel will also commit all previously uncommitted messages. Anyway, I do not think this will ever
* happen for an ack frame.
*/
if (!source->sendAck())
if (write_failed ? source->sendNack() : source->sendAck())
{
/// Iterate loop to activate error callbacks if they happened
connection->getHandler().iterateLoop();
@@ -1142,6 +1148,19 @@ bool StorageRabbitMQ::tryStreamToViews()
}
}
if (write_failed)
{
LOG_TRACE(log, "Write failed, reschedule");
return false;
}
if (!hasDependencies(getStorageID()))
{
/// Do not commit to rabbitmq if the dependency was removed.
LOG_TRACE(log, "No dependencies, reschedule");
return false;
}
if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
{
connection->heartbeat();
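Summarizing the hunks above as a control-flow sketch: pipeline execution is wrapped in try/catch, a failure only records write_failed, and each source then nacks its batch instead of acking it, so RabbitMQ redelivers the uncommitted messages on the next scheduled attempt. All types below are stand-ins for illustration, not the actual ClickHouse classes.

```cpp
// Stand-in types only: condenses the ack-on-success / nack-on-failure flow added above.
#include <iostream>
#include <stdexcept>
#include <vector>

struct FakeSource
{
    bool sendAck()  { std::cout << "ack batch\n";  return true; }
    bool sendNack() { std::cout << "nack batch\n"; return true; }  // broker will redeliver
};

struct FakeExecutor
{
    bool should_fail = false;
    void execute() const
    {
        if (should_fail)
            throw std::runtime_error("insert into materialized view failed");
    }
};

// Mirrors the shape of tryStreamToViews(): never ack messages whose rows may not have
// reached the views; nack them instead so they come back on the next attempt.
bool streamOnce(const FakeExecutor & executor, std::vector<FakeSource> & sources)
{
    bool write_failed = false;
    try
    {
        executor.execute();
    }
    catch (...)
    {
        write_failed = true;  // keep going: the batch still has to be nacked
    }

    for (auto & source : sources)
        write_failed ? source.sendNack() : source.sendAck();

    // false corresponds to the "Write failed, reschedule" branch above.
    return !write_failed;
}

int main()
{
    std::vector<FakeSource> sources(2);
    streamOnce(FakeExecutor{/*should_fail=*/true}, sources);  // prints "nack batch" twice
}
```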

View File

@@ -3549,3 +3549,6 @@ def test_attach_broken_table(rabbitmq_cluster):
assert "CANNOT_CONNECT_RABBITMQ" in error
error = instance.query_and_get_error("INSERT INTO rabbit_queue VALUES ('test')")
assert "CANNOT_CONNECT_RABBITMQ" in error
# TODO: add a test