From b3d6caf37f3183da4598f683d2c3b5837e915495 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 29 Dec 2023 14:03:53 +0100 Subject: [PATCH] Unsubscribe kafka consumer before cleaning it by TTL Signed-off-by: Azat Khuzhin --- src/Storages/Kafka/KafkaConsumer.cpp | 17 +++++++++++++++++ src/Storages/Kafka/KafkaConsumer.h | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp index ef007a26e4d..6d576d78668 100644 --- a/src/Storages/Kafka/KafkaConsumer.cpp +++ b/src/Storages/Kafka/KafkaConsumer.cpp @@ -151,6 +151,23 @@ void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config) }); } +ConsumerPtr && KafkaConsumer::moveConsumer() +{ + if (!consumer->get_subscription().empty()) + { + try + { + consumer->unsubscribe(); + } + catch (const cppkafka::HandleException & e) + { + LOG_ERROR(log, "Error during unsubscribe: {}", e.what()); + } + drain(); + } + return std::move(consumer); +} + KafkaConsumer::~KafkaConsumer() { if (!consumer) diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h index 04a38caa5ab..c4dfc56312f 100644 --- a/src/Storages/Kafka/KafkaConsumer.h +++ b/src/Storages/Kafka/KafkaConsumer.h @@ -74,7 +74,7 @@ public: void createConsumer(cppkafka::Configuration consumer_config); bool hasConsumer() const { return consumer.get() != nullptr; } - ConsumerPtr && moveConsumer() { return std::move(consumer); } + ConsumerPtr && moveConsumer(); void commit(); // Commit all processed messages. void subscribe(); // Subscribe internal consumer to topics.