diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp index 5511f3c4cec..a67a0aeb519 100644 --- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp +++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp @@ -74,18 +74,21 @@ bool ReadBufferFromKafkaConsumer::nextImpl() { if (intermediate_commit) commit(); - messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout)); + + /// Don't drop old messages immediately, since we may need them for virtual columns. + auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout)); + if (new_messages.empty()) + { + LOG_TRACE(log, "Stalled"); + stalled = true; + return false; + } + messages = std::move(new_messages); current = messages.begin(); LOG_TRACE(log, "Polled batch of " << messages.size() << " messages"); } - if (messages.empty()) - { - stalled = true; - return false; - } - if (auto err = current->get_error()) { ++current;