Unsubscribe kafka consumer before cleaning it by TTL

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-12-29 14:03:53 +01:00
parent f578541ded
commit b3d6caf37f
2 changed files with 18 additions and 1 deletions

View File

@ -151,6 +151,23 @@ void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)
}); });
} }
ConsumerPtr && KafkaConsumer::moveConsumer()
{
if (!consumer->get_subscription().empty())
{
try
{
consumer->unsubscribe();
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Error during unsubscribe: {}", e.what());
}
drain();
}
return std::move(consumer);
}
KafkaConsumer::~KafkaConsumer() KafkaConsumer::~KafkaConsumer()
{ {
if (!consumer) if (!consumer)

View File

@ -74,7 +74,7 @@ public:
void createConsumer(cppkafka::Configuration consumer_config); void createConsumer(cppkafka::Configuration consumer_config);
bool hasConsumer() const { return consumer.get() != nullptr; } bool hasConsumer() const { return consumer.get() != nullptr; }
ConsumerPtr && moveConsumer() { return std::move(consumer); } ConsumerPtr && moveConsumer();
void commit(); // Commit all processed messages. void commit(); // Commit all processed messages.
void subscribe(); // Subscribe internal consumer to topics. void subscribe(); // Subscribe internal consumer to topics.