Fix kafka storage does not work with arrow and arrowstream format messages

This commit is contained in:
Chao Ma 2021-04-21 13:47:08 +08:00
parent b67f40bae1
commit 713eb9486c
2 changed files with 22 additions and 45 deletions

View File

@ -24,7 +24,6 @@ namespace ErrorCodes
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_)
: IInputFormat(header_, in_), stream{stream_}
{
prepareReader();
}
Chunk ArrowBlockInputFormat::generate()
@ -35,12 +34,18 @@ Chunk ArrowBlockInputFormat::generate()
if (stream)
{
if (!stream_reader)
prepareReader();
batch_result = stream_reader->Next();
if (batch_result.ok() && !(*batch_result))
return res;
}
else
{
if (!file_reader)
prepareReader();
if (record_batch_current >= record_batch_total)
return res;
@ -71,7 +76,7 @@ void ArrowBlockInputFormat::resetParser()
stream_reader.reset();
else
file_reader.reset();
prepareReader();
record_batch_current = 0;
}
void ArrowBlockInputFormat::prepareReader()

File diff suppressed because one or more lines are too long