diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index 9f19bd464ff..4da5de69085 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -135,7 +135,8 @@ Block KafkaBlockInputStream::readImpl() auto new_rows = read_kafka_message(); // we can't store the offser after rebalance, when consumer is stalled, or if it's terminating - if (!buffer->storeLastReadMessageOffset()) { + if (!buffer->storeLastReadMessageOffset()) + { total_rows = 0; break; }