Freeze the Kafka buffer after first empty response (#5283)

* Check inside inferior streams for cancellation while reading.
* Stop reading from Kafka buffer after first empty read.
This commit is contained in:
Ivan 2019-05-15 19:11:50 +03:00 committed by GitHub
parent 98359a6a09
commit 1ea9e3019d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 1 deletions

View File

@ -63,7 +63,7 @@ Block BlockInputStreamFromRowInputStream::readImpl()
if (rows_portion_size && batch == rows_portion_size) if (rows_portion_size && batch == rows_portion_size)
{ {
batch = 0; batch = 0;
if (!checkTimeLimit()) if (!checkTimeLimit() || isCancelled())
break; break;
} }

View File

@ -27,6 +27,8 @@ void ReadBufferFromKafkaConsumer::subscribe(const Names & topics)
consumer->poll(5s); consumer->poll(5s);
consumer->resume(); consumer->resume();
} }
stalled = false;
} }
void ReadBufferFromKafkaConsumer::unsubscribe() void ReadBufferFromKafkaConsumer::unsubscribe()
@ -38,6 +40,12 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
/// Do commit messages implicitly after we processed the previous batch. /// Do commit messages implicitly after we processed the previous batch.
bool ReadBufferFromKafkaConsumer::nextImpl() bool ReadBufferFromKafkaConsumer::nextImpl()
{ {
/// NOTE: ReadBuffer was implemented with a immutable buffer contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get flawn.
if (stalled)
return false;
if (current == messages.end()) if (current == messages.end())
{ {
commit(); commit();
@ -48,7 +56,10 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
} }
if (messages.empty() || current == messages.end()) if (messages.empty() || current == messages.end())
{
stalled = true;
return false; return false;
}
if (auto err = current->get_error()) if (auto err = current->get_error())
{ {

View File

@ -38,6 +38,7 @@ private:
Poco::Logger * log; Poco::Logger * log;
const size_t batch_size = 1; const size_t batch_size = 1;
const size_t poll_timeout = 0; const size_t poll_timeout = 0;
bool stalled = false;
Messages messages; Messages messages;
Messages::const_iterator current; Messages::const_iterator current;