Fix dropping message to early when stalling

This commit is contained in:
Ivan Lezhankin 2019-06-21 17:29:10 +03:00
parent ec78ec8365
commit b75db2ef04

View File

@ -74,18 +74,21 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
{
if (intermediate_commit)
commit();
messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
if (new_messages.empty())
{
LOG_TRACE(log, "Stalled");
stalled = true;
return false;
}
messages = std::move(new_messages);
current = messages.begin();
LOG_TRACE(log, "Polled batch of " << messages.size() << " messages");
}
if (messages.empty())
{
stalled = true;
return false;
}
if (auto err = current->get_error())
{
++current;