Always resume consumer before subscription.

Also add more logs to see the difference between rd_kafka_assignment() vs rd_kafka_subscription()
This commit is contained in:
Ivan Lezhankin 2019-07-19 18:01:34 +03:00
parent d0f65bd5b0
commit f9945494d9
2 changed files with 17 additions and 1 deletions

View File

@ -55,6 +55,22 @@ void ReadBufferFromKafkaConsumer::commit()
void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
{
{
String message = "Subscribed to topics:";
for (const auto & topic : consumer->get_subscription())
message += " " + topic;
LOG_TRACE(log, message);
}
{
String message = "Assigned to topics:";
for (const auto & toppar : consumer->get_assignment())
message += " " + toppar.get_topic();
LOG_TRACE(log, message);
}
consumer->resume();
// While we wait for an assignment after subscribtion, we'll poll zero messages anyway.
// If we're doing a manual select then it's better to get something after a wait, then immediate nothing.
if (consumer->get_subscription().empty())

View File

@ -24,7 +24,7 @@ public:
void subscribe(const Names & topics); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
auto pollTimeout() { return poll_timeout; }
auto pollTimeout() const { return poll_timeout; }
// Return values for the message that's being read.
String currentTopic() const { return current[-1].get_topic(); }