From f9945494d9d000d81edc972cdb911ed299dd3b7a Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Fri, 19 Jul 2019 18:01:34 +0300 Subject: [PATCH] Always resume consumer before subscription. Also add more logs to see the difference between rd_kafka_assignment() vs rd_kafka_subscription() --- .../Kafka/ReadBufferFromKafkaConsumer.cpp | 16 ++++++++++++++++ .../Storages/Kafka/ReadBufferFromKafkaConsumer.h | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp index fc81e38bb63..5c7a8222a69 100644 --- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp +++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp @@ -55,6 +55,22 @@ void ReadBufferFromKafkaConsumer::commit() void ReadBufferFromKafkaConsumer::subscribe(const Names & topics) { + { + String message = "Subscribed to topics:"; + for (const auto & topic : consumer->get_subscription()) + message += " " + topic; + LOG_TRACE(log, message); + } + + { + String message = "Assigned to topics:"; + for (const auto & toppar : consumer->get_assignment()) + message += " " + toppar.get_topic(); + LOG_TRACE(log, message); + } + + consumer->resume(); + // While we wait for an assignment after subscribtion, we'll poll zero messages anyway. // If we're doing a manual select then it's better to get something after a wait, then immediate nothing. if (consumer->get_subscription().empty()) diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h index ac6011cfed0..2400357d3c3 100644 --- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h +++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h @@ -24,7 +24,7 @@ public: void subscribe(const Names & topics); // Subscribe internal consumer to topics. void unsubscribe(); // Unsubscribe internal consumer in case of failure. - auto pollTimeout() { return poll_timeout; } + auto pollTimeout() const { return poll_timeout; } // Return values for the message that's being read. String currentTopic() const { return current[-1].get_topic(); }