fix kafka

This commit is contained in:
Anton Popov 2021-09-09 17:48:01 +03:00
parent 8e5b3b2f6c
commit abebce2f18
2 changed files with 1 additions and 2 deletions

View File

@ -37,7 +37,6 @@ size_t StreamingFormatExecutor::execute()
try
{
size_t new_rows = 0;
port.setNeeded();
while (true)
{

View File

@ -88,7 +88,6 @@ Block KafkaBlockInputStream::readImpl()
std::optional<std::string> exception_message;
size_t total_rows = 0;
size_t new_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
@ -122,6 +121,7 @@ Block KafkaBlockInputStream::readImpl()
while (true)
{
size_t new_rows = 0;
exception_message.reset();
if (buffer->poll())
new_rows = executor.execute();